[skip ci] Update benchmark readme (#6765)

Signed-off-by: zhenwu <zhenxiang.li@zilliz.com>
pull/6779/head
del-zhenwu 2021-07-23 15:36:12 +08:00 committed by GitHub
parent aba21baf82
commit 68db0774b6
26 changed files with 412 additions and 264 deletions


@ -0,0 +1,30 @@
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
FROM python:3.6.8-jessie
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
RUN apt-get update && apt-get install -y --no-install-recommends wget apt-transport-https && \
wget -qO- "https://get.helm.sh/helm-v3.0.2-linux-amd64.tar.gz" | tar --strip-components=1 -xz -C /usr/local/bin linux-amd64/helm && \
wget -P /tmp https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg && \
apt-key add /tmp/apt-key.gpg && \
sh -c 'echo deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main > /etc/apt/sources.list.d/kubernetes.list' && \
apt-get update && apt-get install -y --no-install-recommends \
build-essential kubectl && \
apt-get remove --purge -y && \
rm -rf /var/lib/apt/lists/*
COPY requirements.txt /requirements.txt
RUN python3 -m pip install -r /requirements.txt
WORKDIR /root


@ -1,50 +1,45 @@
# Quick start

`milvus_benchmark` is a non-functional testing tool which allows users to run tests on a k8s cluster or locally. The primary use cases are performance, load and stability testing, and the objective is to expose problems in the milvus project.

### Description

- Test cases in `milvus_benchmark` can be organized with `yaml`
- Tests can run in local mode or helm mode
   - local: install and start your local server, and pass the host/port params when starting the tests
   - helm: install the server by helm, which will manage milvus in the k8s cluster, and you can integrate the test stage into an argo workflow or jenkins pipeline

### Usage

1. Using jenkins:
   Use `ci/main_jenkinsfile` as the jenkins pipeline file
2. Using argo:
   An example argo workflow yaml configuration: `ci/argo.yaml`
3. Local test:
   Run the test against a local server

   1). set PYTHONPATH:

   `export PYTHONPATH=/your/project/path/milvus_benchmark`

   2). prepare data:

   if the sift/deep dataset is used as the raw data input, mount the NAS and update `RAW_DATA_DIR` in `config.py`; an example mount command:

   `sudo mount -t cifs -o username=test,vers=1.0 //172.16.70.249/test /test`

   3). install requirements:

   `pip install -r requirements.txt`

   4). write a test yaml and run with the yaml param:

   `cd milvus-benchmark/ && python main.py --local --host=* --port=19530 --suite=suites/2_insert_data.yaml`
### Definitions of test suite

A test suite yaml defines the test process; users need to write a test suite yaml when adding a customized test to the current test framework.

Take the test file `2_insert_data.yaml` as an example: the top level is the test type, `insert_performance`. There are many test types, including `search_performance`, `build_performance`, `insert_performance`, `accuracy`, `locust_insert`, and so on, and each test type corresponds to a different runner defined in the `runners` directory. The other parts of the test yaml are the params passed to the runner, e.g. the field `collection_name` specifies which kind of collection will be created in milvus.
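A minimal sketch of such a suite, modeled on the `insert_performance` suites shipped in this change (all values are illustrative):

    insert_performance:
      collections:
        -
          milvus:
            wal_enable: true
          collection_name: sift_1m_128_l2
          ni_per: 50000
          build_index: false
          index_type: ivf_sq8
          index_param:
            nlist: 1024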
### Test result

Test results will be uploaded if run with the helm mode, and will be used to judge whether the test run passed or failed.


@ -153,6 +153,16 @@ class MilvusClient(object):
except Exception as e:
logger.error(str(e))
@time_wrapper
def insert_flush(self, entities, _async=False, collection_name=None):
tmp_collection_name = self._collection_name if collection_name is None else collection_name
try:
insert_res = self._milvus.insert(tmp_collection_name, entities)
# flush right after the insert so the entities are persisted before the ids are returned
self._milvus.flush([tmp_collection_name], _async=_async)
return insert_res.primary_keys
except Exception as e:
logger.error(str(e))
def get_dimension(self):
info = self.get_info()
for field in info["fields"]:
@ -424,28 +434,28 @@ class MilvusClient(object):
self.drop(collection_name=name)
@time_wrapper
def load_collection(self, collection_name=None):
def load_collection(self, collection_name=None, timeout=3000):
if collection_name is None:
collection_name = self._collection_name
return self._milvus.load_collection(collection_name, timeout=3000)
return self._milvus.load_collection(collection_name, timeout=timeout)
@time_wrapper
def release_collection(self, collection_name=None):
def release_collection(self, collection_name=None, timeout=3000):
if collection_name is None:
collection_name = self._collection_name
return self._milvus.release_collection(collection_name, timeout=3000)
return self._milvus.release_collection(collection_name, timeout=timeout)
@time_wrapper
def load_partitions(self, tag_names, collection_name=None):
def load_partitions(self, tag_names, collection_name=None, timeout=3000):
if collection_name is None:
collection_name = self._collection_name
return self._milvus.load_partitions(collection_name, tag_names, timeout=3000)
return self._milvus.load_partitions(collection_name, tag_names, timeout=timeout)
@time_wrapper
def release_partitions(self, tag_names, collection_name=None):
def release_partitions(self, tag_names, collection_name=None, timeout=3000):
if collection_name is None:
collection_name = self._collection_name
return self._milvus.release_partitions(collection_name, tag_names, timeout=3000)
return self._milvus.release_partitions(collection_name, tag_names, timeout=timeout)
# TODO: remove
# def get_server_version(self):
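For reference, the configurable timeout added above is exercised elsewhere in this change; a minimal sketch, assuming "client" is an already constructed MilvusClient bound to a collection:

    client.load_collection(timeout=600)   # the accuracy runner now loads with a 600s timeout
    client.release_collection()           # falls back to the default timeout of 3000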


@ -6,10 +6,13 @@ JOB_COLLECTION = "jobs"
REGISTRY_URL = "registry.zilliz.com/milvus/milvus"
IDC_NAS_URL = "//172.16.70.249/test"
DEFAULT_IMAGE = "milvusdb/milvus:latest"
SERVER_HOST_DEFAULT = "127.0.0.1"
SERVER_PORT_DEFAULT = 19530
SERVER_VERSION = "2.0"
SERVER_VERSION = "2.0.0-RC3"
DEFUALT_DEPLOY_MODE = "single"
HELM_NAMESPACE = "milvus"
BRANCH = "master"
@ -22,9 +25,18 @@ RAW_DATA_DIR = "/test/milvus/raw_data/"
LOG_PATH = "/test/milvus/benchmark/logs/{}/".format(BRANCH)
DEFAULT_DEPLOY_MODE = "single"
SINGLE_DEPLOY_MODE = "single"
CLUSTER_DEPLOY_MODE = "cluster"
NAMESPACE = "milvus"
CHAOS_NAMESPACE = "chaos-testing"
DEFAULT_API_VERSION = 'chaos-mesh.org/v1alpha1'
DEFAULT_GROUP = 'chaos-mesh.org'
DEFAULT_VERSION = 'v1alpha1'
# minio config
MINIO_HOST = "milvus-test-minio.qa-milvus.svc.cluster.local"
MINIO_PORT = 9000
MINIO_ACCESS_KEY = "minioadmin"
MINIO_SECRET_KEY = "minioadmin"
MINIO_BUCKET_NAME = "test"


@ -2,7 +2,7 @@ version: 1
disable_existing_loggers: False
formatters:
simple:
format: "%(asctime)s - %(name)s:%(lineno)s - %(levelname)s - %(message)s"
format: "[%(asctime)-15s] [%(levelname)8s] - %(message)s (%(name)s:%(lineno)s)"
handlers:
console:


@ -1,14 +1,9 @@
import os
import sys
import time
from datetime import datetime
import pdb
import argparse
import logging
import traceback
from multiprocessing import Process
from queue import Queue
from logging import handlers
# from queue import Queue
from yaml import full_load, dump
from milvus_benchmark.metrics.models.server import Server
from milvus_benchmark.metrics.models.hardware import Hardware
@ -17,7 +12,7 @@ from milvus_benchmark.metrics.models.env import Env
from milvus_benchmark.env import get_env
from milvus_benchmark.runners import get_runner
from milvus_benchmark.metrics import api
from milvus_benchmark import config
from milvus_benchmark import config, utils
from milvus_benchmark import parser
# from scheduler import back_scheduler
from logs import log
@ -25,11 +20,7 @@ from logs import log
log.setup_logging()
logger = logging.getLogger("milvus_benchmark.main")
DEFAULT_IMAGE = "milvusdb/milvus:latest"
LOG_FOLDER = "logs"
NAMESPACE = "milvus"
SERVER_VERSION = "2.0"
q = Queue()
# q = Queue()
def positive_int(s):
@ -58,7 +49,7 @@ def run_suite(run_type, suite, env_mode, env_params):
try:
start_status = False
metric = api.Metric()
deploy_mode = env_params["deploy_mode"] if "deploy_mode" in env_params else config.DEFAULT_DEPLOY_MODE
deploy_mode = env_params["deploy_mode"]
env = get_env(env_mode, deploy_mode)
metric.set_run_id()
metric.set_mode(env_mode)
@ -67,6 +58,8 @@ def run_suite(run_type, suite, env_mode, env_params):
logger.info(env_params)
if env_mode == "local":
metric.hardware = Hardware("")
if "server_tag" in env_params and env_params["server_tag"]:
metric.hardware = Hardware("server_tag")
start_status = env.start_up(env_params["host"], env_params["port"])
elif env_mode == "helm":
helm_params = env_params["helm_params"]
@ -106,8 +99,8 @@ def run_suite(run_type, suite, env_mode, env_params):
case_metric.update_message(err_message)
suite_status = False
logger.debug(case_metric.metrics)
# if env_mode == "helm":
api.save(case_metric)
if deploy_mode:
api.save(case_metric)
if suite_status:
metric.update_status(status="RUN_SUCC")
else:
@ -120,7 +113,8 @@ def run_suite(run_type, suite, env_mode, env_params):
logger.error(traceback.format_exc())
metric.update_status(status="RUN_FAILED")
finally:
api.save(metric)
if deploy_mode:
api.save(metric)
# time.sleep(10)
env.tear_down()
if metric.status != "RUN_SUCC":
@ -161,6 +155,11 @@ def main():
metavar='FILE',
help='load test suite from FILE',
default='')
arg_parser.add_argument(
'--server-config',
metavar='FILE',
help='load server config from FILE',
default='')
args = arg_parser.parse_args()
@ -216,10 +215,20 @@ def main():
elif args.local:
# for local mode
deploy_params = args.server_config
deploy_params_dict = None
if deploy_params:
with open(deploy_params) as f:
deploy_params_dict = full_load(f)
f.close()
logger.debug(deploy_params_dict)
deploy_mode = utils.get_deploy_mode(deploy_params_dict)
server_tag = utils.get_server_tag(deploy_params_dict)
env_params = {
"host": args.host,
"port": args.port,
"deploy_mode": None
"deploy_mode": deploy_mode,
"server_tag": server_tag
}
suite_file = args.suite
with open(suite_file) as f:
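The new --server-config option above feeds utils.get_deploy_mode and utils.get_server_tag; a minimal sketch of such a file, using only keys read by the code in this change (all values illustrative):

    milvus:
      deploy_mode: cluster        # "single" or "cluster"; see SINGLE_DEPLOY_MODE / CLUSTER_DEPLOY_MODE in config.py
      datanode:
        replicas: 2
      querynode:
        replicas: 2
    server:
      server_tag: "8c16m"         # hypothetical tag, consumed by parse_server_tag in the helm deploy path

A local run could then be started with, for example:

    python3 main.py --local --host=127.0.0.1 --port=19530 --suite=suites/2_insert_data.yaml --server-config=server_config.yaml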


@ -34,6 +34,10 @@ class Metric(object):
def set_mode(self, mode):
self.mode = mode
# including: metric, suite_metric
def set_case_metric_type(self):
self._type = "case"
def json_md5(self):
json_str = json.dumps(vars(self), sort_keys=True)
return hashlib.md5(json_str.encode('utf-8')).hexdigest()


@ -62,6 +62,7 @@ class AccuracyRunner(BaseRunner):
"params": search_param}
# TODO: only update search_info
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metric.search = {
"nq": nq,
"topk": top_k,
@ -168,6 +169,7 @@ class AccAccuracyRunner(AccuracyRunner):
"params": search_param}
# TODO: only update search_info
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metric.index = index_info
case_metric.search = {
"nq": nq,
@ -245,7 +247,7 @@ class AccAccuracyRunner(AccuracyRunner):
logger.info(self.milvus.describe_index(index_field_name))
logger.info("Start load collection: %s" % collection_name)
# self.milvus.release_collection()
self.milvus.load_collection()
self.milvus.load_collection(timeout=600)
logger.info("End load collection: %s" % collection_name)
def run_case(self, case_metric, **case_param):


@ -28,7 +28,8 @@ class BaseRunner(object):
pass
def stop(self):
logger.debug("Start clean up env: {} in runner".format(self.env.name))
logger.debug("Stop runner...")
pass
@property
def hostname(self):
@ -54,10 +55,11 @@ class BaseRunner(object):
def run_as_group(self):
return self._run_as_group
def init_metric(self, name, collection_info=None, index_info=None, search_info=None, run_params=None):
def init_metric(self, name, collection_info=None, index_info=None, search_info=None, run_params=None, t="metric"):
self._metric.collection = collection_info
self._metric.index = index_info
self._metric.search = search_info
self._metric.type = t
self._metric.run_params = run_params
self._metric.metrics = {
"type": name,


@ -41,6 +41,7 @@ class BuildRunner(BaseRunner):
flush = False
self.init_metric(self.name, collection_info, index_info, search_info=None)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metrics = list()
case_params = list()
case_metrics.append(case_metric)


@ -71,6 +71,7 @@ class SimpleChaosRunner(BaseRunner):
}]
self.init_metric(self.name, {}, {}, None)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metrics.append(case_metric)
return case_params, case_metrics


@ -53,6 +53,7 @@ class GetRunner(BaseRunner):
for ids_length in ids_length_list:
ids = get_ids(ids_length, collection_size)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_params = list()
case_metric.run_params = {"ids_length": ids_length}
case_metrics.append(case_metric)


@ -48,6 +48,7 @@ class InsertRunner(BaseRunner):
flush = False
self.init_metric(self.name, collection_info, index_info, None)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metrics = list()
case_params = list()
case_metrics.append(case_metric)


@ -21,11 +21,10 @@ class LocustRunner(BaseRunner):
connection_type = case_param["connection_type"]
# spawn locust requests
clients_num = task["clients_num"]
hatch_rate = task["hatch_rate"]
during_time = utils.timestr_to_int(task["during_time"])
task["during_time"] = utils.timestr_to_int(task["during_time"])
task_types = task["types"]
run_params = {"tasks": {}, "clients_num": clients_num, "spawn_rate": hatch_rate, "during_time": during_time}
run_params = {"tasks": {}}
run_params.update(task)
info_in_params = {
"index_field_name": case_param["index_field_name"],
"vector_field_name": case_param["vector_field_name"],
@ -95,6 +94,7 @@ class LocustInsertRunner(LocustRunner):
}
self.init_metric(self.name, collection_info, index_info, None, run_params)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metrics = list()
case_params = list()
case_metrics.append(case_metric)
@ -197,6 +197,7 @@ class LocustSearchRunner(LocustRunner):
}
self.init_metric(self.name, collection_info, index_info, None, run_params)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metrics = list()
case_params = list()
case_metrics.append(case_metric)
@ -324,6 +325,7 @@ class LocustRandomRunner(LocustRunner):
}
self.init_metric(self.name, collection_info, index_info, None, run_params)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metrics = list()
case_params = list()
case_metrics.append(case_metric)
@ -356,40 +358,40 @@ class LocustRandomRunner(LocustRunner):
build_index = case_param["build_index"]
self.milvus.set_collection(collection_name)
# if self.milvus.exists_collection():
# logger.debug("Start drop collection")
# self.milvus.drop()
# time.sleep(runner_utils.DELETE_INTERVAL_TIME)
# self.milvus.create_collection(dimension, data_type=vector_type,
# other_fields=other_fields)
# # TODO: update fields in collection_info
# # fields = self.get_fields(self.milvus, collection_name)
# # collection_info = {
# # "dimension": dimension,
# # "metric_type": metric_type,
# # "dataset_name": collection_name,
# # "fields": fields
# # }
# if build_index is True:
# if case_param["index_type"]:
# self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"])
# logger.debug(self.milvus.describe_index(index_field_name))
# else:
# build_index = False
# logger.warning("Please specify the index_type")
# self.insert(self.milvus, collection_name, case_param["data_type"], dimension, case_param["collection_size"], case_param["ni_per"])
# build_time = 0.0
# start_time = time.time()
# self.milvus.flush()
# flush_time = round(time.time()-start_time, 2)
# logger.debug(self.milvus.count())
# if build_index is True:
# logger.debug("Start build index for last file")
# start_time = time.time()
# self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"])
# build_time = round(time.time()-start_time, 2)
# logger.debug({"flush_time": flush_time, "build_time": build_time})
# logger.info(self.milvus.count())
if self.milvus.exists_collection():
logger.debug("Start drop collection")
self.milvus.drop()
time.sleep(runner_utils.DELETE_INTERVAL_TIME)
self.milvus.create_collection(dimension, data_type=vector_type,
other_fields=other_fields)
# TODO: update fields in collection_info
# fields = self.get_fields(self.milvus, collection_name)
# collection_info = {
# "dimension": dimension,
# "metric_type": metric_type,
# "dataset_name": collection_name,
# "fields": fields
# }
if build_index is True:
if case_param["index_type"]:
self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"])
logger.debug(self.milvus.describe_index(index_field_name))
else:
build_index = False
logger.warning("Please specify the index_type")
self.insert(self.milvus, collection_name, case_param["data_type"], dimension, case_param["collection_size"], case_param["ni_per"])
build_time = 0.0
start_time = time.time()
self.milvus.flush()
flush_time = round(time.time()-start_time, 2)
logger.debug(self.milvus.count())
if build_index is True:
logger.debug("Start build index for last file")
start_time = time.time()
self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"])
build_time = round(time.time()-start_time, 2)
logger.debug({"flush_time": flush_time, "build_time": build_time})
logger.info(self.milvus.count())
logger.info("Start load collection")
load_start_time = time.time()
self.milvus.load_collection()


@ -2,23 +2,21 @@ import pdb
import random
import time
import logging
import json
import math
from locust import TaskSet, task
from . import utils
dim = 128
logger = logging.getLogger("milvus_benchmark.runners.locust_tasks")
class Tasks(TaskSet):
@task(100)
@task
def query(self):
op = "query"
X = utils.generate_vectors(self.params[op]["nq"], self.op_info["dimension"])
# X = utils.generate_vectors(self.params[op]["nq"], self.op_info["dimension"])
vector_query = {"vector": {self.op_info["vector_field_name"]: {
"topk": self.params[op]["top_k"],
"query": X,
"query": self.values["X"][:self.params[op]["nq"]],
"metric_type": self.params[op]["metric_type"] if "metric_type" in self.params[op] else utils.DEFAULT_METRIC_TYPE,
"params": self.params[op]["search_param"]}
}}
@ -29,12 +27,11 @@ class Tasks(TaskSet):
filter_query.append(eval(filter["range"]))
if isinstance(filter, dict) and "term" in filter:
filter_query.append(eval(filter["term"]))
logger.debug(filter_query)
self.client.query(vector_query, filter_query=filter_query, log=False)
# logger.debug(filter_query)
self.client.query(vector_query, filter_query=filter_query, log=False, timeout=120)
@task
def flush(self):
logger.debug("Flush")
self.client.flush(log=False)
@task
@ -57,11 +54,20 @@ class Tasks(TaskSet):
@task
def insert(self):
op = "insert"
ids = [random.randint(1, 10000000) for _ in range(self.params[op]["ni_per"])]
X = [[random.random() for _ in range(dim)] for _ in range(self.params[op]["ni_per"])]
entities = utils.generate_entities(self.op_info["collection_info"], X, ids)
# ids = [random.randint(1000000, 10000000) for _ in range(self.params[op]["ni_per"])]
# X = [[random.random() for _ in range(self.op_info["dimension"])] for _ in range(self.params[op]["ni_per"])]
entities = utils.generate_entities(self.op_info["collection_info"], self.values["X"][:self.params[op]["ni_per"]], self.values["ids"][:self.params[op]["ni_per"]])
self.client.insert(entities, log=False)
@task
def insert_flush(self):
op = "insert_flush"
# ids = [random.randint(1000000, 10000000) for _ in range(self.params[op]["ni_per"])]
# X = [[random.random() for _ in range(self.op_info["dimension"])] for _ in range(self.params[op]["ni_per"])]
entities = utils.generate_entities(self.op_info["collection_info"], self.values["X"][:self.params[op]["ni_per"]], self.values["ids"][:self.params[op]["ni_per"]])
self.client.insert(entities, log=False)
self.client.flush(log=False)
@task
def insert_rand(self):
self.client.insert_rand(log=False)
@ -69,5 +75,5 @@ class Tasks(TaskSet):
@task
def get(self):
op = "get"
ids = [random.randint(1, 10000000) for _ in range(self.params[op]["ids_length"])]
self.client.get(ids)
# ids = [random.randint(1, 10000000) for _ in range(self.params[op]["ids_length"])]
self.client.get(self.values["get_ids"][:self.params[op]["ids_length"]])


@ -4,22 +4,54 @@ import pdb
import gevent
# import gevent.monkey
# gevent.monkey.patch_all()
from locust import Locust, User, TaskSet, task, between, events, stats
from locust import User, between, events, stats
from locust.env import Environment
import locust.stats
import math
from locust import LoadTestShape
from locust.stats import stats_printer, print_stats
from locust.log import setup_logging, greenlet_exception_logger
from milvus_benchmark.client import MilvusClient
from .locust_task import MilvusTask
from .locust_tasks import Tasks
from . import utils
locust.stats.CONSOLE_STATS_INTERVAL_SEC = 30
locust.stats.CONSOLE_STATS_INTERVAL_SEC = 20
logger = logging.getLogger("milvus_benchmark.runners.locust_user")
nq = 10000
nb = 100000
class StepLoadShape(LoadTestShape):
"""
A step load shape
Keyword arguments:
step_time -- Time between steps
step_load -- User increase amount at each step
spawn_rate -- Users to stop/start per second at every step
time_limit -- Time limit in seconds
"""
def init(self, step_time, step_load, spawn_rate, time_limit):
self.step_time = step_time
self.step_load = step_load
self.spawn_rate = spawn_rate
self.time_limit = time_limit
def tick(self):
run_time = self.get_run_time()
if run_time > self.time_limit:
return None
current_step = math.floor(run_time / self.step_time) + 1
return (current_step * self.step_load, self.spawn_rate)
class MyUser(User):
# task_set = None
wait_time = between(0.001, 0.002)
# wait_time = between(0.001, 0.002)
pass
def locust_executor(host, port, collection_name, connection_type="single", run_params=None):
@ -33,14 +65,24 @@ def locust_executor(host, port, collection_name, connection_type="single", run_p
MyUser.tasks.update(task)
MyUser.params[op] = value["params"] if "params" in value else None
logger.info(MyUser.tasks)
MyUser.values = {
"ids": [random.randint(1000000, 10000000) for _ in range(nb)],
"get_ids": [random.randint(1, 10000000) for _ in range(nb)],
"X": utils.generate_vectors(nq, MyUser.op_info["dimension"])
}
MyUser.tasks = {Tasks.load: 1, Tasks.flush: 1}
# MyUser.tasks = {Tasks.query: 1, Tasks.flush: 1}
MyUser.client = MilvusTask(host=host, port=port, collection_name=collection_name, connection_type=connection_type,
m=m)
# MyUser.info = m.get_info(collection_name)
env = Environment(events=events, user_classes=[MyUser])
runner = env.create_local_runner()
if "load_shape" in run_params and run_params["load_shape"]:
test = StepLoadShape()
test.init(run_params["step_time"], run_params["step_load"], run_params["spawn_rate"], run_params["during_time"])
env = Environment(events=events, user_classes=[MyUser], shape_class=test)
runner = env.create_local_runner()
env.runner.start_shape()
else:
env = Environment(events=events, user_classes=[MyUser])
runner = env.create_local_runner()
# setup logging
# setup_logging("WARNING", "/dev/null")
# greenlet_exception_logger(logger=logger)
@ -48,7 +90,9 @@ def locust_executor(host, port, collection_name, connection_type="single", run_p
# env.create_web_ui("127.0.0.1", 8089)
# gevent.spawn(stats_printer(env.stats), env, "test", full_history=True)
# events.init.fire(environment=env, runner=runner)
clients_num = run_params["clients_num"]
clients_num = run_params["clients_num"] if "clients_num" in run_params else 0
step_load = run_params["step_load"] if "step_load" in run_params else 0
step_time = run_params["step_time"] if "step_time" in run_params else 0
spawn_rate = run_params["spawn_rate"]
during_time = run_params["during_time"]
runner.start(clients_num, spawn_rate=spawn_rate)


@ -69,6 +69,7 @@ class SearchRunner(BaseRunner):
"params": search_param}
# TODO: only update search_info
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metric.search = {
"nq": nq,
"topk": top_k,
@ -161,18 +162,20 @@ class InsertSearchRunner(BaseRunner):
cases = list()
case_metrics = list()
self.init_metric(self.name, collection_info, index_info, None)
for search_param in search_params:
if not filters:
filters.append(None)
for filter in filters:
filter_param = []
# filter_param = []
filter_query = []
if isinstance(filter, dict) and "range" in filter:
filter_query.append(eval(filter["range"]))
filter_param.append(filter["range"])
# filter_param.append(filter["range"])
if isinstance(filter, dict) and "term" in filter:
filter_query.append(eval(filter["term"]))
filter_param.append(filter["term"])
logger.info("filter param: %s" % json.dumps(filter_param))
# filter_param.append(filter["term"])
# logger.info("filter param: %s" % json.dumps(filter_param))
for nq in nqs:
query_vectors = base_query_vectors[0:nq]
for top_k in top_ks:
@ -183,11 +186,12 @@ class InsertSearchRunner(BaseRunner):
"params": search_param}
# TODO: only update search_info
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metric.search = {
"nq": nq,
"topk": top_k,
"search_param": search_param,
"filter": filter_param
"filter": filter_query
}
vector_query = {"vector": {index_field_name: search_info}}
case = {


@ -2,18 +2,7 @@ insert_performance:
collections:
-
milvus:
db_config.primary_path: /test/milvus/db_data_011/cluster/sift_1m_128_l2
cache_config.cpu_cache_capacity: 4GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: true
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
- gpu1
gpu_resource_config.build_index_resources:
- gpu0
- gpu1
db_config.primary_path: /test/milvus/db_data_2/cluster/sift_1m_128_l2
wal_enable: true
collection_name: sift_1m_128_l2
# other_fields: int,float


@ -3,27 +3,19 @@ locust_insert_performance:
-
milvus:
db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2
cache_config.cpu_cache_capacity: 8GB
cache_config.insert_buffer_size: 2GB
engine_config.use_blas_threshold: 1100
engine_config.gpu_search_threshold: 1
gpu_resource_config.enable: false
gpu_resource_config.cache_capacity: 4GB
gpu_resource_config.search_resources:
- gpu0
gpu_resource_config.build_index_resources:
- gpu0
wal_enable: true
collection_name: sift_1m_128_l2
collection_name: local_1m_128_l2
ni_per: 50000
build_index: false
index_type: ivf_sq8
index_param:
nlist: 1024
task:
load_shape: false
step_time: 100
step_load: 50
spawn_rate: 2
connection_num: 1
clients_num: 100
hatch_rate: 2
during_time: 600
types:
-


@ -0,0 +1,25 @@
locust_insert_performance:
collections:
-
milvus:
db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2
collection_name: local_1m_128_l2
ni_per: 50000
build_index: false
index_type: ivf_sq8
index_param:
nlist: 1024
task:
load_shape: false
step_time: 100
step_load: 50
spawn_rate: 2
connection_num: 1
clients_num: 100
during_time: 600
types:
-
type: insert_flush
weight: 1
params:
ni_per: 1


@ -0,0 +1,25 @@
locust_insert_performance:
collections:
-
milvus:
db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2
collection_name: local_1m_128_l2
ni_per: 50000
build_index: false
index_type: ivf_sq8
index_param:
nlist: 1024
task:
load_shape: true
step_time: 100
step_load: 50
spawn_rate: 50
connection_num: 1
clients_num: 100
during_time: 600
types:
-
type: insert
weight: 1
params:
ni_per: 1


@ -0,0 +1,25 @@
locust_insert_performance:
collections:
-
milvus:
db_config.primary_path: /test/milvus/db_data_011/insert_sift_1m_128_l2_2
collection_name: local_1m_128_l2
ni_per: 50000
build_index: false
index_type: ivf_sq8
index_param:
nlist: 1024
task:
load_shape: true
step_time: 100
step_load: 50
spawn_rate: 50
connection_num: 1
clients_num: 100
during_time: 600
types:
-
type: insert_flush
weight: 1
params:
ni_per: 1


@ -29,6 +29,19 @@ locust_random_performance:
# GT: 1000000
search_param:
nprobe: 16
-
type: insert
weight: 20
params:
ni_per: 1
-
type: load
weight: 1
-
type: get
weight: 2
params:
ids_length: 10
connection_num: 1
clients_num: 20
hatch_rate: 2


@ -1,16 +1,11 @@
import os
import sys
import time
import re
import logging
import traceback
import argparse
from yaml import full_load, dump
DEFUALT_DEPLOY_MODE = "single"
IDC_NAS_URL = "//172.16.70.249/test"
MINIO_HOST = "minio-test.qa.svc.cluster.local"
import config
import utils
def parse_server_tag(server_tag):
@ -48,21 +43,18 @@ def update_values(src_values_file, deploy_params_file):
except Exception as e:
logging.error(str(e))
raise Exception("File not found")
deploy_mode = deploy_params["deploy_mode"] if "deploy_mode" in deploy_params else DEFUALT_DEPLOY_MODE
deploy_mode = utils.get_deploy_mode(deploy_params)
print(deploy_mode)
cluster = False
values_dict["service"]["type"] = "ClusterIP"
if deploy_mode != DEFUALT_DEPLOY_MODE:
if deploy_mode != config.DEFUALT_DEPLOY_MODE:
cluster = True
values_dict["cluster"]["enabled"] = True
if "server" in deploy_params:
server = deploy_params["server"]
server_name = server["server_name"] if "server_name" in server else ""
server_tag = server["server_tag"] if "server_tag" in server else ""
else:
raise Exception("No server specified in {}".format(deploy_params_file))
server_tag = utils.get_server_tag(deploy_params)
print(server_tag)
# TODO: update milvus config
# # update values.yaml with the given host
node_config = None
# node_config = None
perf_tolerations = [{
"key": "node-role.kubernetes.io/benchmark",
"operator": "Exists",
@ -92,6 +84,20 @@ def update_values(src_values_file, deploy_params_file):
# "cpu": str(int(cpus) - 1) + ".0"
}
}
# use external minio/s3
# TODO: disable temp
# values_dict['minio']['enabled'] = False
values_dict['minio']['enabled'] = True
# values_dict["externalS3"]["enabled"] = True
values_dict["externalS3"]["enabled"] = False
values_dict["externalS3"]["host"] = config.MINIO_HOST
values_dict["externalS3"]["port"] = config.MINIO_PORT
values_dict["externalS3"]["accessKey"] = config.MINIO_ACCESS_KEY
values_dict["externalS3"]["secretKey"] = config.MINIO_SECRET_KEY
values_dict["externalS3"]["bucketName"] = config.MINIO_BUCKET_NAME
logging.debug(values_dict["externalS3"])
if cluster is False:
# TODO: support pod affinity for standalone mode
if cpus:
@ -109,13 +115,6 @@ def update_values(src_values_file, deploy_params_file):
values_dict['standalone']['tolerations'] = perf_tolerations
# values_dict['minio']['tolerations'] = perf_tolerations
values_dict['etcd']['tolerations'] = perf_tolerations
values_dict['minio']['enabled'] = False
# use external minio/s3
values_dict["externalS3"]["enabled"] = True
values_dict["externalS3"]["host"] = MINIO_HOST
values_dict["externalS3"]["accessKey"] = "minioadmin"
values_dict["externalS3"]["secretKey"] = "minioadmin"
else:
# TODO: mem limits on distributed mode
# values_dict['pulsar']["broker"]["configData"].update({"maxMessageSize": "52428800", "PULSAR_MEM": BOOKKEEPER_PULSAR_MEM})
@ -126,9 +125,9 @@ def update_values(src_values_file, deploy_params_file):
# values_dict['etcd']['nodeSelector'] = node_config
# # set limit/request cpus in resources
# values_dict['proxy']['resources'] = resources
values_dict['querynode']['resources'] = resources
values_dict['indexnode']['resources'] = resources
values_dict['datanode']['resources'] = resources
values_dict['queryNode']['resources'] = resources
values_dict['indexNode']['resources'] = resources
values_dict['dataNode']['resources'] = resources
# values_dict['minio']['resources'] = resources
# values_dict['pulsarStandalone']['resources'] = resources
if mems:
@ -143,9 +142,9 @@ def update_values(src_values_file, deploy_params_file):
logging.debug("Add tolerations into cluster server")
values_dict['proxy']['tolerations'] = perf_tolerations
values_dict['querynode']['tolerations'] = perf_tolerations
values_dict['indexnode']['tolerations'] = perf_tolerations
values_dict['datanode']['tolerations'] = perf_tolerations
values_dict['queryNode']['tolerations'] = perf_tolerations
values_dict['indexNode']['tolerations'] = perf_tolerations
values_dict['dataNode']['tolerations'] = perf_tolerations
values_dict['etcd']['tolerations'] = perf_tolerations
# values_dict['minio']['tolerations'] = perf_tolerations
values_dict['pulsarStandalone']['tolerations'] = perf_tolerations
@ -155,12 +154,19 @@ def update_values(src_values_file, deploy_params_file):
# values_dict['pulsar']['broker']['tolerations'] = perf_tolerations
# values_dict['pulsar']['bookkeeper']['tolerations'] = perf_tolerations
# values_dict['pulsar']['zookeeper']['tolerations'] = perf_tolerations
values_dict['minio']['enabled'] = False
# use external minio/s3
values_dict["externalS3"]["enabled"] = True
values_dict["externalS3"]["host"] = MINIO_HOST
values_dict["externalS3"]["accessKey"] = "minioadmin"
values_dict["externalS3"]["secretKey"] = "minioadmin"
milvus_params = deploy_params["milvus"]
if "datanode" in milvus_params:
if "replicas" in milvus_params["datanode"]:
values_dict['dataNode']["replicas"] = milvus_params["datanode"]["replicas"]
if "querynode"in milvus_params:
if "replicas" in milvus_params["querynode"]:
values_dict['queryNode']["replicas"] = milvus_params["querynode"]["replicas"]
if "indexnode"in milvus_params:
if "replicas" in milvus_params["indexnode"]:
values_dict['indexNode']["replicas"] = milvus_params["indexnode"]["replicas"]
if "proxy"in milvus_params:
if "replicas" in milvus_params["proxy"]:
values_dict['proxy']["replicas"] = milvus_params["proxy"]["replicas"]
# add extra volumes
values_dict['extraVolumes'] = [{
'name': 'test',
@ -171,7 +177,7 @@ def update_values(src_values_file, deploy_params_file):
'name': "cifs-test-secret"
},
'options': {
'networkPath': IDC_NAS_URL,
'networkPath': config.IDC_NAS_URL,
'mountOptions': "vers=1.0"
}
}
@ -186,7 +192,6 @@ def update_values(src_values_file, deploy_params_file):
f.close()
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)


@ -1,25 +1,13 @@
# -*- coding: utf-8 -*-
import os
import sys
import pdb
import time
import json
import datetime
import argparse
import threading
import logging
import string
import random
# import multiprocessing
import numpy as np
# import psutil
import sklearn.preprocessing
# import docker
from yaml import full_load, dump
import yaml
import tableprint as tp
from pprint import pprint
from pymilvus import DataType
import config
logger = logging.getLogger("milvus_benchmark.utils")
@ -122,63 +110,25 @@ def print_table(headers, columns, data):
tp.table(bodys, headers)
def modify_config(k, v, type=None, file_path="conf/server_config.yaml", db_slave=None):
if not os.path.isfile(file_path):
raise Exception('File: %s not found' % file_path)
with open(file_path) as f:
config_dict = full_load(f)
f.close()
if config_dict:
if k.find("use_blas_threshold") != -1:
config_dict['engine_config']['use_blas_threshold'] = int(v)
elif k.find("use_gpu_threshold") != -1:
config_dict['engine_config']['gpu_search_threshold'] = int(v)
elif k.find("cpu_cache_capacity") != -1:
config_dict['cache_config']['cpu_cache_capacity'] = int(v)
elif k.find("enable_gpu") != -1:
config_dict['gpu_resource_config']['enable'] = v
elif k.find("gpu_cache_capacity") != -1:
config_dict['gpu_resource_config']['cache_capacity'] = int(v)
elif k.find("index_build_device") != -1:
config_dict['gpu_resource_config']['build_index_resources'] = v
elif k.find("search_resources") != -1:
config_dict['resource_config']['resources'] = v
# if db_slave:
# config_dict['db_config']['db_slave_path'] = MULTI_DB_SLAVE_PATH
with open(file_path, 'w') as f:
dump(config_dict, f, default_flow_style=False)
f.close()
else:
raise Exception('Load file:%s error' % file_path)
def get_deploy_mode(deploy_params):
deploy_mode = None
if deploy_params:
milvus_params = None
if "milvus" in deploy_params:
milvus_params = deploy_params["milvus"]
if not milvus_params:
deploy_mode = config.DEFUALT_DEPLOY_MODE
elif "deploy_mode" in milvus_params:
deploy_mode = milvus_params["deploy_mode"]
if deploy_mode not in [config.SINGLE_DEPLOY_MODE, config.CLUSTER_DEPLOY_MODE]:
raise Exception("Invalid deploy mode: %s" % deploy_mode)
return deploy_mode
# update server_config.yaml
def update_server_config(file_path, server_config):
if not os.path.isfile(file_path):
raise Exception('File: %s not found' % file_path)
with open(file_path) as f:
values_dict = full_load(f)
f.close()
for k, v in server_config.items():
if k.find("primary_path") != -1:
values_dict["db_config"]["primary_path"] = v
elif k.find("use_blas_threshold") != -1:
values_dict['engine_config']['use_blas_threshold'] = int(v)
elif k.find("gpu_search_threshold") != -1:
values_dict['engine_config']['gpu_search_threshold'] = int(v)
elif k.find("cpu_cache_capacity") != -1:
values_dict['cache_config']['cpu_cache_capacity'] = int(v)
elif k.find("cache_insert_data") != -1:
values_dict['cache_config']['cache_insert_data'] = v
elif k.find("enable") != -1:
values_dict['gpu_resource_config']['enable'] = v
elif k.find("gpu_cache_capacity") != -1:
values_dict['gpu_resource_config']['cache_capacity'] = int(v)
elif k.find("build_index_resources") != -1:
values_dict['gpu_resource_config']['build_index_resources'] = v
elif k.find("search_resources") != -1:
values_dict['gpu_resource_config']['search_resources'] = v
with open(file_path, 'w') as f:
dump(values_dict, f, default_flow_style=False)
f.close()
def get_server_tag(deploy_params):
server_tag = ""
if deploy_params and "server" in deploy_params:
server = deploy_params["server"]
# server_name = server["server_name"] if "server_name" in server else ""
server_tag = server["server_tag"] if "server_tag" in server else ""
return server_tag


@ -1,7 +1,7 @@
# pymilvus==0.2.14
# pymilvus-distributed>=0.0.61
--extra-index-url https://test.pypi.org/simple/
pymilvus==2.0.0rc2.dev4
pymilvus==2.0.0rc2.dev12
scipy==1.3.1
scikit-learn==0.19.1