[skip ci] update benchmark scripts on 0.10.4 (#4213)

* [skip ci] update ci/jenkinsfile to 0.10.4

Signed-off-by: zw <zw@milvus.io>

* [skip ci] update version in test case

Signed-off-by: zw <zw@milvus.io>

* [skip ci] update benchmark scripts

Signed-off-by: zw <zw@milvus.io>

* [skip ci] add comments in scripts

Signed-off-by: zw <zw@milvus.io>

* [skip ci] fix style in benchmark

Signed-off-by: zw <zw@milvus.io>

* [skip ci] fix style in benchmark

Signed-off-by: zw <zw@milvus.io>

* [skip ci] fix style in benchmark

Signed-off-by: zw <zw@milvus.io>

* [skip ci] fix style in benchmark

Signed-off-by: zw <zw@milvus.io>

* [skip ci] fix style in benchmark

Signed-off-by: zw <zw@milvus.io>

* [skip ci] fix style in benchmark

Signed-off-by: zw <zw@milvus.io>

* [skip ci] fix style in benchmark

Signed-off-by: zw <zw@milvus.io>

Co-authored-by: zw <zw@milvus.io>
pull/4353/head
del-zhenwu 2020-12-01 11:30:29 +08:00 committed by GitHub
parent bf5fdc3131
commit de0669f9dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1980 additions and 662 deletions

View File

@ -1,12 +1,12 @@
try {
def result = sh script: "helm status benchmark-test-${env.JOB_NAME}-${env.BUILD_NUMBER}", returnStatus: true
def result = sh script: "helm status -n milvus ${env.HELM_RELEASE_NAME}", returnStatus: true
if (!result) {
sh "helm del --purge benchmark-test-${env.JOB_NAME}-${env.BUILD_NUMBER}"
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME}"
}
} catch (exc) {
def result = sh script: "helm status benchmark-test-${env.JOB_NAME}-${env.BUILD_NUMBER}", returnStatus: true
def result = sh script: "helm status -n milvus ${env.HELM_RELEASE_NAME}", returnStatus: true
if (!result) {
sh "helm del --purge benchmark-test-${env.JOB_NAME}-${env.BUILD_NUMBER}"
sh "helm uninstall -n milvus ${env.HELM_RELEASE_NAME}"
}
throw exc
}

View File

@ -0,0 +1,13 @@
// Cleanup script for the shards (cluster) benchmark deployment.
// Best-effort teardown: check whether the Helm release exists, and uninstall it if so.
try {
// `helm status` returns 0 when the release exists; returnStatus captures the exit code instead of failing the build
def result = sh script: "helm status -n milvus ${env.HELM_SHARDS_RELEASE_NAME}", returnStatus: true
if (!result) {
// release found (exit code 0) -> remove it
sh "helm uninstall -n milvus ${env.HELM_SHARDS_RELEASE_NAME}"
}
} catch (exc) {
// Even if the first status/uninstall attempt threw, retry the same check once before propagating
def result = sh script: "helm status -n milvus ${env.HELM_SHARDS_RELEASE_NAME}", returnStatus: true
if (!result) {
sh "helm uninstall -n milvus ${env.HELM_SHARDS_RELEASE_NAME}"
}
// re-throw so the pipeline stage is still marked as failed
throw exc
}

View File

@ -0,0 +1,21 @@
// Deploy-and-run stage for the shards (cluster) benchmark: checks out the helm chart
// and the benchmark repo, installs Python deps, then launches the scheduled test suite.
// Hard upper bound so a hung benchmark cannot block the executor indefinitely.
timeout(time: 12, unit: 'HOURS') {
try {
// fetch the milvus-helm chart used to deploy the server under test
dir ("milvus-helm") {
// sh 'helm init --client-only --skip-refresh --stable-repo-url https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
// sh 'helm repo update'
checkout([$class: 'GitSCM', branches: [[name: "${HELM_BRANCH}"]], userRemoteConfigs: [[url: "${HELM_URL}", name: 'origin', refspec: "+refs/heads/${HELM_BRANCH}:refs/remotes/origin/${HELM_BRANCH}"]]])
}
// fetch the benchmark harness and run the shards test schedule
dir ("milvus_benchmark") {
print "Git clone url: ${TEST_URL}:${TEST_BRANCH}"
checkout([$class: 'GitSCM', branches: [[name: "${TEST_BRANCH}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "${TEST_URL}", name: 'origin', refspec: "+refs/heads/${TEST_BRANCH}:refs/remotes/origin/${TEST_BRANCH}"]]])
print "Install requirements"
// sh "python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com"
sh "python3 -m pip install -r requirements.txt"
// metrics-reporting helper library, installed straight from git
sh "python3 -m pip install git+${TEST_LIB_URL}"
// entry point: deploy-mode selects the shards code path; SHARDS_CONFIG_FILE names the suite to run
sh "python3 main.py --image-version=${params.IMAGE_VERSION} --schedule-conf=scheduler/${params.SHARDS_CONFIG_FILE} --deploy-mode=${params.DEPLOY_MODE}"
}
} catch (exc) {
echo 'Deploy SHARDS Test Failed !'
// propagate so the stage (and build) is marked failed
throw exc
}
}

View File

@ -1,21 +1,19 @@
timeout(time: 4000, unit: 'MINUTES') {
try {
dir ("milvus-helm") {
// sh 'helm init --client-only --skip-refresh --stable-repo-url https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
// sh 'helm repo update'
checkout([$class: 'GitSCM', branches: [[name: "${HELM_BRANCH}"]], userRemoteConfigs: [[url: "${HELM_URL}", name: 'origin', refspec: "+refs/heads/${HELM_BRANCH}:refs/remotes/origin/${HELM_BRANCH}"]]])
}
dir ("milvus_benchmark") {
print "Git clone url: ${TEST_URL}:${TEST_BRANCH}"
checkout([$class: 'GitSCM', branches: [[name: "${TEST_BRANCH}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "${TEST_URL}", name: 'origin', refspec: "+refs/heads/${TEST_BRANCH}:refs/remotes/origin/${TEST_BRANCH}"]]])
print "Install requirements"
// sh "python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com"
sh "python3 -m pip install -r requirements.txt"
sh "python3 -m pip install git+${TEST_LIB_URL}"
sh "python3 main.py --image-version=${params.IMAGE_VERSION} --schedule-conf=scheduler/${params.CONFIG_FILE}"
}
} catch (exc) {
echo 'Deploy Test Failed !'
throw exc
try {
dir ("milvus-helm") {
// sh 'helm init --client-only --skip-refresh --stable-repo-url https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
// sh 'helm repo update'
checkout([$class: 'GitSCM', branches: [[name: "${HELM_BRANCH}"]], userRemoteConfigs: [[url: "${HELM_URL}", name: 'origin', refspec: "+refs/heads/${HELM_BRANCH}:refs/remotes/origin/${HELM_BRANCH}"]]])
}
dir ("milvus_benchmark") {
print "Git clone url: ${TEST_URL}:${TEST_BRANCH}"
checkout([$class: 'GitSCM', branches: [[name: "${TEST_BRANCH}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "${TEST_URL}", name: 'origin', refspec: "+refs/heads/${TEST_BRANCH}:refs/remotes/origin/${TEST_BRANCH}"]]])
print "Install requirements"
// sh "python3 -m pip install -r requirements.txt -i http://pypi.douban.com/simple --trusted-host pypi.douban.com"
sh "python3 -m pip install -r requirements.txt"
sh "python3 -m pip install git+${TEST_LIB_URL}"
sh "python3 main.py --image-version=${params.IMAGE_VERSION} --schedule-conf=scheduler/${params.CONFIG_FILE} --deploy-mode=${params.DEPLOY_MODE}"
}
} catch (exc) {
echo 'Deploy Test Failed !'
throw exc
}

View File

@ -1,4 +1,4 @@
timeout(time: 15, unit: 'MINUTES') {
timeout(time: 30, unit: 'MINUTES') {
def imageName = "milvus/engine:${DOCKER_VERSION}"
def remoteImageName = "milvusdb/daily-build:${REMOTE_DOCKER_VERSION}"
def localDockerRegistryImage = "${params.LOCAL_DOKCER_REGISTRY_URL}/${imageName}"

View File

@ -6,17 +6,21 @@ pipeline {
}
parameters{
string defaultValue: 'master', description: 'server image version', name: 'IMAGE_VERSION', trim: true
string defaultValue: '080_data.json', description: 'test suite config yaml', name: 'CONFIG_FILE', trim: true
string defaultValue: '0.10.4', description: 'server image version', name: 'IMAGE_VERSION', trim: true
choice choices: ['single', 'shards'], description: 'server deploy mode', name: 'DEPLOY_MODE'
string defaultValue: '010_data.json', description: 'test suite config yaml', name: 'CONFIG_FILE', trim: true
string defaultValue: 'shards.json', description: 'shards test suite config yaml', name: 'SHARDS_CONFIG_FILE', trim: true
string defaultValue: '09509e53-9125-4f5d-9ce8-42855987ad67', description: 'git credentials', name: 'GIT_USER', trim: true
}
environment {
HELM_URL = "https://github.com/milvus-io/milvus-helm.git"
HELM_BRANCH = "master"
HELM_BRANCH = "0.10.3"
TEST_URL = "git@192.168.1.105:Test/milvus_benchmark.git"
TEST_BRANCH = "master"
TEST_BRANCH = "0.10.4"
TEST_LIB_URL = "http://192.168.1.105:6060/Test/milvus_metrics.git"
HELM_RELEASE_NAME = "milvus-benchmark-test-${env.BUILD_NUMBER}"
HELM_SHARDS_RELEASE_NAME = "milvus-shards-benchmark-test-${env.BUILD_NUMBER}"
}
stages {
@ -35,7 +39,7 @@ pipeline {
spec:
containers:
- name: milvus-test-env
image: registry.zilliz.com/milvus/milvus-test-env:v0.2
image: registry.zilliz.com/milvus/milvus-test-env:v0.3
command:
- cat
tty: true
@ -82,7 +86,8 @@ pipeline {
script {
boolean isNightlyTest = isTimeTriggeredBuild()
if (isNightlyTest) {
build job: 'milvus-publish-daily-docker', wait: false
// build job: 'milvus-publish-daily-docker', parameters: [[$class: 'StringParameterValue', name: 'BRANCH', value: "${params.IMAGE_VERSION}"]], wait: false
build job: 'milvus-publish-daily-docker', parameters: [string(name: 'LOCAL_DOKCER_REGISTRY_URL', value: 'registry.zilliz.com'), string(name: 'REMOTE_DOKCER_REGISTRY_URL', value: 'registry-1.docker.io'), string(name: 'REMOTE_DOCKER_CREDENTIALS_ID', value: 'milvus-docker-access-token'), string(name: 'BRANCH', value: String.valueOf(IMAGE_VERSION))], wait: false
} else {
echo "Skip publish daily docker images ..."
}
@ -93,11 +98,13 @@ pipeline {
stage("Deploy Test") {
steps {
gitlabCommitStatus(name: 'Deploy Test') {
container('milvus-test-env') {
script {
print "In Deploy Test Stage"
container('milvus-test-env') {
script {
print "In Deploy Test Stage"
if ("${params.DEPLOY_MODE}" == "single") {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy_test.groovy"
} else {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy_shards_test.groovy"
}
}
}
@ -106,10 +113,12 @@ pipeline {
stage ("Cleanup Env") {
steps {
gitlabCommitStatus(name: 'Cleanup Env') {
container('milvus-test-env') {
script {
container('milvus-test-env') {
script {
if ("${params.DEPLOY_MODE}" == "single") {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup.groovy"
} else {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanupShards.groovy"
}
}
}
@ -117,13 +126,6 @@ pipeline {
}
}
post {
always {
container('milvus-test-env') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup.groovy"
}
}
}
success {
script {
echo "Milvus benchmark test success !"

View File

@ -9,6 +9,7 @@ pipeline {
string defaultValue: 'registry.zilliz.com', description: 'Local Docker registry URL', name: 'LOCAL_DOKCER_REGISTRY_URL', trim: true
string defaultValue: 'registry-1.docker.io', description: 'Remote Docker registry URL', name: 'REMOTE_DOKCER_REGISTRY_URL', trim: true
string defaultValue: 'milvus-docker-access-token', description: 'Remote Docker credentials id', name: 'REMOTE_DOCKER_CREDENTIALS_ID', trim: true
string(defaultValue: "master", description: 'Milvus server version', name: 'BRANCH')
}
environment {
@ -41,9 +42,9 @@ pipeline {
stages {
stage("Publish Docker Images") {
environment {
DOCKER_VERSION = "master-${BINARY_VERSION}-${OS_NAME}-release"
REMOTE_DOCKER_VERSION = "${OS_NAME}-${BINARY_VERSION}-${DAILY_BUILD_VERSION}"
REMOTE_DOCKER_LATEST_VERSION = "${OS_NAME}-${BINARY_VERSION}-latest"
DOCKER_VERSION = "${params.BRANCH}-${BINARY_VERSION}-${OS_NAME}-release"
REMOTE_DOCKER_VERSION = "${params.BRANCH}-${OS_NAME}-${BINARY_VERSION}-${DAILY_BUILD_VERSION}"
REMOTE_DOCKER_LATEST_VERSION = "${params.BRANCH}-${OS_NAME}-${BINARY_VERSION}-latest"
}
agent {

View File

@ -6,11 +6,11 @@ import json
import time, datetime
from multiprocessing import Process
from milvus import Milvus, IndexType, MetricType
import utils
logger = logging.getLogger("milvus_benchmark.client")
SERVER_HOST_DEFAULT = "127.0.0.1"
# SERVER_HOST_DEFAULT = "192.168.1.130"
SERVER_PORT_DEFAULT = 19530
INDEX_MAP = {
"flat": IndexType.FLAT,
@ -22,6 +22,16 @@ INDEX_MAP = {
"hnsw": IndexType.HNSW,
"annoy": IndexType.ANNOY
}
METRIC_MAP = {
"l2": MetricType.L2,
"ip": MetricType.IP,
"jaccard": MetricType.JACCARD,
"hamming": MetricType.HAMMING,
"sub": MetricType.SUBSTRUCTURE,
"super": MetricType.SUPERSTRUCTURE
}
epsilon = 0.1
def time_wrapper(func):
@ -37,39 +47,63 @@ def time_wrapper(func):
return wrapper
def metric_type_to_str(metric_type):
for key, value in METRIC_MAP.items():
if value == metric_type:
return key
raise Exception("metric_type: %s mapping not found" % metric_type)
class MilvusClient(object):
def __init__(self, collection_name=None, ip=None, port=None, timeout=60):
def __init__(self, collection_name=None, host=None, port=None, timeout=60):
"""
Milvus client wrapper for python-sdk.
Default timeout set 60s
"""
self._collection_name = collection_name
try:
i = 1
start_time = time.time()
if not ip:
self._milvus = Milvus(
host = SERVER_HOST_DEFAULT,
port = SERVER_PORT_DEFAULT)
else:
# retry connect for remote server
while time.time() < start_time + timeout:
try:
self._milvus = Milvus(
host = ip,
port = port)
if self._milvus.server_status():
logger.debug("Try connect times: %d, %s" % (i, round(time.time() - start_time, 2)))
break
except Exception as e:
logger.debug("Milvus connect failed")
i = i + 1
if not host:
host = SERVER_HOST_DEFAULT
if not port:
port = SERVER_PORT_DEFAULT
logger.debug(host)
logger.debug(port)
# retry connect for remote server
i = 0
while time.time() < start_time + timeout:
try:
self._milvus = Milvus(host=host, port=port, try_connect=False, pre_ping=False)
if self._milvus.server_status():
logger.debug("Try connect times: %d, %s" % (i, round(time.time() - start_time, 2)))
break
except Exception as e:
logger.debug("Milvus connect failed: %d times" % i)
i = i + 1
if time.time() > start_time + timeout:
raise Exception("Server connect timeout")
except Exception as e:
raise e
self._metric_type = None
if self._collection_name and self.exists_collection():
self._metric_type = metric_type_to_str(self.describe()[1].metric_type)
self._dimension = self.describe()[1].dimension
def __str__(self):
return 'Milvus collection %s' % self._collection_name
def set_collection(self, name):
self._collection_name = name
def check_status(self, status):
if not status.OK():
logger.error(self._collection_name)
logger.error(status.message)
logger.error(self._milvus.server_status())
logger.error(self.count())
raise Exception("Status not ok")
def check_result_ids(self, result):
@ -82,20 +116,9 @@ class MilvusClient(object):
def create_collection(self, collection_name, dimension, index_file_size, metric_type):
if not self._collection_name:
self._collection_name = collection_name
if metric_type == "l2":
metric_type = MetricType.L2
elif metric_type == "ip":
metric_type = MetricType.IP
elif metric_type == "jaccard":
metric_type = MetricType.JACCARD
elif metric_type == "hamming":
metric_type = MetricType.HAMMING
elif metric_type == "sub":
metric_type = MetricType.SUBSTRUCTURE
elif metric_type == "super":
metric_type = MetricType.SUPERSTRUCTURE
else:
logger.error("Not supported metric_type: %s" % metric_type)
if metric_type not in METRIC_MAP.keys():
raise Exception("Not supported metric_type: %s" % metric_type)
metric_type = METRIC_MAP[metric_type]
create_param = {'collection_name': collection_name,
'dimension': dimension,
'index_file_size': index_file_size,
@ -103,25 +126,116 @@ class MilvusClient(object):
status = self._milvus.create_collection(create_param)
self.check_status(status)
def create_partition(self, tag_name):
status = self._milvus.create_partition(self._collection_name, tag_name)
self.check_status(status)
def drop_partition(self, tag_name):
status = self._milvus.drop_partition(self._collection_name, tag_name)
self.check_status(status)
def list_partitions(self):
status, tags = self._milvus.list_partitions(self._collection_name)
self.check_status(status)
return tags
@time_wrapper
def insert(self, X, ids=None):
status, result = self._milvus.add_vectors(self._collection_name, X, ids)
def insert(self, X, ids=None, collection_name=None):
if collection_name is None:
collection_name = self._collection_name
status, result = self._milvus.insert(collection_name, X, ids)
self.check_status(status)
return status, result
def insert_rand(self):
insert_xb = random.randint(1, 100)
X = [[random.random() for _ in range(self._dimension)] for _ in range(insert_xb)]
X = utils.normalize(self._metric_type, X)
count_before = self.count()
status, _ = self.insert(X)
self.check_status(status)
self.flush()
if count_before + insert_xb != self.count():
raise Exception("Assert failed after inserting")
def get_rand_ids(self, length):
while True:
status, stats = self._milvus.get_collection_stats(self._collection_name)
self.check_status(status)
segments = stats["partitions"][0]["segments"]
# random choice one segment
segment = random.choice(segments)
status, segment_ids = self._milvus.list_id_in_segment(self._collection_name, segment["name"])
if not status.OK():
logger.error(status.message)
continue
if len(segment_ids):
break
if length >= len(segment_ids):
logger.debug("Reset length: %d" % len(segment_ids))
return segment_ids
return random.sample(segment_ids, length)
def get_rand_ids_each_segment(self, length):
res = []
status, stats = self._milvus.get_collection_stats(self._collection_name)
self.check_status(status)
segments = stats["partitions"][0]["segments"]
segments_num = len(segments)
# random choice from each segment
for segment in segments:
status, segment_ids = self._milvus.list_id_in_segment(self._collection_name, segment["name"])
self.check_status(status)
res.extend(segment_ids[:length])
return segments_num, res
def get_rand_entities(self, length):
ids = self.get_rand_ids(length)
status, get_res = self._milvus.get_entity_by_id(self._collection_name, ids)
self.check_status(status)
return ids, get_res
@time_wrapper
def delete_vectors(self, ids):
status = self._milvus.delete_by_id(self._collection_name, ids)
def get_entities(self, get_ids):
status, get_res = self._milvus.get_entity_by_id(self._collection_name, get_ids)
self.check_status(status)
return get_res
@time_wrapper
def delete(self, ids, collection_name=None):
if collection_name is None:
collection_name = self._collection_name
status = self._milvus.delete_entity_by_id(collection_name, ids)
self.check_status(status)
def delete_rand(self):
delete_id_length = random.randint(1, 100)
count_before = self.count()
logger.info("%s: length to delete: %d" % (self._collection_name, delete_id_length))
delete_ids = self.get_rand_ids(delete_id_length)
self.delete(delete_ids)
self.flush()
logger.info("%s: count after delete: %d" % (self._collection_name, self.count()))
status, get_res = self._milvus.get_entity_by_id(self._collection_name, delete_ids)
self.check_status(status)
for item in get_res:
if item:
raise Exception("Assert failed after delete")
if count_before - len(delete_ids) != self.count():
raise Exception("Assert failed after delete")
@time_wrapper
def flush(self, collection_name=None):
if collection_name is None:
collection_name = self._collection_name
status = self._milvus.flush([collection_name])
self.check_status(status)
@time_wrapper
def flush(self):
status = self._milvus.flush([self._collection_name])
self.check_status(status)
@time_wrapper
def compact(self):
status = self._milvus.compact(self._collection_name)
def compact(self, collection_name=None):
if collection_name is None:
collection_name = self._collection_name
status = self._milvus.compact(collection_name)
self.check_status(status)
@time_wrapper
@ -134,7 +248,7 @@ class MilvusClient(object):
self.check_status(status)
def describe_index(self):
status, result = self._milvus.describe_index(self._collection_name)
status, result = self._milvus.get_index_info(self._collection_name)
self.check_status(status)
index_type = None
for k, v in INDEX_MAP.items():
@ -147,28 +261,53 @@ class MilvusClient(object):
logger.info("Drop index: %s" % self._collection_name)
return self._milvus.drop_index(self._collection_name)
@time_wrapper
def query(self, X, top_k, search_param=None):
status, result = self._milvus.search_vectors(self._collection_name, top_k, query_records=X, params=search_param)
def query(self, X, top_k, search_param=None, collection_name=None):
if collection_name is None:
collection_name = self._collection_name
status, result = self._milvus.search(collection_name, top_k, query_records=X, params=search_param)
self.check_status(status)
return result
@time_wrapper
def query_ids(self, top_k, ids, search_param=None):
status, result = self._milvus.search_by_ids(self._collection_name, ids, top_k, params=search_param)
self.check_result_ids(result)
return result
def query_rand(self):
top_k = random.randint(1, 100)
nq = random.randint(1, 100)
nprobe = random.randint(1, 100)
search_param = {"nprobe": nprobe}
_, X = self.get_rand_entities(nq)
logger.info("%s, Search nq: %d, top_k: %d, nprobe: %d" % (self._collection_name, nq, top_k, nprobe))
status, _ = self._milvus.search(self._collection_name, top_k, query_records=X, params=search_param)
self.check_status(status)
# for i, item in enumerate(search_res):
# if item[0].id != ids[i]:
# logger.warning("The index of search result: %d" % i)
# raise Exception("Query failed")
def count(self):
return self._milvus.count_collection(self._collection_name)[1]
# @time_wrapper
# def query_ids(self, top_k, ids, search_param=None):
# status, result = self._milvus.search_by_id(self._collection_name, ids, top_k, params=search_param)
# self.check_result_ids(result)
# return result
def delete(self, timeout=120):
def count(self, name=None):
if name is None:
name = self._collection_name
logger.debug(self._milvus.count_entities(name))
row_count = self._milvus.count_entities(name)[1]
if not row_count:
row_count = 0
logger.debug("Row count: %d in collection: <%s>" % (row_count, name))
return row_count
def drop(self, timeout=120, name=None):
timeout = int(timeout)
logger.info("Start delete collection: %s" % self._collection_name)
self._milvus.drop_collection(self._collection_name)
if name is None:
name = self._collection_name
logger.info("Start delete collection: %s" % name)
status = self._milvus.drop_collection(name)
self.check_status(status)
i = 0
while i < timeout:
if self.count():
if self.count(name=name):
time.sleep(1)
i = i + 1
continue
@ -178,26 +317,33 @@ class MilvusClient(object):
logger.error("Delete collection timeout")
def describe(self):
return self._milvus.describe_collection(self._collection_name)
# logger.info(self._milvus.get_collection_info(self._collection_name))
return self._milvus.get_collection_info(self._collection_name)
def show_collections(self):
return self._milvus.show_collections()
return self._milvus.list_collections()
def exists_collection(self, collection_name=None):
if collection_name is None:
collection_name = self._collection_name
status, res = self._milvus.has_collection(collection_name)
_, res = self._milvus.has_collection(collection_name)
# self.check_status(status)
return res
def clean_db(self):
collection_names = self.show_collections()[1]
for name in collection_names:
logger.debug(name)
self.drop(name=name)
@time_wrapper
def preload_collection(self):
status = self._milvus.preload_collection(self._collection_name, timeout=3000)
status = self._milvus.load_collection(self._collection_name, timeout=3000)
self.check_status(status)
return status
def get_server_version(self):
status, res = self._milvus.server_version()
_, res = self._milvus.server_version()
return res
def get_server_mode(self):
@ -222,151 +368,3 @@ class MilvusClient(object):
logger.info("Server command: %s, result: %s" % (command, res))
self.check_status(status)
return res
def fit(collection_name, X):
milvus = Milvus()
milvus.connect(host = SERVER_HOST_DEFAULT, port = SERVER_PORT_DEFAULT)
start = time.time()
status, ids = milvus.add_vectors(collection_name, X)
end = time.time()
logger(status, round(end - start, 2))
def fit_concurrent(collection_name, process_num, vectors):
processes = []
for i in range(process_num):
p = Process(target=fit, args=(collection_name, vectors, ))
processes.append(p)
p.start()
for p in processes:
p.join()
if __name__ == "__main__":
import numpy
import sklearn.preprocessing
# collection_name = "tset_test"
# # collection_name = "test_tset1"
# m = MilvusClient(collection_name)
# m.delete()
# time.sleep(2)
# m.create_collection(collection_name, 128, 20, "ip")
# print(m.describe())
# print(m.count())
# print(m.describe_index())
# # sys.exit()
# tmp = [[random.random() for _ in range(128)] for _ in range(20000)]
# tmp1 = sklearn.preprocessing.normalize(tmp, axis=1, norm='l2')
# print(tmp1[0][0])
# tmp = [[random.random() for _ in range(128)] for _ in range(20000)]
# tmp /= numpy.linalg.norm(tmp)
# print(tmp[0][0])
# sum_1 = 0
# sum_2 = 0
# for item in tmp:
# for i in item:
# sum_2 = sum_2 + i * i
# break
# for item in tmp1:
# for i in item:
# sum_1 = sum_1 + i * i
# break
# print(sum_1, sum_2)
# insert_vectors = tmp.tolist()
# # print(insert_vectors)
# for i in range(2):
# m.insert(insert_vectors)
# time.sleep(5)
# print(m.create_index("ivf_flat", 16384))
# X = [insert_vectors[0], insert_vectors[1], insert_vectors[2]]
# top_k = 5
# nprobe = 1
# print(m.query(X, top_k, nprobe))
# # print(m.drop_index())
# print(m.describe_index())
# sys.exit()
# # insert_vectors = [[random.random() for _ in range(128)] for _ in range(100000)]
# # for i in range(100):
# # m.insert(insert_vectors)
# # time.sleep(5)
# # print(m.describe_index())
# # print(m.drop_index())
# m.create_index("ivf_sq8h", 16384)
# print(m.count())
# print(m.describe_index())
# sys.exit()
# print(m.create_index("ivf_sq8h", 16384))
# print(m.count())
# print(m.describe_index())
import numpy as np
# def mmap_fvecs(fname):
# x = np.memmap(fname, dtype='int32', mode='r')
# d = x[0]
# return x.view('float32').reshape(-1, d + 1)[:, 1:]
# print(mmap_fvecs("/poc/deep1b/deep1B_queries.fvecs"))
# SIFT_SRC_QUERY_DATA_DIR = '/poc/yuncong/ann_1000m'
# file_name = SIFT_SRC_QUERY_DATA_DIR+'/'+'query.npy'
# data = numpy.load(file_name)
# query_vectors = data[0:2].tolist()
# print(len(query_vectors))
# results = m.query(query_vectors, 10, 10)
# result_ids = []
# for result in results[1]:
# tmp = []
# for item in result:
# tmp.append(item.id)
# result_ids.append(tmp)
# print(result_ids[0][:10])
# # gt
# file_name = SIFT_SRC_QUERY_DATA_DIR+"/gnd/"+"idx_1M.ivecs"
# a = numpy.fromfile(file_name, dtype='int32')
# d = a[0]
# true_ids = a.reshape(-1, d + 1)[:, 1:].copy()
# print(true_ids[:3, :2])
# print(len(true_ids[0]))
# import numpy as np
# import sklearn.preprocessing
# def mmap_fvecs(fname):
# x = np.memmap(fname, dtype='int32', mode='r')
# d = x[0]
# return x.view('float32').reshape(-1, d + 1)[:, 1:]
# data = mmap_fvecs("/poc/deep1b/deep1B_queries.fvecs")
# data = sklearn.preprocessing.normalize(data, axis=1, norm='l2')
# np.save("/test/milvus/deep1b/query.npy", data)
dimension = 4096
insert_xb = 10000
insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
data = sklearn.preprocessing.normalize(insert_vectors, axis=1, norm='l2')
np.save("/test/milvus/raw_data/random/query_%d.npy" % dimension, data)
sys.exit()
total_size = 100000000
# total_size = 1000000000
file_size = 100000
# file_size = 100000
dimension = 4096
file_num = total_size // file_size
for i in range(file_num):
print(i)
# fname = "/test/milvus/raw_data/deep1b/binary_96_%05d" % i
fname = "/test/milvus/raw_data/random/binary_%dd_%05d" % (dimension, i)
# print(fname, i*file_size, (i+1)*file_size)
# single_data = data[i*file_size : (i+1)*file_size]
single_data = [[random.random() for _ in range(dimension)] for _ in range(file_size)]
single_data = sklearn.preprocessing.normalize(single_data, axis=1, norm='l2')
np.save(fname, single_data)

View File

@ -1,14 +1,17 @@
import os
import logging
import pdb
import string
import time
import re
import random
import traceback
import json
import csv
from multiprocessing import Process
import numpy as np
from yaml import full_load, dump
from concurrent import futures
from client import MilvusClient
import parser
from runner import Runner
@ -18,6 +21,7 @@ import utils
logger = logging.getLogger("milvus_benchmark.k8s_runner")
namespace = "milvus"
default_port = 19530
DELETE_INTERVAL_TIME = 5
# INSERT_INTERVAL = 100000
INSERT_INTERVAL = 50000
@ -26,33 +30,45 @@ default_path = "/var/lib/milvus"
class K8sRunner(Runner):
"""run docker mode"""
def __init__(self):
"""
Run with helm mode.
Upload test result after tests finished
"""
super(K8sRunner, self).__init__()
self.name = utils.get_unique_name()
self.service_name = utils.get_unique_name()
self.host = None
self.ip = None
self.port = default_port
self.hostname = None
self.env_value = None
def init_env(self, server_config, server_host, image_type, image_tag):
def init_env(self, server_config, server_host, deploy_mode, image_type, image_tag):
"""
Deploy start server with using helm and clean up env.
If deploy or start failed
"""
logger.debug("Tests run on server host:")
logger.debug(server_host)
self.hostname = server_host
# update values
helm_path = os.path.join(os.getcwd(), "../milvus-helm")
helm_path = os.path.join(os.getcwd(), "../milvus-helm/charts/milvus")
values_file_path = helm_path+"/values.yaml"
if not os.path.exists(values_file_path):
raise Exception("File %s not existed" % values_file_path)
utils.update_values(values_file_path, server_host, server_config)
if server_config:
utils.update_values(values_file_path, deploy_mode, server_host, server_config)
try:
logger.debug("Start install server")
self.host, self.ip = utils.helm_install_server(helm_path, image_tag, image_type, self.name, namespace)
self.host = utils.helm_install_server(helm_path, deploy_mode, image_tag, image_type, self.service_name, namespace)
except Exception as e:
logger.error("Helm install server failed: %s" % str(e))
logger.error("Helm install server failed: %s" % (str(e)))
logger.error(traceback.format_exc())
logger.debug(server_config)
self.clean_up()
return False
# for debugging
# self.host = "192.168.1.101"
if not self.host:
logger.error("Helm install server failed")
self.clean_up()
@ -60,10 +76,17 @@ class K8sRunner(Runner):
return True
def clean_up(self):
logger.debug(self.name)
utils.helm_del_server(self.name, namespace)
"""
Stop server with using helm.
def report_wrapper(self, milvus_instance, env_value, hostname, collection_info, index_info, search_params):
"""
logger.debug("Start clean up: %s" % self.service_name)
utils.helm_del_server(self.service_name, namespace)
def report_wrapper(self, milvus_instance, env_value, hostname, collection_info, index_info, search_params, run_params=None):
"""
upload test result
"""
metric = Metric()
metric.set_run_id(timestamp)
metric.env = Env(env_value)
@ -76,24 +99,101 @@ class K8sRunner(Runner):
metric.collection = collection_info
metric.index = index_info
metric.search = search_params
metric.run_params = run_params
return metric
def run(self, run_type, collection):
"""
override runner.run
"""
logger.debug(run_type)
logger.debug(collection)
collection_name = collection["collection_name"]
milvus_instance = MilvusClient(collection_name=collection_name, ip=self.ip)
collection_name = collection["collection_name"] if "collection_name" in collection else None
milvus_instance = MilvusClient(collection_name=collection_name, host=self.host)
self.env_value = milvus_instance.get_server_config()
# ugly implemention
# remove some parts of result before uploading results
self.env_value.pop("logs")
if milvus_instance.get_server_mode() == "CPU":
if "gpu" in self.env_value:
self.env_value.pop("gpu")
elif "cache.enable" in self.env_value["gpu"]:
self.env_value["gpu"].pop("cache.enable")
self.env_value.pop("network")
if run_type == "insert_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
ni_per = collection["ni_per"]
build_index = collection["build_index"]
if milvus_instance.exists_collection():
milvus_instance.delete()
milvus_instance.drop()
time.sleep(10)
index_info = {}
search_params = {}
milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
if build_index is True:
index_type = collection["index_type"]
index_param = collection["index_param"]
index_info = {
"index_type": index_type,
"index_param": index_param
}
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
logger.info(res)
if "flush" in collection and collection["flush"] == "no":
logger.debug("No manual flush")
else:
milvus_instance.flush()
logger.debug(milvus_instance.count())
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"dataset_name": collection_name
}
metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
metric.metrics = {
"type": run_type,
"value": {
"total_time": res["total_time"],
"qps": res["qps"],
"ni_time": res["ni_time"]
}
}
report(metric)
if build_index is True:
logger.debug("Start build index for last file")
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
elif run_type == "insert_debug_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
ni_per = collection["ni_per"]
if milvus_instance.exists_collection():
milvus_instance.drop()
time.sleep(10)
index_info = {}
search_params = {}
milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(ni_per)]
start_time = time.time()
i = 0
while time.time() < start_time + 2 * 24 * 3600:
i = i + 1
logger.debug(i)
logger.debug("Row count: %d" % milvus_instance.count())
milvus_instance.insert(insert_vectors)
time.sleep(0.1)
elif run_type == "insert_performance_multi_collections":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
ni_per = collection["ni_per"]
build_index = collection["build_index"]
if milvus_instance.exists_collection():
milvus_instance.drop()
time.sleep(10)
index_info = {}
search_params = {}
@ -122,7 +222,7 @@ class K8sRunner(Runner):
"total_time": res["total_time"],
"qps": res["qps"],
"ni_time": res["ni_time"]
}
}
}
report(metric)
if build_index is True:
@ -130,11 +230,11 @@ class K8sRunner(Runner):
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
if run_type == "insert_flush_performance":
elif run_type == "insert_flush_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
ni_per = collection["ni_per"]
if milvus_instance.exists_collection():
milvus_instance.delete()
milvus_instance.drop()
time.sleep(10)
index_info = {}
search_params = {}
@ -167,6 +267,7 @@ class K8sRunner(Runner):
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"index_file_size": index_file_size,
"dataset_name": collection_name
}
index_info = {
@ -223,8 +324,8 @@ class K8sRunner(Runner):
for i in range(loops):
delete_ids = ids[i*ni_per : i*ni_per+ni_per]
logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1]))
milvus_instance.delete_vectors(delete_ids)
milvus_instance.flush()
milvus_instance.delete(delete_ids)
# milvus_instance.flush()
logger.debug("Table row counts: %d" % milvus_instance.count())
logger.debug("Table row counts: %d" % milvus_instance.count())
milvus_instance.flush()
@ -243,6 +344,40 @@ class K8sRunner(Runner):
}
report(metric)
elif run_type == "get_ids_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
ids_length_per_segment = collection["ids_length_per_segment"]
if not milvus_instance.exists_collection():
logger.error("Table name: %s not existed" % collection_name)
return
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"index_file_size": index_file_size,
"dataset_name": collection_name
}
search_params = {}
logger.info(milvus_instance.count())
index_info = milvus_instance.describe_index()
logger.info(index_info)
for ids_num in ids_length_per_segment:
segment_num, get_ids = milvus_instance.get_rand_ids_each_segment(ids_num)
start_time = time.time()
_ = milvus_instance.get_entities(get_ids)
total_time = time.time() - start_time
avg_time = total_time / segment_num
run_params = {"ids_num": ids_num}
logger.info("Segment num: %d, ids num per segment: %d, run_time: %f" % (segment_num, ids_num, total_time))
metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params=run_params)
metric.metrics = {
"type": run_type,
"value": {
"total_time": round(total_time, 1),
"avg_time": round(avg_time, 1)
}
}
report(metric)
elif run_type == "search_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
run_count = collection["run_count"]
@ -252,10 +387,9 @@ class K8sRunner(Runner):
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"index_file_size": index_file_size,
"dataset_name": collection_name
}
# fro debugging
# time.sleep(3600)
if not milvus_instance.exists_collection():
logger.error("Table name: %s not existed" % collection_name)
return
@ -291,6 +425,160 @@ class K8sRunner(Runner):
}
report(metric)
elif run_type == "locust_search_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(
collection_name)
### clear db
### spawn locust requests
collection_num = collection["collection_num"]
task = collection["task"]
# . generate task code
task_file = utils.get_unique_name()
task_file_script = task_file + '.py'
task_file_csv = task_file + '_stats.csv'
task_type = task["type"]
connection_type = "single"
connection_num = task["connection_num"]
if connection_num > 1:
connection_type = "multi"
clients_num = task["clients_num"]
hatch_rate = task["hatch_rate"]
during_time = task["during_time"]
def_name = task_type
task_params = task["params"]
collection_names = []
for i in range(collection_num):
suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
collection_names.append(collection_name + "_" + suffix)
# #####
ni_per = collection["ni_per"]
build_index = collection["build_index"]
# TODO: debug
for c_name in collection_names:
milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port)
if milvus_instance.exists_collection(collection_name=c_name):
milvus_instance.drop(name=c_name)
time.sleep(10)
milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type)
index_info = {
"build_index": build_index
}
if build_index is True:
index_type = collection["index_type"]
index_param = collection["index_param"]
index_info.update({
"index_type": index_type,
"index_param": index_param
})
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per)
logger.info(res)
if "flush" in collection and collection["flush"] == "no":
logger.debug("No manual flush")
else:
milvus_instance.flush()
logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name))
if build_index is True:
logger.debug("Start build index for last file")
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
code_str = """
import random
import string
from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient
host = '%s'
port = %s
dim = %s
connection_type = '%s'
collection_names = %s
m = MilvusClient(host=host, port=port)
def get_collection_name():
return random.choice(collection_names)
def get_client(collection_name):
if connection_type == 'single':
return MilvusTask(m=m)
elif connection_type == 'multi':
return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
class QueryTask(User):
wait_time = between(0.001, 0.002)
@task()
def %s(self):
top_k = %s
X = [[random.random() for i in range(dim)] for i in range(%s)]
search_param = %s
collection_name = get_collection_name()
client = get_client(collection_name)
client.query(X, top_k, search_param, collection_name=collection_name)
""" % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"])
with open(task_file_script, 'w+') as fd:
fd.write(code_str)
locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
task_file_script,
task_file,
clients_num,
hatch_rate,
during_time)
logger.info(locust_cmd)
try:
res = os.system(locust_cmd)
except Exception as e:
logger.error(str(e))
return
# . retrieve and collect test statistics
locust_stats = None
with open(task_file_csv, newline='') as fd:
dr = csv.DictReader(fd)
for row in dr:
if row["Name"] != "Aggregated":
continue
locust_stats = row
logger.info(locust_stats)
# clean up temp files
search_params = {
"top_k": task_params["top_k"],
"nq": task_params["nq"],
"nprobe": task_params["search_param"]["nprobe"]
}
run_params = {
"connection_num": connection_num,
"clients_num": clients_num,
"hatch_rate": hatch_rate,
"during_time": during_time
}
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"index_file_size": index_file_size,
"dataset_name": collection_name
}
metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params)
metric.metrics = {
"type": run_type,
"value": {
"during_time": during_time,
"request_count": int(locust_stats["Request Count"]),
"failure_count": int(locust_stats["Failure Count"]),
"qps": locust_stats["Requests/s"],
"min_response_time": int(locust_stats["Min Response Time"]),
"max_response_time": int(locust_stats["Max Response Time"]),
"median_response_time": int(locust_stats["Median Response Time"]),
"avg_response_time": int(locust_stats["Average Response Time"])
}
}
report(metric)
elif run_type == "search_ids_stability":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
search_params = collection["search_params"]
@ -300,6 +588,7 @@ class K8sRunner(Runner):
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"index_file_size": index_file_size,
"dataset_name": collection_name
}
if not milvus_instance.exists_collection():
@ -310,8 +599,8 @@ class K8sRunner(Runner):
logger.info(index_info)
g_top_k = int(collection["top_ks"].split("-")[1])
l_top_k = int(collection["top_ks"].split("-")[0])
g_id = int(ids.split("-")[1])
l_id = int(ids.split("-")[0])
# g_id = int(ids.split("-")[1])
# l_id = int(ids.split("-")[0])
g_id_length = int(ids_length.split("-")[1])
l_id_length = int(ids_length.split("-")[0])
@ -354,6 +643,7 @@ class K8sRunner(Runner):
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"index_file_size": index_file_size,
"dataset_name": collection_name
}
if not milvus_instance.exists_collection():
@ -367,14 +657,14 @@ class K8sRunner(Runner):
for search_param in search_params:
for top_k in top_ks:
for nq in nqs:
total = 0
# total = 0
search_param_group = {
"nq": nq,
"topk": top_k,
"search_param": search_param
}
logger.info("Query params: %s" % json.dumps(search_param_group))
result_ids, result_distances = self.do_query_ids(milvus_instance, collection_name, top_k, nq, search_param=search_param)
result_ids, _ = self.do_query_ids(milvus_instance, collection_name, top_k, nq, search_param=search_param)
acc_value = self.get_recall_value(true_ids_all[:nq, :top_k].tolist(), result_ids)
logger.info("Query accuracy: %s" % acc_value)
metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group)
@ -401,18 +691,20 @@ class K8sRunner(Runner):
index_params = self.generate_combinations(index_params)
data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name)
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"dataset_name": collection_name
}
dataset = utils.get_dataset(hdf5_source_file)
if milvus_instance.exists_collection(collection_name):
logger.info("Re-create collection: %s" % collection_name)
milvus_instance.delete()
time.sleep(DELETE_INTERVAL_TIME)
true_ids = np.array(dataset["neighbors"])
for index_file_size in index_file_sizes:
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"index_file_size": index_file_size,
"dataset_name": collection_name
}
if milvus_instance.exists_collection(collection_name):
logger.info("Re-create collection: %s" % collection_name)
milvus_instance.drop()
time.sleep(DELETE_INTERVAL_TIME)
milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
logger.info(milvus_instance.describe())
insert_vectors = self.normalize(metric_type, np.array(dataset["train"]))
@ -459,6 +751,9 @@ class K8sRunner(Runner):
result = milvus_instance.query(query_vectors.tolist(), top_k, search_param=search_param)
else:
result = milvus_instance.query(query_vectors, top_k, search_param=search_param)
if len(result):
logger.debug(len(result))
logger.debug(len(result[0]))
result_ids = result.id_array
acc_value = self.get_recall_value(true_ids[:nq, :top_k].tolist(), result_ids)
logger.info("Query ann_accuracy: %s" % acc_value)
@ -519,6 +814,73 @@ class K8sRunner(Runner):
}
report(metric)
elif run_type == "loop_stability":
# init data
milvus_instance.clean_db()
pull_interval = collection["pull_interval"]
collection_num = collection["collection_num"]
concurrent = collection["concurrent"] if "concurrent" in collection else False
concurrent_num = collection_num
dimension = collection["dimension"] if "dimension" in collection else 128
insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000
index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8']
index_param = {"nlist": 2048}
collection_names = []
milvus_instances_map = {}
insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
for i in range(collection_num):
name = utils.get_unique_name(prefix="collection_")
collection_names.append(name)
metric_type = random.choice(["l2", "ip"])
index_file_size = random.randint(10, 20)
milvus_instance.create_collection(name, dimension, index_file_size, metric_type)
milvus_instance = MilvusClient(collection_name=name, host=self.host)
index_type = random.choice(index_types)
milvus_instance.create_index(index_type, index_param=index_param)
logger.info(milvus_instance.describe_index())
insert_vectors = utils.normalize(metric_type, insert_vectors)
milvus_instance.insert(insert_vectors)
milvus_instance.flush()
milvus_instances_map.update({name: milvus_instance})
logger.info(milvus_instance.describe_index())
logger.info(milvus_instance.describe())
# loop time unit: min -> s
pull_interval_seconds = pull_interval * 60
tasks = ["insert_rand", "delete_rand", "query_rand", "flush", "compact"]
i = 1
while True:
logger.info("Loop time: %d" % i)
start_time = time.time()
while time.time() - start_time < pull_interval_seconds:
if concurrent:
mp = []
for _ in range(concurrent_num):
tmp_collection_name = random.choice(collection_names)
task_name = random.choice(tasks)
mp.append((tmp_collection_name, task_name))
with futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
future_results = {executor.submit(getattr(milvus_instances_map[mp[j][0]], mp[j][1])): j for j in range(concurrent_num)}
for future in futures.as_completed(future_results):
future.result()
else:
tmp_collection_name = random.choice(collection_names)
task_name = random.choice(tasks)
logger.info(tmp_collection_name)
logger.info(task_name)
task_run = getattr(milvus_instances_map[tmp_collection_name], task_name)
task_run()
logger.debug("Restart server")
utils.restart_server(self.service_name, namespace)
# new connection
for name in collection_names:
milvus_instance = MilvusClient(collection_name=name, host=self.host)
milvus_instances_map.update({name: milvus_instance})
i = i + 1
elif run_type == "stability":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
search_params = collection["search_params"]
@ -566,7 +928,7 @@ class K8sRunner(Runner):
ids.extend(insert_ids)
status, res = milvus_instance.insert(insert_vectors, ids=insert_ids)
logger.debug("%d, row_count: %d" % (i, milvus_instance.count()))
milvus_instance.delete_vectors(ids[-delete_xb:])
milvus_instance.delete(ids[-delete_xb:])
milvus_instance.flush()
milvus_instance.compact()
end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
@ -584,7 +946,168 @@ class K8sRunner(Runner):
}
report(metric)
elif run_type == "locust_mix_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(
collection_name)
ni_per = collection["ni_per"]
build_index = collection["build_index"]
# # TODO: debug
if milvus_instance.exists_collection():
milvus_instance.drop()
time.sleep(10)
index_info = {}
search_params = {}
milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
if build_index is True:
index_type = collection["index_type"]
index_param = collection["index_param"]
index_info = {
"index_tyoe": index_type,
"index_param": index_param
}
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
logger.info(res)
if "flush" in collection and collection["flush"] == "no":
logger.debug("No manual flush")
else:
milvus_instance.flush()
if build_index is True:
logger.debug("Start build index for last file")
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
### spawn locust requests
task = collection["tasks"]
# generate task code
task_file = utils.get_unique_name()
task_file_script = task_file + '.py'
task_file_csv = task_file + '_stats.csv'
task_types = task["types"]
connection_type = "single"
connection_num = task["connection_num"]
if connection_num > 1:
connection_type = "multi"
clients_num = task["clients_num"]
hatch_rate = task["hatch_rate"]
during_time = task["during_time"]
def_strs = ""
for task_type in task_types:
_type = task_type["type"]
weight = task_type["weight"]
if _type == "flush":
def_str = """
@task(%d)
def flush(self):
client = get_client(collection_name)
client.flush(collection_name=collection_name)
""" % weight
if _type == "compact":
def_str = """
@task(%d)
def compact(self):
client = get_client(collection_name)
client.compact(collection_name)
""" % weight
if _type == "query":
def_str = """
@task(%d)
def query(self):
client = get_client(collection_name)
params = %s
X = [[random.random() for i in range(dim)] for i in range(params["nq"])]
client.query(X, params["top_k"], params["search_param"], collection_name=collection_name)
""" % (weight, task_type["params"])
if _type == "insert":
def_str = """
@task(%d)
def insert(self):
client = get_client(collection_name)
params = %s
ids = [random.randint(10, 1000000) for i in range(params["nb"])]
X = [[random.random() for i in range(dim)] for i in range(params["nb"])]
client.insert(X,ids=ids, collection_name=collection_name)
""" % (weight, task_type["params"])
if _type == "delete":
def_str = """
@task(%d)
def delete(self):
client = get_client(collection_name)
ids = [random.randint(1, 1000000) for i in range(1)]
client.delete(ids, collection_name)
""" % weight
def_strs += def_str
code_str = """
import random
import json
from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient
host = '%s'
port = %s
collection_name = '%s'
dim = %s
connection_type = '%s'
m = MilvusClient(host=host, port=port)
def get_client(collection_name):
if connection_type == 'single':
return MilvusTask(m=m)
elif connection_type == 'multi':
return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
class MixTask(User):
wait_time = between(0.001, 0.002)
%s
""" % (self.host, self.port, collection_name, dimension, connection_type, def_strs)
print(def_strs)
with open(task_file_script, "w+") as fd:
fd.write(code_str)
locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
task_file_script,
task_file,
clients_num,
hatch_rate,
during_time)
logger.info(locust_cmd)
try:
res = os.system(locust_cmd)
except Exception as e:
logger.error(str(e))
return
# . retrieve and collect test statistics
locust_stats = None
with open(task_file_csv, newline='') as fd:
dr = csv.DictReader(fd)
for row in dr:
if row["Name"] != "Aggregated":
continue
locust_stats = row
logger.info(locust_stats)
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"dataset_name": collection_name
}
metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
metric.metrics = {
"type": run_type,
"value": {
"during_time": during_time,
"request_count": int(locust_stats["Request Count"]),
"failure_count": int(locust_stats["Failure Count"]),
"qps": locust_stats["Requests/s"],
"min_response_time": int(locust_stats["Min Response Time"]),
"max_response_time": int(locust_stats["Max Response Time"]),
"median_response_time": int(locust_stats["Median Response Time"]),
"avg_response_time": int(locust_stats["Average Response Time"])
}
}
report(metric)
else:
logger.warning("Run type not defined")
logger.warning("Run type: %s not defined" % run_type)
return
logger.debug("Test finished")

View File

@ -3,7 +3,9 @@ import logging
import pdb
import time
import random
import string
import json
import csv
from multiprocessing import Process
import numpy as np
import concurrent.futures
@ -18,17 +20,21 @@ logger = logging.getLogger("milvus_benchmark.local_runner")
class LocalRunner(Runner):
"""run local mode"""
def __init__(self, ip, port):
def __init__(self, host, port):
"""
Run tests at local mode.
Make sure the server has started
"""
super(LocalRunner, self).__init__()
self.ip = ip
self.host = host
self.port = port
def run(self, run_type, collection):
logger.debug(run_type)
logger.debug(collection)
collection_name = collection["collection_name"]
milvus_instance = MilvusClient(collection_name=collection_name, ip=self.ip, port=self.port)
collection_name = collection["collection_name"] if "collection_name" in collection else None
milvus_instance = MilvusClient(collection_name=collection_name, host=self.host, port=self.port)
logger.info(milvus_instance.show_collections())
env_value = milvus_instance.get_server_config()
logger.debug(env_value)
@ -38,7 +44,7 @@ class LocalRunner(Runner):
ni_per = collection["ni_per"]
build_index = collection["build_index"]
if milvus_instance.exists_collection():
milvus_instance.delete()
milvus_instance.drop()
time.sleep(10)
milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
if build_index is True:
@ -61,17 +67,17 @@ class LocalRunner(Runner):
logger.error(milvus_instance.show_collections())
logger.warning("Table: %s not found" % collection_name)
return
length = milvus_instance.count()
length = milvus_instance.count()
ids = [i for i in range(length)]
loops = int(length / ni_per)
for i in range(loops):
delete_ids = ids[i*ni_per : i*ni_per+ni_per]
logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1]))
milvus_instance.delete_vectors(delete_ids)
milvus_instance.flush()
milvus_instance.delete(delete_ids)
milvus_instance.flush()
logger.debug("Table row counts: %d" % milvus_instance.count())
logger.debug("Table row counts: %d" % milvus_instance.count())
milvus_instance.flush()
milvus_instance.flush()
logger.debug("Table row counts: %d" % milvus_instance.count())
elif run_type == "build_performance":
@ -121,6 +127,118 @@ class LocalRunner(Runner):
mem_usage = milvus_instance.get_mem_info()["memory_used"]
logger.info(mem_usage)
elif run_type == "locust_search_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
### spawn locust requests
collection_num = collection["collection_num"]
task = collection["task"]
#. generate task code
task_file = utils.get_unique_name()
task_file_script = task_file+'.py'
task_file_csv = task_file+'_stats.csv'
task_type = task["type"]
connection_type = "single"
connection_num = task["connection_num"]
if connection_num > 1:
connection_type = "multi"
clients_num = task["clients_num"]
hatch_rate = task["hatch_rate"]
during_time = task["during_time"]
def_name = task_type
task_params = task["params"]
collection_names = []
for i in range(collection_num):
suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
collection_names.append(collection_name + "_" + suffix)
# collection_names = ['sift_1m_1024_128_l2_Kg6co', 'sift_1m_1024_128_l2_egkBK', 'sift_1m_1024_128_l2_D0wtE',
# 'sift_1m_1024_128_l2_9naps', 'sift_1m_1024_128_l2_iJ0jj', 'sift_1m_1024_128_l2_nqUTm',
# 'sift_1m_1024_128_l2_GIF0D', 'sift_1m_1024_128_l2_EL2qk', 'sift_1m_1024_128_l2_qLRnC',
# 'sift_1m_1024_128_l2_8Ditg']
# #####
ni_per = collection["ni_per"]
build_index = collection["build_index"]
for c_name in collection_names:
milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port)
if milvus_instance.exists_collection(collection_name=c_name):
milvus_instance.drop(name=c_name)
time.sleep(10)
milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type)
if build_index is True:
index_type = collection["index_type"]
index_param = collection["index_param"]
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per)
milvus_instance.flush()
logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name))
if build_index is True:
logger.debug("Start build index for last file")
milvus_instance.create_index(index_type, index_param)
logger.debug(milvus_instance.describe_index())
code_str = """
import random
import string
from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient
host = '%s'
port = %s
dim = %s
connection_type = '%s'
collection_names = %s
m = MilvusClient(host=host, port=port)
def get_collection_name():
return random.choice(collection_names)
def get_client(collection_name):
if connection_type == 'single':
return MilvusTask(m=m)
elif connection_type == 'multi':
return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
class QueryTask(User):
wait_time = between(0.001, 0.002)
@task()
def %s(self):
top_k = %s
X = [[random.random() for i in range(dim)] for i in range(%s)]
search_param = %s
collection_name = get_collection_name()
print(collection_name)
client = get_client(collection_name)
client.query(X, top_k, search_param, collection_name=collection_name)
""" % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"])
with open(task_file_script, 'w+') as fd:
fd.write(code_str)
locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
task_file_script,
task_file,
clients_num,
hatch_rate,
during_time)
logger.info(locust_cmd)
try:
res = os.system(locust_cmd)
except Exception as e:
logger.error(str(e))
return
#. retrieve and collect test statistics
metric = None
with open(task_file_csv, newline='') as fd:
dr = csv.DictReader(fd)
for row in dr:
if row["Name"] != "Aggregated":
continue
metric = row
logger.info(metric)
# clean up temp files
elif run_type == "search_ids_stability":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
search_params = collection["search_params"]
@ -182,18 +300,18 @@ class LocalRunner(Runner):
for nq in nqs:
mem_usage = milvus_instance.get_mem_info()["memory_used"]
logger.info(mem_usage)
query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq]))
query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq]))
logger.debug(search_params)
for search_param in search_params:
logger.info("Search param: %s" % json.dumps(search_param))
total_time = 0.0
if use_single_connection is True:
connections = [MilvusClient(collection_name=collection_name, ip=self.ip, port=self.port)]
connections = [MilvusClient(collection_name=collection_name, host=self.host, port=self.port)]
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
future_results = {executor.submit(
self.do_query_qps, connections[0], query_vectors, top_k, search_param=search_param) : index for index in range(concurrent_num)}
else:
connections = [MilvusClient(collection_name=collection_name, ip=self.ip, port=self.port) for i in range(concurrent_num)]
connections = [MilvusClient(collection_name=collection_name, host=self.hos, port=self.port) for i in range(concurrent_num)]
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
future_results = {executor.submit(
self.do_query_qps, connections[index], query_vectors, top_k, search_param=search_param) : index for index in range(concurrent_num)}
@ -222,7 +340,7 @@ class LocalRunner(Runner):
dataset = utils.get_dataset(hdf5_source_file)
if milvus_instance.exists_collection(collection_name):
logger.info("Re-create collection: %s" % collection_name)
milvus_instance.delete()
milvus_instance.drop()
time.sleep(DELETE_INTERVAL_TIME)
true_ids = np.array(dataset["neighbors"])
for index_file_size in index_file_sizes:
@ -297,7 +415,7 @@ class LocalRunner(Runner):
query_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)]
while time.time() < start_time + during_time * 60:
i = i + 1
for j in range(insert_interval):
for _ in range(insert_interval):
top_k = random.randint(l_top_k, g_top_k)
nq = random.randint(l_nq, g_nq)
search_param = {}
@ -308,9 +426,9 @@ class LocalRunner(Runner):
count = milvus_instance.count()
insert_ids = [(count+x) for x in range(len(insert_vectors))]
ids.extend(insert_ids)
status, res = milvus_instance.insert(insert_vectors, ids=insert_ids)
_, res = milvus_instance.insert(insert_vectors, ids=insert_ids)
logger.debug("%d, row_count: %d" % (i, milvus_instance.count()))
milvus_instance.delete_vectors(ids[-delete_xb:])
milvus_instance.delete(ids[-delete_xb:])
milvus_instance.flush()
milvus_instance.compact()
end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
@ -324,7 +442,187 @@ class LocalRunner(Runner):
}
logger.info(metrics)
elif run_type == "loop_stability":
# init data
milvus_instance.clean_db()
pull_interval = collection["pull_interval"]
pull_interval_seconds = pull_interval * 60
collection_num = collection["collection_num"]
dimension = collection["dimension"] if "dimension" in collection else 128
insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000
index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8']
index_param = {"nlist": 2048}
collection_names = []
milvus_instances_map = {}
insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
for i in range(collection_num):
name = utils.get_unique_name(prefix="collection_")
collection_names.append(name)
metric_type = random.choice(["l2", "ip"])
index_file_size = random.randint(10, 20)
milvus_instance.create_collection(name, dimension, index_file_size, metric_type)
milvus_instance = MilvusClient(collection_name=name, host=self.host)
index_type = random.choice(index_types)
milvus_instance.create_index(index_type, index_param=index_param)
logger.info(milvus_instance.describe_index())
insert_vectors = utils.normalize(metric_type, insert_vectors)
milvus_instance.insert(insert_vectors)
milvus_instance.flush()
milvus_instances_map.update({name: milvus_instance})
logger.info(milvus_instance.describe_index())
logger.info(milvus_instance.describe())
tasks = ["insert_rand", "delete_rand", "query_rand", "flush"]
i = 1
while True:
logger.info("Loop time: %d" % i)
start_time = time.time()
while time.time() - start_time < pull_interval_seconds:
# choose collection
tmp_collection_name = random.choice(collection_names)
# choose task from task
task_name = random.choice(tasks)
logger.info(tmp_collection_name)
logger.info(task_name)
# execute task
task_run = getattr(milvus_instances_map[tmp_collection_name], task_name)
task_run()
# new connection
for name in collection_names:
milvus_instance = MilvusClient(collection_name=name, host=self.host)
milvus_instances_map.update({name: milvus_instance})
i = i + 1
elif run_type == "locust_mix_performance":
(data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
# ni_per = collection["ni_per"]
# build_index = collection["build_index"]
# if milvus_instance.exists_collection():
# milvus_instance.drop()
# time.sleep(10)
# milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
# if build_index is True:
# index_type = collection["index_type"]
# index_param = collection["index_param"]
# milvus_instance.create_index(index_type, index_param)
# logger.debug(milvus_instance.describe_index())
# res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
# milvus_instance.flush()
# logger.debug("Table row counts: %d" % milvus_instance.count())
# if build_index is True:
# logger.debug("Start build index for last file")
# milvus_instance.create_index(index_type, index_param)
# logger.debug(milvus_instance.describe_index())
task = collection["tasks"]
task_file = utils.get_unique_name()
task_file_script = task_file + '.py'
task_file_csv = task_file + '_stats.csv'
task_types = task["types"]
connection_type = "single"
connection_num = task["connection_num"]
if connection_num > 1:
connection_type = "multi"
clients_num = task["clients_num"]
hatch_rate = task["hatch_rate"]
during_time = task["during_time"]
def_strs = ""
for task_type in task_types:
_type = task_type["type"]
weight = task_type["weight"]
if _type == "flush":
def_str = """
@task(%d)
def flush(self):
client = get_client(collection_name)
client.flush(collection_name=collection_name)
""" % weight
if _type == "compact":
def_str = """
@task(%d)
def compact(self):
client = get_client(collection_name)
client.compact(collection_name)
""" % weight
if _type == "query":
def_str = """
@task(%d)
def query(self):
client = get_client(collection_name)
params = %s
X = [[random.random() for i in range(dim)] for i in range(params["nq"])]
client.query(X, params["top_k"], params["search_param"], collection_name=collection_name)
""" % (weight, task_type["params"])
if _type == "insert":
def_str = """
@task(%d)
def insert(self):
client = get_client(collection_name)
params = %s
ids = [random.randint(10, 1000000) for i in range(params["nb"])]
X = [[random.random() for i in range(dim)] for i in range(params["nb"])]
client.insert(X,ids=ids, collection_name=collection_name)
""" % (weight, task_type["params"])
if _type == "delete":
def_str = """
@task(%d)
def delete(self):
client = get_client(collection_name)
ids = [random.randint(1, 1000000) for i in range(1)]
client.delete(ids, collection_name)
""" % weight
def_strs += def_str
print(def_strs)
code_str = """
import random
import json
from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient
host = '%s'
port = %s
collection_name = '%s'
dim = %s
connection_type = '%s'
m = MilvusClient(host=host, port=port)
def get_client(collection_name):
if connection_type == 'single':
return MilvusTask(m=m)
elif connection_type == 'multi':
return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
class MixTask(User):
wait_time = between(0.001, 0.002)
%s
""" % (self.host, self.port, collection_name, dimension, connection_type, def_strs)
with open(task_file_script, "w+") as fd:
fd.write(code_str)
locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
task_file_script,
task_file,
clients_num,
hatch_rate,
during_time)
logger.info(locust_cmd)
try:
res = os.system(locust_cmd)
except Exception as e:
logger.error(str(e))
return
# . retrieve and collect test statistics
metric = None
with open(task_file_csv, newline='') as fd:
dr = csv.DictReader(fd)
for row in dr:
if row["Name"] != "Aggregated":
continue
metric = row
logger.info(metric)
else:
logger.warning("Run type not defined")
return
logger.debug("Test finished")
logger.debug("Test finished")

View File

@ -0,0 +1,30 @@
import random

from locust import HttpUser, task, between

# Target collection and REST endpoint for the HTTP vector-search benchmark.
collection_name = "random_1m_2048_512_ip_sq8"
headers = {'Content-Type': "application/json"}
url = '/collections/%s/vectors' % collection_name

# Search parameters: nq random query vectors of dimension `dim`, top_k results.
top_k = 2
nq = 1
dim = 512

# The same payload is reused for every request (vectors are generated once).
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
data = {
    "search": {
        "topk": top_k,
        "vectors": vectors,
        "params": {
            "nprobe": 1
        }
    }
}


class MyUser(HttpUser):
    # Very short wait between requests to drive a high request rate.
    wait_time = between(0, 0.1)
    # NOTE(review): hard-coded benchmark server address — adjust per environment.
    host = "http://192.168.1.112:19122"

    @task
    def search(self):
        # PUT the search payload; requests slower than 2s are reported as failures.
        response = self.client.put(url=url, json=data, headers=headers, timeout=2)
        print(response)

View File

@ -0,0 +1,45 @@
import time
from locust import events
from client import MilvusClient
class MilvusTask(object):
    def __init__(self, connection_type="single", **kwargs):
        """
        Generate milvus client for locust.

        To make sure we can use the same function name in client as task name in Taskset/User.
        Params: connection_type, single/multi is optional
            other args: host/port/collection_name
        """
        # Every proxied call is reported to locust under the "grpc" request type.
        self.request_type = "grpc"
        if connection_type == "single":
            # Reuse one pre-built MilvusClient shared across all locust users.
            self.m = kwargs.get("m")
        elif connection_type == "multi":
            # Open a dedicated connection for this MilvusTask instance.
            host = kwargs.get("host")
            port = kwargs.get("port")
            collection_name = kwargs.get("collection_name")
            self.m = MilvusClient(host=host, port=port, collection_name=collection_name)

    def __getattr__(self, name):
        """
        Register success and failure event with using locust.events.

        Make sure the task function name in locust equals to the name of function in MilvusClient
        """
        # Look up the real client method once; the wrapper below times it and
        # reports the outcome to locust's statistics.
        func = getattr(self.m, name)

        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                _ = func(*args, **kwargs)
                # Locust expects response times in milliseconds.
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(request_type=self.request_type, name=name, response_time=total_time,
                                            response_length=0)
            except Exception as e:
                # Failures are recorded with the raised exception; note the
                # wrapper swallows it and returns None, so callers never see it.
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(request_type=self.request_type, name=name, response_time=total_time,
                                            exception=e, response_length=0)

        return wrapper

View File

@ -38,14 +38,20 @@ def positive_int(s):
def get_image_tag(image_version, image_type):
    """Build the docker image tag used when deploying a benchmark server.

    The tag follows the fixed pattern ``<version>-<type>-centos7-release``,
    e.g. ``get_image_tag("0.10.4", "cpu") -> "0.10.4-cpu-centos7-release"``.
    """
    return "{}-{}-centos7-release".format(image_version, image_type)
# return "%s-%s-centos7-release" % ("0.7.1", image_type)
# return "%s-%s-centos7-release" % ("PR-2159", image_type)
# return "%s-%s-centos7-release" % ("PR-2780", image_type)
def queue_worker(queue):
"""
Using queue to make sure only one test process on each host.
Workers can be run concurrently on different host
"""
while not queue.empty():
q = queue.get()
suite = q["suite"]
server_host = q["server_host"]
deploy_mode = q["deploy_mode"]
image_type = q["image_type"]
image_tag = q["image_tag"]
@ -58,10 +64,10 @@ def queue_worker(queue):
collections = run_params["collections"]
for collection in collections:
# run tests
server_config = collection["server"]
server_config = collection["server"] if "server" in collection else None
logger.debug(server_config)
runner = K8sRunner()
if runner.init_env(server_config, server_host, image_type, image_tag):
if runner.init_env(server_config, server_host, deploy_mode, image_type, image_tag):
logger.debug("Start run tests")
try:
runner.run(run_type, collection)
@ -69,10 +75,12 @@ def queue_worker(queue):
logger.error(str(e))
logger.error(traceback.format_exc())
finally:
time.sleep(10)
runner.clean_up()
else:
logger.error("Runner init failed")
logger.debug("All task finished in queue: %s" % server_host)
if server_host:
logger.debug("All task finished in queue: %s" % server_host)
def main():
@ -88,6 +96,10 @@ def main():
metavar='FILE',
default='',
help="load test schedule from FILE")
arg_parser.add_argument(
"--deploy-mode",
default='',
help="single or shards")
# local mode
arg_parser.add_argument(
@ -116,15 +128,17 @@ def main():
if not args.image_version:
raise Exception("Image version not given")
image_version = args.image_version
deploy_mode = args.deploy_mode
with open(args.schedule_conf) as f:
schedule_config = full_load(f)
f.close()
queues = []
server_names = set()
# server_names = set()
server_names = []
for item in schedule_config:
server_host = item["server"]
server_host = item["server"] if "server" in item else ""
suite_params = item["suite_params"]
server_names.add(server_host)
server_names.append(server_host)
q = Queue()
for suite_param in suite_params:
suite = "suites/"+suite_param["suite"]
@ -133,11 +147,12 @@ def main():
q.put({
"suite": suite,
"server_host": server_host,
"deploy_mode": deploy_mode,
"image_tag": image_tag,
"image_type": image_type
})
queues.append(q)
logger.debug(server_names)
logging.error(queues)
thread_num = len(server_names)
processes = []
@ -145,7 +160,7 @@ def main():
x = Process(target=queue_worker, args=(queues[i], ))
processes.append(x)
x.start()
time.sleep(5)
time.sleep(10)
for x in processes:
x.join()

View File

@ -0,0 +1,44 @@
import random

from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient

# Connection settings for the target milvus server / collection.
connection_type = "single"
host = "192.168.1.29"
port = 19531
collection_name = "sift_128_euclidean"
dim = 128
m = MilvusClient(host=host, port=port, collection_name=collection_name)


class MixTask(User):
    # Mixed-workload user: the @task weights below set the relative ratio of
    # query / insert / delete / flush / compact operations (30:10:10:5:1).
    wait_time = between(0.001, 0.002)
    # "single" shares the module-level client; otherwise each task set opens
    # its own connection.
    if connection_type == "single":
        client = MilvusTask(m=m)
    else:
        client = MilvusTask(host=host, port=port, collection_name=collection_name)

    @task(30)
    def query(self):
        # One random query vector, top-10 search with nprobe=16.
        top_k = 10
        X = [[random.random() for i in range(dim)] for i in range(1)]
        search_param = {"nprobe": 16}
        self.client.query(X, top_k, search_param)

    @task(10)
    def insert(self):
        # Insert a single random vector with a random (large) explicit id.
        _id = random.randint(10000000, 10000000000)
        X = [[random.random() for i in range(dim)] for i in range(1)]
        self.client.insert(X, ids=[_id])

    @task(5)
    def flush(self):
        self.client.flush()

    @task(10)
    def delete(self):
        # Delete one randomly chosen id; NOTE(review): the id range differs
        # from the insert range above, so it may target pre-loaded data.
        self.client.delete([random.randint(1, 1000000)])

    @task(1)
    def compact(self):
        self.client.compact()

View File

@ -13,6 +13,11 @@ def operations_parser(operations):
def collection_parser(collection_name):
"""
Parse the collection name defined in the suites.
Return data info with the given collection_name
"""
tmp = collection_name.split("_")
# if len(tmp) != 5:
# return None
@ -32,7 +37,7 @@ def collection_parser(collection_name):
def parse_ann_collection_name(collection_name):
data_type = collection_name.split("_")[0]
dimension = int(collection_name.split("_")[1])
metric = collection_name.split("_")[-1]
metric = collection_name.split("_")[2]
# metric = collection_name.attrs['distance']
# dimension = len(collection_name["train"][0])
if metric == "euclidean":
@ -47,6 +52,11 @@ def parse_ann_collection_name(collection_name):
def search_params_parser(param):
"""
Parse the params on search field defined in suites.
Return search params: nq/top-k/nprobe
"""
# parse top-k, set default value if top-k not in param
if "top_ks" not in param:
top_ks = [10]
@ -83,4 +93,4 @@ def search_params_parser(param):
else:
logger.warning("Invalid format nprobes: %s" % str(nprobes))
return top_ks, nqs, nprobes
return top_ks, nqs, nprobes

View File

@ -0,0 +1,46 @@
import random, string

from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient

# Connection settings for the target milvus server / collection.
connection_type = "single"
host = "192.168.1.6"
port = 19530
collection_name = "sift_128_euclidean"
dim = 128
m = MilvusClient(host=host, port=port, collection_name=collection_name)
# if m.exists_collection(collection_name):
#     m.drop(name=collection_name)
# m.create_collection(collection_name, dim, 512, "l2")


class QueryTask(User):
    # Partition-oriented workload: count entities (weight 2) and drop random
    # partitions (weight 1).
    wait_time = between(0.001, 0.002)
    print("in query task")
    # "single" shares the module-level client; otherwise open a new connection.
    if connection_type == "single":
        client = MilvusTask(m=m)
    else:
        client = MilvusTask(host=host, port=port, collection_name=collection_name)
    # Pre-generated sample id/vector (currently only used by the commented-out
    # partition task below).
    ids = [random.randint(0, 1000000000) for i in range(1)]
    X = [[random.random() for i in range(dim)] for i in range(1)]

    # @task(1)
    # def test_partition(self):
    #     tag_name = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
    #     self.client.create_partition(tag_name)
    #     # self.client.insert(self.X, ids=self.ids, tag=tag_name)
    #     # count = self.count(collection_name)
    #     # logging.info(count)

    @task(2)
    def test_count(self):
        self.client.count(collection_name)

    @task(1)
    def test_drop(self):
        # Pick a random partition and drop it, skipping the reserved
        # "_default" partition.
        tags = m.list_partitions()
        tag = random.choice(tags)
        if tag.tag != "_default":
            self.client.drop_partition(tag.tag)

View File

@ -1,4 +1,5 @@
pymilvus-test>=0.2.0
# pymilvus-test>=0.3.0,<0.4.0
pymilvus==0.2.14
scipy==1.3.1
scikit-learn==0.19.1
h5py==2.7.1
@ -9,3 +10,5 @@ ansicolors==1.1.8
scipy==1.3.1
kubernetes==10.0.1
# rq==1.2.0
locust==1.0.2
pymongo==3.10.0

View File

@ -89,6 +89,9 @@ def get_vectors_from_binary(nq, dimension, data_type):
class Runner(object):
def __init__(self):
"""Run each tests defined in the suites.
"""
pass
def normalize(self, metric_type, X):
@ -100,7 +103,7 @@ class Runner(object):
X = X.astype(np.float32)
elif metric_type in ["jaccard", "hamming", "sub", "super"]:
tmp = []
for index, item in enumerate(X):
for _, item in enumerate(X):
new_vector = bytes(np.packbits(item, axis=-1).tolist())
tmp.append(new_vector)
X = tmp
@ -170,7 +173,7 @@ class Runner(object):
end_id = start_id + len(vectors)
logger.info("Start id: %s, end id: %s" % (start_id, end_id))
ids = [k for k in range(start_id, end_id)]
status, ids = milvus.insert(vectors, ids=ids)
_, ids = milvus.insert(vectors, ids=ids)
# milvus.flush()
logger.debug(milvus.count())
ni_end_time = time.time()
@ -191,13 +194,13 @@ class Runner(object):
tmp_res = []
vectors = base_query_vectors[0:nq]
for top_k in top_ks:
avg_query_time = 0.0
# avg_query_time = 0.0
min_query_time = 0.0
logger.info("Start query, query params: top-k: {}, nq: {}, actually length of vectors: {}".format(top_k, nq, len(vectors)))
for i in range(run_count):
logger.info("Start run query, run %d of %s" % (i+1, run_count))
start_time = time.time()
query_res = milvus.query(vectors, top_k, search_param=search_param)
milvus.query(vectors, top_k, search_param=search_param)
interval_time = time.time() - start_time
if (i == 0) or (min_query_time > interval_time):
min_query_time = interval_time
@ -208,7 +211,7 @@ class Runner(object):
def do_query_qps(self, milvus, query_vectors, top_k, search_param):
start_time = time.time()
result = milvus.query(query_vectors, top_k, search_param)
milvus.query(query_vectors, top_k, search_param)
end_time = time.time()
return end_time - start_time

View File

@ -0,0 +1,65 @@
[
{
"server": "eros",
"suite_params": [
{
"suite": "080_gpu_accuracy.yaml",
"image_type": "gpu"
},
{
"suite": "080_search_stability.yaml",
"image_type": "gpu"
},
{
"suite": "gpu_accuracy_ann.yaml",
"image_type": "gpu"
}
]
},
{
"server": "poseidon",
"suite_params": [
{
"suite": "080_gpu_search.yaml",
"image_type": "gpu"
},
{
"suite": "080_cpu_search.yaml",
"image_type": "cpu"
},
{
"suite": "080_gpu_build.yaml",
"image_type": "gpu"
},
{
"suite": "080_cpu_accuracy.yaml",
"image_type": "cpu"
},
{
"suite": "locust_search.yaml",
"image_type": "cpu"
}
]
},
{
"server": "apollo",
"suite_params": [
{
"suite": "cpu_accuracy_ann.yaml",
"image_type": "cpu"
},
{
"suite": "080_cpu_build.yaml",
"image_type": "cpu"
},
{
"suite": "080_insert_performance.yaml",
"image_type": "cpu"
},
{
"suite": "add_flush_performance.yaml",
"image_type": "cpu"
}
]
}
]

View File

@ -0,0 +1,65 @@
[
{
"server": "athena",
"suite_params": [
{
"suite": "011_gpu_accuracy.yaml",
"image_type": "gpu"
},
{
"suite": "011_search_stability.yaml",
"image_type": "gpu"
},
{
"suite": "011_gpu_accuracy_ann.yaml",
"image_type": "gpu"
}
]
},
{
"server": "poseidon",
"suite_params": [
{
"suite": "011_gpu_search.yaml",
"image_type": "gpu"
},
{
"suite": "011_cpu_search.yaml",
"image_type": "cpu"
},
{
"suite": "011_gpu_build.yaml",
"image_type": "gpu"
},
{
"suite": "011_cpu_accuracy.yaml",
"image_type": "cpu"
},
{
"suite": "011_locust_search.yaml",
"image_type": "cpu"
}
]
},
{
"server": "apollo",
"suite_params": [
{
"suite": "cpu_accuracy_ann.yaml",
"image_type": "cpu"
},
{
"suite": "011_cpu_build.yaml",
"image_type": "cpu"
},
{
"suite": "011_insert_performance.yaml",
"image_type": "cpu"
},
{
"suite": "011_add_flush_performance.yaml",
"image_type": "cpu"
}
]
}
]

View File

@ -5,6 +5,14 @@
{
"suite": "080_gpu_accuracy.yaml",
"image_type": "gpu"
},
{
"suite": "080_search_stability.yaml",
"image_type": "gpu"
},
{
"suite": "gpu_accuracy_ann.yaml",
"image_type": "gpu"
}
]
},
@ -28,7 +36,7 @@
"image_type": "cpu"
},
{
"suite": "080_cpu_build.yaml",
"suite": "locust_search.yaml",
"image_type": "cpu"
}
]
@ -41,23 +49,17 @@
"image_type": "cpu"
},
{
"suite": "080_cpu_search_stability.yaml",
"suite": "080_cpu_build.yaml",
"image_type": "cpu"
},
{
"suite": "080_insert_performance.yaml",
"image_type": "cpu"
},
{
"suite": "add_flush_performance.yaml",
"image_type": "cpu"
}
]
},
{
"server": "eros",
"suite_params": [
{
"suite": "gpu_accuracy_ann.yaml",
"image_type": "gpu"
},
{
"suite": "080_gpu_stability.yaml",
"image_type": "gpu"
}
]
}
]

View File

@ -1,11 +0,0 @@
[
{
"server": "eros",
"suite_params": [
{
"suite": "070_ann.yaml",
"image_type": "gpu"
}
]
}
]

View File

@ -3,11 +3,7 @@
"server": "apollo",
"suite_params": [
{
"suite": "070_insert_10m.yaml",
"image_type": "cpu"
},
{
"suite": "070_cpu_build.yaml",
"suite": "080_get_id.yaml",
"image_type": "cpu"
}
]

View File

@ -1,9 +1,9 @@
[
{
"server": "athena",
"server": "eros",
"suite_params": [
{
"suite": "add_flush_performance.yaml",
"suite": "cpu_accuracy_ann_debug.yaml",
"image_type": "cpu"
}
]

View File

@ -1,12 +0,0 @@
[
{
"server": "poseidon",
"suite_params": [
{
"suite": "070_gpu_build.yaml",
"image_type": "gpu"
}
]
}
]

View File

@ -1,11 +0,0 @@
[
{
"server": "eros",
"suite_params": [
{
"suite": "070_stability.yaml",
"image_type": "gpu"
}
]
}
]

View File

@ -3,8 +3,12 @@
"server": "poseidon",
"suite_params": [
{
"suite": "debug.yaml",
"suite": "080_gpu_build_debug.yaml",
"image_type": "gpu"
},
{
"suite": "080_cpu_search_debug.yaml",
"image_type": "cpu"
}
]
}

View File

@ -3,8 +3,8 @@
"server": "eros",
"suite_params": [
{
"suite": "080_gpu_search_id.yaml",
"image_type": "gpu"
"suite": "010_locust_search.yaml",
"image_type": "cpu"
}
]
}

View File

@ -1,10 +1,9 @@
[
{
"server": "poseidon",
"suite_params": [
{
"suite": "insert_performance.yaml",
"image_type": "gpu"
"suite": "080_insert_performance.yaml",
"image_type": "cpu"
}
]
}

View File

@ -3,7 +3,7 @@
"server": "poseidon",
"suite_params": [
{
"suite": "crud_add.yaml",
"suite": "locust_search.yaml",
"image_type": "cpu"
}
]

View File

@ -3,9 +3,9 @@
"server": "athena",
"suite_params": [
{
"suite": "crud_search.yaml",
"suite": "locust_mix.yaml",
"image_type": "gpu"
}
]
}
]
]

View File

@ -3,9 +3,9 @@
"server": "athena",
"suite_params": [
{
"suite": "070_gpu_search.yaml",
"suite": "locust_search.yaml",
"image_type": "gpu"
}
]
}
]
]

View File

@ -1,9 +1,9 @@
[
{
"server": "apollo",
"server": "eros",
"suite_params": [
{
"suite": "crud_add_flush.yaml",
"suite": "loop_stability.yaml",
"image_type": "cpu"
}
]

View File

@ -3,23 +3,7 @@
"server": "poseidon",
"suite_params": [
{
"suite": "080_gpu_search.yaml",
"image_type": "gpu"
},
{
"suite": "080_cpu_search.yaml",
"image_type": "cpu"
},
{
"suite": "080_gpu_build.yaml",
"image_type": "gpu"
},
{
"suite": "080_cpu_accuracy.yaml",
"image_type": "cpu"
},
{
"suite": "080_cpu_build.yaml",
"suite": "010_locust_search.yaml",
"image_type": "cpu"
}
]

View File

@ -1,10 +1,10 @@
[
{
"server": "eros",
"server": "poseidon",
"suite_params": [
{
"suite": "debug.yaml",
"image_type": "cpu"
"image_type": "gpu"
}
]
}

View File

@ -0,0 +1,18 @@
[
{
"suite_params": [
{
"suite": "shards_insert_performance.yaml",
"image_type": "cpu"
},
{
"suite": "shards_ann_debug.yaml",
"image_type": "cpu"
},
{
"suite": "shards_loop_stability.yaml",
"image_type": "cpu"
}
]
}
]

View File

@ -1,9 +1,8 @@
[
{
"server": "poseidon",
"suite_params": [
{
"suite": "cpu_accuracy_ann_crud_debug.yaml",
"suite": "shards_ann_debug.yaml",
"image_type": "cpu"
}
]

View File

@ -0,0 +1,10 @@
[
{
"suite_params": [
{
"suite": "shards_loop_stability.yaml",
"image_type": "cpu"
}
]
}
]

View File

@ -1,86 +0,0 @@
[
{
"job_name": "milvus-apollo",
"build_params": {
"SUITE": "cpu_accuracy_ann.yaml",
"IMAGE_TYPE": "cpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "apollo"
}
},
{
"job_name": "milvus-apollo",
"build_params": {
"SUITE": "cpu_stability_sift50m.yaml",
"IMAGE_TYPE": "cpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "apollo"
}
},
{
"job_name": "milvus-eros",
"build_params": {
"SUITE": "gpu_accuracy_ann.yaml",
"IMAGE_TYPE": "gpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "eros"
}
},
{
"job_name": "milvus-eros",
"build_params": {
"SUITE": "gpu_search_stability.yaml",
"IMAGE_TYPE": "gpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "eros"
}
},
{
"job_name": "milvus-eros",
"build_params": {
"SUITE": "gpu_build_performance.yaml",
"IMAGE_TYPE": "gpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "eros"
}
},
{
"job_name": "milvus-poseidon",
"build_params": {
"SUITE": "gpu_search_performance.yaml",
"IMAGE_TYPE": "gpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "poseidon"
}
},
{
"job_name": "milvus-poseidon",
"build_params": {
"SUITE": "cpu_search_performance.yaml",
"IMAGE_TYPE": "cpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "poseidon"
}
},
{
"job_name": "milvus-poseidon",
"build_params": {
"SUITE": "insert_performance.yaml",
"IMAGE_TYPE": "gpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "poseidon"
}
},
{
"job_name": "milvus-poseidon",
"build_params": {
"SUITE": "gpu_accuracy.yaml",
"IMAGE_TYPE": "gpu",
"IMAGE_VERSION": "master",
"SERVER_HOST": "poseidon"
}
}
]

View File

@ -1,28 +0,0 @@
# Trigger the benchmark jenkins jobs listed in default_config.json.
# Each config entry supplies a job name plus its build parameters
# (suite / image type / image version / server host).
import json
from pprint import pprint

import jenkins

# Credentials are redacted; fill in before use.
JENKINS_URL = "****"
server = jenkins.Jenkins(JENKINS_URL, username='****', password='****')
user = server.get_whoami()
version = server.get_version()
print('Hello %s from Jenkins %s' % (user['fullName'], version))

# print(job_config)
# build_params = {
#     "SUITE": "gpu_accuracy_ann_debug.yaml",
#     "IMAGE_TYPE": "cpu",
#     "IMAGE_VERSION": "tanimoto_distance",
#     "SERVER_HOST": "eros"
# }
# print(server.build_job(job_name, build_params))

# Queue every configured job and print the returned queue item number.
with open("default_config.json") as json_file:
    data = json.load(json_file)
    for config in data:
        build_params = config["build_params"]
        job_name = config["job_name"]
        res = server.build_job(job_name, build_params)
        print(job_name, res)

View File

@ -0,0 +1,37 @@
import random, string

from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient

# Connection settings for the target milvus server / collection.
connection_type = "single"
host = "192.168.1.29"
port = 19531
collection_name = "sift_128_euclidean"
dim = 128
m = MilvusClient(host=host, port=port, collection_name=collection_name)


class QueryTask(User):
    # Partition stress workload: create random partitions and drop random
    # partitions with equal weight.
    wait_time = between(0.001, 0.002)
    print("in query task")
    # "single" shares the module-level client; otherwise open a new connection.
    if connection_type == "single":
        client = MilvusTask(m=m)
    else:
        client = MilvusTask(host=host, port=port, collection_name=collection_name)

    # @task()
    # def query(self):
    #     top_k = 10
    #     X = [[random.random() for i in range(dim)] for i in range(1)]
    #     search_param = {"nprobe": 16}
    #     self.client.query(X, top_k, search_param)

    @task(1)
    def test_create(self):
        # Create a partition with a random 8-char alphanumeric tag.
        tag_name = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
        self.client.create_partition(tag_name)

    @task(1)
    def test_drop(self):
        # Drop a randomly chosen partition, skipping the reserved "_default"
        # partition (it must not be dropped); this matches the guard used in
        # the sibling partition/mix benchmark scripts.
        tags = m.list_partitions()
        tag = random.choice(tags)
        if tag.tag != "_default":
            self.client.drop_partition(tag.tag)

View File

@ -2,11 +2,11 @@ ann_accuracy:
collections:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -15,8 +15,32 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [1024]
index_types: ['flat', 'ivf_flat', 'ivf_sq8']
index_file_sizes: [50, 1024]
index_types: ['flat']
index_params:
nlist: [16384]
top_ks: [10]
nqs: [10000]
search_params:
nprobe: [1, 512, 16384]
-
server:
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
gpu_resource_config.build_index_resources:
- gpu0
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [50, 1024]
index_types: ['ivf_flat', 'ivf_sq8']
index_params:
nlist: [16384]
top_ks: [10]
@ -26,11 +50,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -39,7 +63,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['ivf_pq']
index_params:
nlist: [16384]
@ -51,11 +75,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -64,7 +88,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['annoy']
index_params:
n_trees: [8, 32]
@ -75,11 +99,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -88,7 +112,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [256]
index_file_sizes: [50, 256]
index_types: ['hnsw']
index_params:
M: [16]
@ -100,11 +124,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -113,8 +137,32 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/glove-200-angular.hdf5
collection_name: glove_200_angular
index_file_sizes: [1024]
index_types: ['flat', 'ivf_flat', 'ivf_sq8']
index_file_sizes: [50, 1024]
index_types: ['flat']
index_params:
nlist: [16384]
top_ks: [10]
nqs: [10000]
search_params:
nprobe: [1, 512, 16384]
-
server:
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
gpu_resource_config.build_index_resources:
- gpu0
- gpu1
source_file: /test/milvus/ann_hdf5/glove-200-angular.hdf5
collection_name: glove_200_angular
index_file_sizes: [50, 1024]
index_types: ['ivf_flat', 'ivf_sq8']
index_params:
nlist: [16384]
top_ks: [10]
@ -124,11 +172,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -137,7 +185,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/glove-200-angular.hdf5
collection_name: glove_200_angular
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['ivf_pq']
index_params:
nlist: [16384]
@ -149,11 +197,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -162,7 +210,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/glove-200-angular.hdf5
collection_name: glove_200_angular
index_file_sizes: [256]
index_file_sizes: [50, 256]
index_types: ['hnsw']
index_params:
M: [36]

View File

@ -2,11 +2,11 @@ ann_accuracy:
collections:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -15,22 +15,22 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['flat', 'ivf_flat', 'ivf_sq8', 'ivf_sq8h']
index_params:
nlist: [16384]
top_ks: [10]
nqs: [10000]
search_params:
nprobe: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
nprobe: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 2048]
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -39,7 +39,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['ivf_pq']
index_params:
nlist: [16384]
@ -51,11 +51,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -64,7 +64,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-128-euclidean.hdf5
collection_name: sift_128_euclidean
index_file_sizes: [256]
index_file_sizes: [50, 256]
index_types: ['hnsw']
index_params:
M: [16]
@ -76,11 +76,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -89,22 +89,22 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/glove-200-angular.hdf5
collection_name: glove_200_angular
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['flat', 'ivf_flat', 'ivf_sq8', 'ivf_sq8h']
index_params:
nlist: [16384]
top_ks: [10]
nqs: [10000]
search_params:
nprobe: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
nprobe: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 2048]
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -113,7 +113,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/glove-200-angular.hdf5
collection_name: glove_200_angular
index_file_sizes: [256]
index_file_sizes: [50, 256]
index_types: ['hnsw']
index_params:
M: [36]
@ -125,11 +125,36 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
gpu_resource_config.build_index_resources:
- gpu0
- gpu1
source_file: /test/milvus/ann_hdf5/glove-200-angular.hdf5
collection_name: glove_200_angular
index_file_sizes: [50, 1024]
index_types: ['ivf_pq']
index_params:
nlist: [16384]
m: [20]
top_ks: [10]
nqs: [10000]
search_params:
nprobe: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
-
server:
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -138,7 +163,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/kosarak-27983-jaccard.hdf5
collection_name: kosarak_27984_jaccard
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['flat', 'ivf_flat']
index_params:
nlist: [2048]
@ -149,11 +174,11 @@ ann_accuracy:
-
server:
cache_config.cpu_cache_capacity: 16
cache_config.cpu_cache_capacity: 16GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
@ -162,7 +187,7 @@ ann_accuracy:
- gpu1
source_file: /test/milvus/ann_hdf5/sift-256-hamming.hdf5
collection_name: sift_256_hamming
index_file_sizes: [1024]
index_file_sizes: [50, 1024]
index_types: ['flat', 'ivf_flat']
index_params:
nlist: [2048]

View File

@ -1,8 +1,4 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
__true_print = print # noqa
import os
import sys
import pdb
@ -15,22 +11,62 @@ import logging
import string
import random
# import multiprocessing
# import numpy
import numpy as np
# import psutil
import sklearn.preprocessing
import h5py
# import docker
from yaml import full_load, dump
import yaml
import tableprint as tp
from pprint import pprint
logger = logging.getLogger("milvus_benchmark.utils")
MULTI_DB_SLAVE_PATH = "/opt/milvus/data2;/opt/milvus/data3"
REGISTRY_URL = "registry.zilliz.com/milvus/engine"
def get_unique_name():
return "benchmark-test-"+"".join(random.choice(string.ascii_letters + string.digits) for _ in range(8)).lower()
# Marker subclass of str: values wrapped in literal_str are emitted by yaml.dump
# using the YAML literal block scalar style ('|'), via the representer
# registered with yaml.add_representer below.
class literal_str(str): pass
def change_style(style, representer):
    """Wrap a PyYAML representer so every scalar node it produces carries
    the given style character.

    :param style: YAML scalar style to force (e.g. '|' for literal blocks).
    :param representer: base representer, called as representer(dumper, data).
    :return: a new representer function with the style applied to each node.
    """
    def styled_representer(dumper, data):
        node = representer(dumper, data)
        node.style = style
        return node
    return styled_representer
from yaml.representer import SafeRepresenter
# represent_str does handle some corner cases, so use that
# instead of calling represent_scalar directly
represent_literal_str = change_style('|', SafeRepresenter.represent_str)
# Register globally: from here on, yaml.dump renders any literal_str value
# as a YAML literal block scalar ('|').
yaml.add_representer(literal_str, represent_literal_str)
def normalize(metric_type, X):
    """Convert raw vectors into the representation expected by the metric.

    :param metric_type: metric name. "ip" triggers row-wise L2 normalization
        (via sklearn); the binary metrics "jaccard", "hamming", "sub" and
        "super" trigger bit-packing of each 0/1 vector into a bytes object;
        any other value returns X unchanged.
    :param X: iterable of vectors (rows of numbers, or 0/1 bit lists for
        binary metrics).
    :return: list of normalized rows for "ip", list of bytes for binary
        metrics, otherwise X as passed in.
    """
    if metric_type == "ip":
        logger.info("Set normalize for metric_type: %s" % metric_type)
        # Row-wise L2 normalization so inner-product search behaves like
        # cosine similarity on unit vectors.
        X = sklearn.preprocessing.normalize(X, axis=1, norm='l2').tolist()
    elif metric_type in ("jaccard", "hamming", "sub", "super"):
        # Pack each 0/1 vector into a compact byte string (8 bits per byte),
        # the wire format the binary-metric index expects.
        X = [bytes(np.packbits(item, axis=-1).tolist()) for item in X]
    return X
def get_unique_name(prefix=None):
    """Build a unique test/release name: prefix plus 8 random alphanumerics.

    :param prefix: optional name prefix; defaults to "milvus-benchmark-test-".
    :return: the prefix followed by an 8-character random suffix (the suffix
        is lowercased; the prefix itself is left untouched).
    """
    if prefix is None:
        prefix = "milvus-benchmark-test-"
    alphabet = string.ascii_letters + string.digits
    suffix = "".join(random.choice(alphabet) for _ in range(8))
    return prefix + suffix.lower()
def get_current_time():
@ -74,9 +110,8 @@ def modify_config(k, v, type=None, file_path="conf/server_config.yaml", db_slave
config_dict['gpu_resource_config']['build_index_resources'] = v
elif k.find("search_resources") != -1:
config_dict['resource_config']['resources'] = v
if db_slave:
config_dict['db_config']['db_slave_path'] = MULTI_DB_SLAVE_PATH
# if db_slave:
# config_dict['db_config']['db_slave_path'] = MULTI_DB_SLAVE_PATH
with open(file_path, 'w') as f:
dump(config_dict, f, default_flow_style=False)
f.close()
@ -116,7 +151,7 @@ def update_server_config(file_path, server_config):
# update values.yaml
def update_values(file_path, hostname, server_config):
def update_values(file_path, deploy_mode, hostname, server_config):
from kubernetes import client, config
client.rest.logger.setLevel(logging.WARNING)
@ -137,64 +172,69 @@ def update_values(file_path, hostname, server_config):
for k, v in server_config.items():
if k.find("primary_path") != -1:
values_dict["primaryPath"] = v
values_dict['wal']['path'] = v+"/wal"
elif k.find("use_blas_threshold") != -1:
values_dict['useBLASThreshold'] = int(v)
suffix_path = server_config["suffix_path"] if "suffix_path" in server_config else None
path_value = v
if suffix_path:
path_value = v + "_" + str(int(time.time()))
values_dict["primaryPath"] = path_value
values_dict['wal']['path'] = path_value+"/wal"
values_dict['logs']['path'] = path_value+"/logs"
# elif k.find("use_blas_threshold") != -1:
# values_dict['useBLASThreshold'] = int(v)
elif k.find("gpu_search_threshold") != -1:
values_dict['gpuSearchThreshold'] = int(v)
values_dict['gpu']['gpuSearchThreshold'] = int(v)
elif k.find("cpu_cache_capacity") != -1:
values_dict['cpuCacheCapacity'] = int(v)
elif k.find("cache_insert_data") != -1:
values_dict['cacheInsertData'] = v
values_dict['cache']['cacheSize'] = v
# elif k.find("cache_insert_data") != -1:
# values_dict['cache']['cacheInsertData'] = v
elif k.find("insert_buffer_size") != -1:
values_dict['insertBufferSize'] = v
values_dict['cache']['insertBufferSize'] = v
elif k.find("gpu_resource_config.enable") != -1:
values_dict['gpu']['enabled'] = v
elif k.find("gpu_resource_config.cache_capacity") != -1:
values_dict['gpu']['cacheCapacity'] = int(v)
values_dict['gpu']['cacheSize'] = v
elif k.find("build_index_resources") != -1:
values_dict['gpu']['buildIndexResources'] = v
elif k.find("search_resources") != -1:
values_dict['gpu']['searchResources'] = v
values_dict['gpu']['buildIndexDevices'] = v
elif k.find("search_resources") != -1:
values_dict['gpu']['searchDevices'] = v
# wal
elif k.find("auto_flush_interval") != -1:
values_dict['autoFlushInterval'] = v
values_dict['storage']['autoFlushInterval'] = v
elif k.find("wal_enable") != -1:
values_dict['wal']['enabled'] = v
# if values_dict['nodeSelector']:
# logger.warning("nodeSelector has been set: %s" % str(values_dict['engine']['nodeSelector']))
# return
values_dict["wal"]["ignoreErrorLog"] = True
values_dict["wal"]["recoveryErrorIgnore"] = True
# enable monitor
values_dict["metrics"]["enabled"] = True
values_dict["metrics"]["address"] = "192.168.1.237"
values_dict["metrics"]["port"] = 9091
# Using sqlite for single mode
if deploy_mode == "single":
values_dict["mysql"]["enabled"] = False
# update values.yaml with the given host
values_dict['nodeSelector'] = {'kubernetes.io/hostname': hostname}
# Using sqlite
values_dict["mysql"]["enabled"] = False
config.load_kube_config()
v1 = client.CoreV1Api()
# node = v1.read_node(hostname)
cpus = v1.read_node(hostname).status.allocatable.get("cpu")
# DEBUG
# set limit/request cpus in resources
values_dict['resources'] = {
"limits": {
"cpu": str(int(cpus))+".0"
},
"requests": {
"cpu": str(int(cpus)-1)+".0"
if hostname:
config.load_kube_config()
v1 = client.CoreV1Api()
values_dict['nodeSelector'] = {'kubernetes.io/hostname': hostname}
# node = v1.read_node(hostname)
cpus = v1.read_node(hostname).status.allocatable.get("cpu")
# set limit/request cpus in resources
values_dict["image"]['resources'] = {
"limits": {
"cpu": str(int(cpus))+".0"
},
"requests": {
"cpu": str(int(cpus)-1)+".0"
}
}
}
values_dict['tolerations'] = [{
"key": "worker",
"operator": "Equal",
"value": "performance",
"effect": "NoSchedule"
values_dict['tolerations'] = [{
"key": "worker",
"operator": "Equal",
"value": "performance",
"effect": "NoSchedule"
}]
# add extra volumes
values_dict['extraVolumes'] = [{
@ -215,51 +255,102 @@ def update_values(file_path, hostname, server_config):
'name': 'test',
'mountPath': '/test'
}]
# add extra volumes for mysql
# values_dict['mysql']['persistence']['enabled'] = True
# values_dict['mysql']['configurationFilesPath'] = "/etc/mysql/mysql.conf.d/"
# values_dict['mysql']['imageTag'] = '5.6'
# values_dict['mysql']['securityContext'] = {
# 'enabled': True}
# mysql_db_path = "/test"
if deploy_mode == "shards":
# mount_path = values_dict["primaryPath"]+'/data'
# long_str = '- name: test-mysql\n flexVolume:\n driver: fstab/cifs\n fsType: cifs\n secretRef:\n name: cifs-test-secret\n options:\n networkPath: //192.168.1.126/test\n mountOptions: vers=1.0'
# values_dict['mysql']['extraVolumes'] = literal_str(long_str)
# long_str_2 = "- name: test-mysql\n mountPath: %s" % mysql_db_path
# values_dict['mysql']['extraVolumeMounts'] = literal_str(long_str_2)
# mysql_cnf_str = '[mysqld]\npid-file=%s/mysql.pid\ndatadir=%s' % (mount_path, mount_path)
# values_dict['mysql']['configurationFiles'] = {}
# values_dict['mysql']['configurationFiles']['mysqld.cnf'] = literal_str(mysql_cnf_str)
values_dict['mysql']['enabled'] = False
values_dict['externalMysql']['enabled'] = True
values_dict['externalMysql']["ip"] = "192.168.1.197"
values_dict['externalMysql']["port"] = 3306
values_dict['externalMysql']["user"] = "root"
values_dict['externalMysql']["password"] = "Fantast1c"
values_dict['externalMysql']["database"] = "db"
logger.debug(values_dict)
# print(dump(values_dict))
with open(file_path, 'w') as f:
dump(values_dict, f, default_flow_style=False)
f.close()
# DEBUG
with open(file_path) as f:
for line in f.readlines():
line = line.strip("\n")
logger.debug(line)
# deploy server
def helm_install_server(helm_path, image_tag, image_type, name, namespace):
def helm_install_server(helm_path, deploy_mode, image_tag, image_type, name, namespace):
"""Deploy server with using helm.
"""
from kubernetes import client, config
client.rest.logger.setLevel(logging.WARNING)
timeout = 300
install_cmd = "helm install --wait --timeout %ds \
--set image.repository=%s \
--set image.tag=%s \
--set image.pullPolicy=Always \
--set service.type=ClusterIP \
-f ci/filebeat/values.yaml \
--namespace %s \
%s ." % (timeout, REGISTRY_URL, image_tag, namespace, name)
logger.debug("Server deploy mode: %s" % deploy_mode)
host = "%s.%s.svc.cluster.local" % (name, namespace)
if deploy_mode == "single":
install_cmd = "helm install --wait --timeout %ds \
--set image.repository=%s \
--set image.tag=%s \
--set image.pullPolicy=Always \
--set service.type=ClusterIP \
-f ci/filebeat/values.yaml \
--namespace %s \
%s ." % (timeout, REGISTRY_URL, image_tag, namespace, name)
elif deploy_mode == "shards":
install_cmd = "helm install --wait --timeout %ds \
--set cluster.enabled=true \
--set persistence.enabled=true \
--set mishards.image.tag=test \
--set mishards.image.pullPolicy=Always \
--set image.repository=%s \
--set image.tag=%s \
--set image.pullPolicy=Always \
--set service.type=ClusterIP \
-f ci/filebeat/values.yaml \
--namespace %s \
%s ." % (timeout, REGISTRY_URL, image_tag, namespace, name)
logger.debug(install_cmd)
logger.debug(host)
if os.system("cd %s && %s" % (helm_path, install_cmd)):
logger.error("Helm install failed")
return None
time.sleep(5)
config.load_kube_config()
v1 = client.CoreV1Api()
host = "%s.%s.svc.cluster.local" % (name, namespace)
logger.debug(host)
pod_name = None
pod_id = None
pods = v1.list_namespaced_pod(namespace)
for i in pods.items:
if i.metadata.name.find(name) != -1:
pod_name = i.metadata.name
pod_ip = i.status.pod_ip
logger.debug(pod_name)
logger.debug(pod_ip)
return pod_name, pod_ip
# config.load_kube_config()
# v1 = client.CoreV1Api()
# pod_name = None
# pod_id = None
# pods = v1.list_namespaced_pod(namespace)
# for i in pods.items:
# if i.metadata.name.find(name) != -1:
# pod_name = i.metadata.name
# pod_ip = i.status.pod_ip
# logger.debug(pod_name)
# logger.debug(pod_ip)
# return pod_name, pod_ip
return host
# delete server
def helm_del_server(name, namespace):
# del_cmd = "helm uninstall -n milvus benchmark-test-gzelwvgk"
# os.system(del_cmd)
"""
Delete server with using helm uninstall.
Return status if uninstall successfully or not
"""
# logger.debug("Sleep 600s before uninstall server")
# time.sleep(600)
del_cmd = "helm uninstall -n milvus %s" % name
logger.debug(del_cmd)
if os.system(del_cmd):
@ -268,6 +359,81 @@ def helm_del_server(name, namespace):
return True
def restart_server(helm_release_name, namespace):
    """Restart the Milvus server pod of a helm release by deleting it and
    waiting for the replacement pod to reach Running, then tail its log.

    :param helm_release_name: helm release whose pod name is matched by substring.
    :param namespace: Kubernetes namespace to search for the pod.
    :return: True on success, False on any API error or readiness timeout.
    :raises Exception: if no matching pod is found in the namespace.
    """
    res = True
    timeout = 120  # seconds to wait for the replacement pod to become Running
    from kubernetes import client, config
    client.rest.logger.setLevel(logging.WARNING)

    # service_name = "%s.%s.svc.cluster.local" % (helm_release_name, namespace)
    config.load_kube_config()
    v1 = client.CoreV1Api()
    pod_name = None
    # config_map_names = v1.list_namespaced_config_map(namespace, pretty='true')
    # body = {"replicas": 0}
    # Find the server pod: name contains the release name but not "mysql"
    # (the release may also run a mysql pod that must not be restarted).
    pods = v1.list_namespaced_pod(namespace)
    for i in pods.items:
        if i.metadata.name.find(helm_release_name) != -1 and i.metadata.name.find("mysql") == -1:
            pod_name = i.metadata.name
            break
    # v1.patch_namespaced_config_map(config_map_name, namespace, body, pretty='true')
    # status_res = v1.read_namespaced_service_status(helm_release_name, namespace, pretty='true')
    logger.debug("Pod name: %s" % pod_name)
    if pod_name is not None:
        # Deleting the pod makes the deployment/statefulset recreate it.
        try:
            v1.delete_namespaced_pod(pod_name, namespace)
        except Exception as e:
            logging.error(str(e))
            logging.error("Exception when calling CoreV1Api->delete_namespaced_pod")
            res = False
            return res
        # NOTE(review): logging.error is used throughout for informational
        # messages (sleeps, pod names, timestamps) — likely debug leftovers;
        # consider logging.debug/info.
        logging.error("Sleep 10s after pod deleted")
        time.sleep(10)
        # check if restart successfully: poll the *new* pod (same release
        # substring, different name than the deleted pod) until Running.
        pods = v1.list_namespaced_pod(namespace)
        for i in pods.items:
            pod_name_tmp = i.metadata.name
            logging.error(pod_name_tmp)
            if pod_name_tmp == pod_name:
                # The old pod may still be listed while terminating; skip it.
                continue
            elif pod_name_tmp.find(helm_release_name) == -1 or pod_name_tmp.find("mysql") != -1:
                continue
            else:
                status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true')
                logging.error(status_res.status.phase)
                start_time = time.time()
                # Poll once per second until Running or timeout elapses.
                while time.time() - start_time <= timeout:
                    logging.error(time.time())
                    status_res = v1.read_namespaced_pod_status(pod_name_tmp, namespace, pretty='true')
                    if status_res.status.phase == "Running":
                        logging.error("Running")
                        logging.error("Sleep after restart")
                        break
                    else:
                        time.sleep(1)
                if time.time() - start_time > timeout:
                    logging.error("Restart pod: %s timeout" % pod_name_tmp)
                    res = False
                    return res
    else:
        raise Exception("Pod: %s not found" % pod_name)
    follow = True
    pretty = True
    previous = True  # bool | Return previous terminated container logs. Defaults to false. (optional)
    since_seconds = 56  # int | A relative time in seconds before the current time from which to show logs. If this value precedes the time a pod was started, only logs since the pod start will be returned. If this value is in the future, no logs will be returned. Only one of sinceSeconds or sinceTime may be specified. (optional)
    timestamps = True  # bool | If true, add an RFC3339 or RFC3339Nano timestamp at the beginning of every line of log output. Defaults to false. (optional)
    container = "milvus"
    # NOTE(review): pod_name_tmp is only bound inside the loop above; if the
    # second pod listing matched nothing, this raises NameError — TODO confirm
    # whether that path can occur in practice.
    try:
        # Tail the recreated pod's previous-container log to surface any
        # crash output from the restart.
        api_response = v1.read_namespaced_pod_log(pod_name_tmp, namespace, container=container, follow=follow, pretty=pretty, previous=previous, since_seconds=since_seconds, timestamps=timestamps)
        logging.error(api_response)
    except Exception as e:
        logging.error("Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n" % e)
        res = False
        return res
    time.sleep(30)
    return res
# def pull_image(image):
# registry = image.split(":")[0]
# image_tag = image.split(":")[1]
@ -355,7 +521,6 @@ def helm_del_server(name, namespace):
# # return container
# # elif exit_code is not None:
# # print(colors.color(container.logs().decode(), fg='red'))
# # raise Exception('Child process raised exception %s' % str(exit_code))
# def restart_server(container):
# client = docker.APIClient(base_url='unix://var/run/docker.sock')
@ -397,5 +562,4 @@ def helm_del_server(name, namespace):
if __name__ == '__main__':
# print(pull_image('branch-0.3.1-debug'))
stop_server()
update_values("","shards",None,None)