Update chaos tests and remove useless code (#5821)

* [skip ci] Add flush checker in monitor thread

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>

* [skip ci] update chaos parser

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>

* [skip ci] Update chaos tests

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>

* [skip ci] Move chaos to python_client folder

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>

* [skip ci] Remove useless code and comments

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>

* [skip ci] Use delayed assert for always assert everything

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
pull/5824/head^2
yanliang567 2021-06-17 11:49:57 +08:00 committed by GitHub
parent 46d5b571a5
commit bbb3f90051
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 557 additions and 365 deletions

View File

@ -1,11 +0,0 @@
chaos:
kind: PodChaos
spec:
action: pod-kill
selector:
namespaces:
- milvus
labelSelectors:
"app.kubernetes.io/name": data node
scheduler:
cron: "@every 20s"

View File

@ -1,11 +0,0 @@
chaos:
kind: PodChaos
spec:
action: pod-kill
selector:
namespaces:
- milvus
labelSelectors:
"app.kubernetes.io/name": query node
scheduler:
cron: "@every 20s"

View File

@ -1,55 +0,0 @@
import pytest
def pytest_addoption(parser):
parser.addoption("--ip", action="store", default="localhost", help="service's ip")
parser.addoption("--host", action="store", default="localhost", help="service's ip")
parser.addoption("--service", action="store", default="", help="service address")
parser.addoption("--port", action="store", default=19530, help="service's port")
parser.addoption("--http_port", action="store", default=19121, help="http's port")
parser.addoption("--handler", action="store", default="GRPC", help="handler of request")
parser.addoption("--tag", action="store", default="all", help="only run tests matching the tag.")
parser.addoption('--dry_run', action='store_true', default=False, help="")
@pytest.fixture
def ip(request):
return request.config.getoption("--ip")
@pytest.fixture
def host(request):
return request.config.getoption("--host")
@pytest.fixture
def service(request):
return request.config.getoption("--service")
@pytest.fixture
def port(request):
return request.config.getoption("--port")
@pytest.fixture
def http_port(request):
return request.config.getoption("--http_port")
@pytest.fixture
def handler(request):
return request.config.getoption("--handler")
@pytest.fixture
def tag(request):
return request.config.getoption("--tag")
@pytest.fixture
def dry_run(request):
return request.config.getoption("--dry_run")

View File

@ -1,5 +0,0 @@
[pytest]
addopts = --host 192.168.1.239 --html=/Users/yanliang/Document/report.html
-;addopts = --host 172.28.255.155 --html=/tmp/report.html
# python3 -W ignore -m pytest

View File

@ -1,103 +0,0 @@
import logging
import pytest
import sys
import threading
from time import sleep
from base.client_base import TestcaseBase
from pymilvus_orm import connections
from checker import CreateChecker, SearchChecker, InsertChecker
from base.client_base import ApiCollectionWrapper
from common import common_func as cf
from common import common_type as ct
from utils.util_log import test_log as log
class TestsChaos:
@pytest.fixture(scope="function", autouse=True)
def coll_wrapper_4_insert(self):
connections.configure(default={"host": "192.168.1.239", "port": 19530})
res = connections.create_connection(alias='default')
if res is None:
raise Exception("no connections")
c_wrapper = ApiCollectionWrapper()
c_wrapper.init_collection(name=cf.gen_unique_str(),
schema=cf.gen_default_collection_schema(),
check_task="check_nothing")
return c_wrapper
@pytest.fixture(scope="function", autouse=True)
def coll_wrapper_4_search(self):
connections.configure(default={"host": "192.168.1.239", "port": 19530})
res = connections.create_connection(alias='default')
if res is None:
raise Exception("no connections")
c_wrapper = ApiCollectionWrapper()
_, result = c_wrapper.init_collection(name=cf.gen_unique_str(),
schema=cf.gen_default_collection_schema(),
check_task="check_nothing")
if result is False:
log.log("result: ")
# for _ in range(10):
# c_wrapper.insert(data=cf.gen_default_list_data(nb=ct.default_nb*10),
# check_res="check_nothing")
return c_wrapper
@pytest.fixture(scope="function", autouse=True)
def health_checkers(self, coll_wrapper_4_insert, coll_wrapper_4_search):
checkers = {}
# search_ch = SearchChecker(collection_wrapper=coll_wrapper_4_search)
# checkers["search"] = search_ch
# insert_ch = InsertChecker(collection_wrapper=coll_wrapper_4_insert)
# checkers["insert"] = insert_ch
create_ch = CreateChecker(collection_wrapper=coll_wrapper_4_insert)
checkers["create"] = create_ch
return checkers
'''
def teardown(self, health_checkers):
for ch in health_checkers.values():
ch.terminate()
pass
'''
def test_chaos(self, health_checkers):
# query_t = threading.Thread(target=health_checkers['create'].keep_searching, args=())
# query_t.start()
# insert_t = threading.Thread(target=health_checkers['create'].keep_inserting, args=())
# insert_t.start()
create_t = threading.Thread(target=health_checkers['create'].keep_creating, args=())
create_t.start()
# parse chaos object
# find the testcase by chaos ops in testcases
# parse the test expectations
# wait 120s
print("test_chaos starting...")
sleep(2)
print(f"succ count1: {health_checkers['create']._succ}")
print(f"succ rate1: {health_checkers['create'].statics()}")
# assert statistic:all ops 100% succ
# reset counting
# apply chaos object
# wait 300s (varies by chaos)
health_checkers["create"].reset()
print(f"succ count2: {health_checkers['create']._succ}")
print(f"succ rate2: {health_checkers['create'].statics()}")
sleep(2)
print(f"succ count3: {health_checkers['create']._succ}")
print(f"succ rate3: {health_checkers['create'].statics()}")
# assert statistic: the target ops succ <50% and the other keep 100% succ
# delete chaos
# wait 300s (varies by feature)
# assert statistic: the target ops succ >90% and the other keep 100% succ
# terminate thread
for ch in health_checkers.values():
ch.terminate()
pass

View File

@ -19,74 +19,40 @@ class ApiCollectionWrapper:
return res, check_result
@property
def schema(self, check_task=None, check_items=None):
def schema(self):
return self.collection.schema
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.schema])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def description(self, check_task=None, check_items=None):
def description(self):
return self.collection.description
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.description])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def name(self, check_task=None, check_items=None):
def name(self):
return self.collection.name
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.name])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def is_empty(self, check_task=None, check_items=None):
def is_empty(self):
return self.collection.is_empty
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.is_empty])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def num_entities(self, check_task=None, check_items=None):
def num_entities(self):
return self.collection.num_entities
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.num_entities])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def primary_field(self, check_task=None, check_items=None):
def primary_field(self):
return self.collection.primary_field
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.primary_field])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
def drop(self, check_task=None, check_items=None, **kwargs):
log.info("Dropping collection")
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.collection.drop], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
if check_result:
log.info("Dropped collection")
else:
log.info("Dropping collection failed")
return res, check_result
def load(self, partition_names=None, check_task=None, check_items=None, **kwargs):
log.info("loading data")
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.collection.load, partition_names], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check,
partition_names=partition_names, **kwargs).run()
if check_result:
log.info("loaded data")
else:
log.info("loading failed")
return res, check_result
def release(self, check_task=None, check_items=None, **kwargs):
@ -118,12 +84,8 @@ class ApiCollectionWrapper:
return res, check_result
@property
def partitions(self, check_task=None, check_items=None):
def partitions(self):
return self.collection.partitions
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.partitions])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
def partition(self, partition_name, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
@ -153,12 +115,8 @@ class ApiCollectionWrapper:
return res, check_result
@property
def indexes(self, check_task=None, check_items=None):
def indexes(self):
return self.collection.indexes
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.collection.indexes])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
def index(self, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name

View File

@ -20,41 +20,26 @@ class ApiPartitionWrapper:
return response, check_result
@property
def description(self, check_task=None, check_items=None):
def description(self):
return self.partition.description if self.partition else None
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.partition.description])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def name(self, check_task=None, check_items=None):
def name(self):
return self.partition.name if self.partition else None
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.partition.name])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def is_empty(self, check_task=None, check_items=None):
def is_empty(self):
return self.partition.is_empty if self.partition else None
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.partition.is_empty])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
@property
def num_entities(self, check_task=None, check_items=None):
def num_entities(self):
return self.partition.num_entities if self.partition else None
# func_name = sys._getframe().f_code.co_name
# res, check = func_req([self.partition.num_entities])
# check_result = CheckFunc(res, func_name, check_task, check_items, check).run()
# return res, check_result
def drop(self, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, succ = api_request([self.partition.drop], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, succ, **kwargs).run()
check_result = ResponseChecker(res, func_name,
check_task, check_items, succ, **kwargs).run()
return res, check_result
def load(self, check_task=None, check_items=None, **kwargs):

View File

@ -12,49 +12,63 @@ class ApiUtilityWrapper:
ut = utility
def loading_progress(self, collection_name, partition_names=[], using="default", check_task=None, check_items=None):
def loading_progress(self, collection_name, partition_names=[],
using="default", check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.loading_progress, collection_name, partition_names, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, collection_name=collection_name,
check_result = ResponseChecker(res, func_name, check_task,
check_items, is_succ, collection_name=collection_name,
partition_names=partition_names, using=using).run()
return res, check_result
def wait_for_loading_complete(self, collection_name, partition_names=[], timeout=None, using="default", check_task=None, check_items=None):
def wait_for_loading_complete(self, collection_name, partition_names=[], timeout=None, using="default",
check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.wait_for_loading_complete, collection_name, partition_names, timeout, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, collection_name=collection_name,
partition_names=partition_names, timeout=timeout, using=using).run()
res, is_succ = api_request([self.ut.wait_for_loading_complete, collection_name,
partition_names, timeout, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name, partition_names=partition_names,
timeout=timeout, using=using).run()
return res, check_result
def index_building_progress(self, collection_name, index_name="", using="default", check_task=None, check_items=None):
def index_building_progress(self, collection_name, index_name="", using="default",
check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.index_building_progress, collection_name, index_name, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, collection_name=collection_name, index_name=index_name,
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name, index_name=index_name,
using=using).run()
return res, check_result
def wait_for_index_building_complete(self, collection_name, index_name="", timeout=None, using="default", check_task=None, check_items=None):
def wait_for_index_building_complete(self, collection_name, index_name="", timeout=None, using="default",
check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.wait_for_loading_complete, collection_name, index_name, timeout, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, collection_name=collection_name, index_name=index_name,
res, is_succ = api_request([self.ut.wait_for_loading_complete, collection_name,
index_name, timeout, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name, index_name=index_name,
timeout=timeout, using=using).run()
return res, check_result
def has_collection(self, collection_name, using="default", check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.has_collection, collection_name, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, collection_name=collection_name, using=using).run()
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name, using=using).run()
return res, check_result
def has_partition(self, collection_name, partition_name, using="default", check_task=None, check_items=None):
def has_partition(self, collection_name, partition_name, using="default",
check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.has_partition, collection_name, partition_name, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, collection_name=collection_name,
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name,
partition_name=partition_name, using=using).run()
return res, check_result
def list_collections(self, timeout=None, using="default", check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.list_collections, timeout, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ, timeout=timeout, using=using).run()
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
timeout=timeout, using=using).run()
return res, check_result

View File

@ -0,0 +1,77 @@
import os
import threading
import glob
from delayed_assert import expect
import constants
from yaml import full_load
def check_config(chaos_config):
    """Validate a chaos experiment config dict.

    Requires a top-level 'kind' and 'spec', and 'action'/'selector'
    inside 'spec'. Raises Exception with a descriptive message on the
    first missing field; returns True when the config is complete.
    """
    if not chaos_config.get('kind', None):
        raise Exception("kind must be specified")
    # look up 'spec' once instead of re-fetching it for every check
    spec = chaos_config.get('spec', None)
    if not spec:
        raise Exception("spec must be specified")
    if "action" not in spec:
        raise Exception("action must be specified in spec")
    if "selector" not in spec:
        raise Exception("selector must be specified in spec")
    return True
def reset_counting(checkers=None):
    """Reset the succ/fail counters of every checker.

    checkers: mapping of op name -> Checker; None or empty is a no-op.
    The original mutable default argument ({}) was replaced with None to
    avoid a dict instance shared across calls.
    """
    for ch in (checkers or {}).values():
        ch.reset()
def gen_experiment_config(yaml):
    """Load a chaos experiment definition from a YAML file.

    yaml: path to the YAML file (parameter name kept for backward
    compatibility with keyword callers; it shadows the common module name).
    Returns the parsed content as a dict.
    """
    # 'with' closes the file on exit; the explicit f.close() was redundant
    with open(yaml) as f:
        return full_load(f)
def start_monitor_threads(checkers=None):
    """Spawn one thread per checker running its keep_running() loop.

    checkers: mapping of op name -> Checker; None or empty starts nothing.
    Threads are not joined here — callers stop the loops via
    Checker.terminate(). The mutable default ({}) was replaced with None.
    """
    # iterate values directly instead of keys + indexed lookup
    for checker in (checkers or {}).values():
        threading.Thread(target=checker.keep_running).start()
def assert_statistic(checkers, expectations=None):
    """Delayed-assert each checker's success rate against expectations.

    checkers: mapping of op -> Checker.
    expectations: mapping of op -> constants.SUCC/FAIL; ops not listed are
    expected to succeed. Uses delayed_assert's expect() so every checker is
    evaluated before the test is failed. The mutable default ({}) was
    replaced with None to avoid shared state across calls.
    """
    expectations = expectations or {}
    for op, checker in checkers.items():
        succ_rate = checker.succ_rate()
        if expectations.get(op, '') == constants.FAIL:
            print(f"Expect Fail: {str(checker)} current succ rate {succ_rate}")
            expect(succ_rate < 0.49)
        else:
            print(f"Expect Succ: {str(checker)} current succ rate {succ_rate}")
            expect(succ_rate > 0.90)
def get_env_variable_by_name(name):
    """Return the value of environment variable `name`, or None if unset.

    The original broad try/except around os.environ[name] was replaced
    with os.environ.get — only a missing key was ever expected there.
    """
    env_var = os.environ.get(name)
    if env_var is None:
        print(f"fail to get env variable {name}")
        return None
    print(f"env_variable: {env_var}")
    return str(env_var)
def get_chaos_yamls():
    """Resolve the chaos experiment yaml file(s) to run.

    Precedence: the CHAOS_CONFIG_PATH env var — either a directory
    containing chaos_*.yaml files or a single yaml file — falling back to
    the default chaos_objects/ location.
    """
    chaos_env = get_env_variable_by_name(constants.CHAOS_CONFIG_ENV)
    if chaos_env is not None:
        if os.path.isdir(chaos_env):
            print(f"chaos_env is a dir: {chaos_env}")
            # os.path.join fixes the missing-separator bug when the env
            # var value has no trailing slash
            return glob.glob(os.path.join(chaos_env, 'chaos_*.yaml'))
        if os.path.isfile(chaos_env):
            print(f"chaos_env is a file: {chaos_env}")
            return [chaos_env]
    print("not a valid directory or file, return default")
    return glob.glob(os.path.join(constants.TESTS_CONFIG_LOCATION, 'chaos_*.yaml'))

View File

@ -0,0 +1,16 @@
# Chaos Mesh PodChaos experiment: repeatedly kill one milvus data node pod.
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: test-data-node-pod-kill
  namespace:  # intentionally blank — filled in when the object is applied
spec:
  action: pod-kill
  mode: one  # kill a single matching pod per schedule tick
  selector:
    namespaces:
      - default # target namespace of milvus deployment
    labelSelectors:
      app.kubernetes.io/name: milvus-ha
      component: 'datanode'
  scheduler:
    cron: '@every 20s'

View File

@ -0,0 +1,16 @@
# Chaos Mesh PodChaos experiment: repeatedly kill one milvus proxy node pod.
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: test-proxy-node-pod-kill
  namespace:  # intentionally blank — filled in when the object is applied
spec:
  action: pod-kill
  mode: one  # kill a single matching pod per schedule tick
  selector:
    namespaces:
      - default # target namespace of milvus deployment
    labelSelectors:
      app.kubernetes.io/name: milvus-ha
      component: 'proxynode'
  scheduler:
    cron: '@every 20s'

View File

@ -0,0 +1,16 @@
# Chaos Mesh PodChaos experiment: repeatedly kill one milvus query node pod.
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: test-query-node-pod-kill
  namespace:  # intentionally blank — filled in when the object is applied
spec:
  action: pod-kill
  mode: one  # kill a single matching pod per schedule tick
  selector:
    namespaces:
      - default # target namespace of milvus deployment
    labelSelectors:
      app.kubernetes.io/name: milvus-ha
      component: 'querynode'
  scheduler:
    cron: '@every 20s'

View File

@ -0,0 +1,15 @@
# Chaos Mesh PodChaos experiment: repeatedly kill the standalone milvus pod.
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: test-standalone-pod-kill
  namespace:  # intentionally blank — filled in when the object is applied
spec:
  action: pod-kill
  mode: one  # kill a single matching pod per schedule tick
  selector:
    namespaces:
      - default # target namespace of milvus deployment
    labelSelectors:
      app.kubernetes.io/name: milvus-ha # pod of standalone milvus
  scheduler:
    cron: '@every 20s'

View File

@ -10,16 +10,25 @@
Collections:
-
testcase: test_querynode_podkill
testcase:
name: test_standalone_podkill
chaos: chaos_standalone_podkill.yaml
expectation:
cluster_1_node:
create: fail
insert: fail
flush: fail
index: fail
search: fail
query: fail
cluster_n_nodes:
search: degrade # keep functional, but performance degraded
query: degrade
-
testcase:
name: test_querynode_podkill
chaos: chaos_querynode_podkill.yaml
expectation:
standalone:
create: succ # succ by default if not specified
insert: succ
flush: succ
index: succ
search: fail
query: fail
cluster_1_node:
search: fail
query: fail
@ -27,50 +36,71 @@ Collections:
search: degrade # keep functional, but performance degraded
query: degrade
-
testcase: test_queryservice_podkill
testcase:
name: test_queryservice_podkill
chaos: chaos_queryservice_podkill.yaml
-
testcase: test_datanode_podkill
testcase:
name: test_datanode_podkill
chaos: chaos_datanode_podkill.yaml
expectation:
standalone:
insert: fail
cluster_1_node:
insert: fail
insert: succ
flush: fail
cluster_n_nodes:
insert: degrade
-
testcase: test_dataservice_podkill
testcase:
name: test_dataservice_podkill
chaos: chaos_dataservice_podkill.yaml
-
testcase: test_indexnode_podkill
testcase:
name: test_indexnode_podkill
chaos: chaos_indexnode_podkill.yaml
-
testcase: test_indexservice_podkill
testcase:
name: test_indexservice_podkill
chaos: chaos_indexservice_podkill.yaml
-
testcase: test_proxy_podkill
testcase:
name: test_proxy_podkill
chaos: chaos_proxy_podkill.yaml
expectation:
cluster_1_node:
create: fail
insert: fail
flush: fail
index: fail
search: fail
query: fail
cluster_n_nodes:
insert: degrade
-
testcase: test_master_podkill
testcase:
name: test_master_podkill
chaos: chaos_master_podkill.yaml
-
testcase: test_etcd_podkill
testcase:
name: test_etcd_podkill
chaos: chaos_etcd_podkill.yaml
-
testcase: test_minio_podkill
testcase:
name: test_minio_podkill
chaos: chaos_minio_podkill.yaml
-
testcase: test_querynode_cpu100p
testcase:
name: test_querynode_cpu100p
chaos: chaos_querynode_cpu100p.yaml
# and 10 more for the other pods
-
testcase: test_querynode_mem100p
testcase:
name: test_querynode_mem100p
chaos: chaos_querynode_mem100p.yaml
# and 10 more for the other pods
-
testcase: test_querynode_network_isolation
testcase:
name: test_querynode_network_isolation
chaos: chaos_querynode_network_isolation.yaml
# and 10 more for the other pods

View File

@ -0,0 +1,66 @@
from __future__ import print_function
import logging
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import constants as cf
logger = logging.getLogger("milvus_benchmark.chaos.chaosOpt")
class ChaosOpt(object):
    """Thin wrapper over the Kubernetes CustomObjects API for managing
    chaos-mesh experiment objects (create / list / delete).

    kind: the chaos-mesh CRD kind (e.g. 'PodChaos'); the CRD plural used
    in API calls is its lowercased form.
    """

    def __init__(self, kind, group=cf.DEFAULT_GROUP, version=cf.DEFAULT_VERSION, namespace=cf.NAMESPACE):
        self.group = group
        self.version = version
        self.namespace = namespace
        self.plural = kind.lower()

    @staticmethod
    def _api():
        """Load the local kube config and return a CustomObjectsApi client."""
        config.load_kube_config()
        return client.CustomObjectsApi()

    def create_chaos_object(self, body):
        """Create the chaos custom object described by `body` (a dict).

        Raises Exception (chained to the ApiException) on API failure.
        """
        try:
            api_response = self._api().create_namespaced_custom_object(
                self.group, self.version, self.namespace,
                plural=self.plural, body=body, pretty='true')
            # log through the module logger instead of print/root logger
            logger.info(api_response)
        except ApiException as e:
            logger.error("Exception when calling CustomObjectsApi->create_namespaced_custom_object: %s\n" % e)
            # chain the cause so the original k8s error context is kept
            raise Exception(str(e)) from e

    def delete_chaos_object(self, metadata_name):
        """Delete the chaos object named `metadata_name`."""
        logger.info(metadata_name)
        try:
            data = self._api().delete_namespaced_custom_object(
                self.group, self.version, self.namespace, self.plural,
                metadata_name)
            logger.info(data)
        except ApiException as e:
            logger.error("Exception when calling CustomObjectsApi->delete_namespaced_custom_object: %s\n" % e)
            raise Exception(str(e)) from e

    def list_chaos_object(self):
        """Return the API listing of chaos objects of this kind/namespace."""
        try:
            data = self._api().list_namespaced_custom_object(
                self.group, self.version, self.namespace, plural=self.plural)
        except ApiException as e:
            logger.error("Exception when calling CustomObjectsApi->list_namespaced_custom_object: %s\n" % e)
            raise Exception(str(e)) from e
        return data

    def delete_all_chaos_object(self):
        """Delete every chaos object of this kind in the namespace."""
        for item in self.list_chaos_object()["items"]:
            self.delete_chaos_object(item["metadata"]["name"])

View File

@ -1,11 +1,21 @@
import sys
import threading
from enum import Enum
from time import sleep
from base.collection_wrapper import ApiCollectionWrapper
from common import common_func as cf
from common import common_type as ct
import constants
nums = 0
class Op(Enum):
create = 'create'
insert = 'insert'
flush = 'flush'
index = 'index'
search = 'search'
query = 'query'
unknown = 'unknown'
class Checker:
@ -17,7 +27,7 @@ class Checker:
def total(self):
return self._succ + self._fail
def statics(self):
def succ_rate(self):
return self._succ / self.total() if self.total() != 0 else 0
def terminate(self):
@ -29,14 +39,14 @@ class Checker:
class SearchChecker(Checker):
def __init__(self, collection_wrapper):
def __init__(self, collection_wrap):
super().__init__()
self.c_wrapper = collection_wrapper
self.c_wrap = collection_wrap
def keep_running(self):
while self._running is True:
search_vec = cf.gen_vectors(5, ct.default_dim)
_, result = self.c_wrapper.search(
_, result = self.c_wrap.search(
data=search_vec,
params={"nprobe": 32},
limit=1,
@ -48,55 +58,43 @@ class SearchChecker(Checker):
self._fail += 1
class InsertAndFlushChecker(Checker):
def __init__(self, collection_wrapper):
class InsertFlushChecker(Checker):
def __init__(self, connection, collection_wrap, do_flush=False):
super().__init__()
self._flush_succ = 0
self._flush_fail = 0
self.c_wrapper = collection_wrapper
self.conn = connection
self.c_wrap = collection_wrap
self._do_flush = do_flush
def keep_running(self):
while self._running is True:
sleep(1)
_, insert_result = self.c_wrapper.insert(
data=cf.gen_default_list_data(nb=ct.default_nb),
check_task="nothing")
if insert_result is True:
self._succ += 1
num_entities = self.c_wrapper.num_entities
self.connection.flush([self.c_wrapper.collection.name])
if self.c_wrapper.num_entities == (num_entities + ct.default_nb):
self._flush_succ += 1
_, insert_result = self.c_wrap.insert(
data=cf.gen_default_dataframe_data(nb=constants.DELTA_PER_INS)
)
if self._do_flush is False:
if insert_result is True:
self._succ += 1
else:
self._flush_fail += 1
self._fail += 1
else:
self._fail += 1
self._flush_fail += 1
def insert_statics(self):
return self.statics()
def flush_statics(self):
return self._flush_succ / self.total() if self.total() != 0 else 0
def reset(self):
self._succ = 0
self._fail = 0
self._flush_succ = 0
self._flush_fail = 0
entities_1 = self.c_wrap.num_entities
self.conn.flush([self.c_wrap.name])
entities_2 = self.c_wrap.num_entities
if entities_2 == (entities_1 + constants.DELTA_PER_INS):
self._succ += 1
else:
self._fail += 1
class CreateChecker(Checker):
def __init__(self, collection_wrapper):
def __init__(self):
super().__init__()
self.c_wrapper = collection_wrapper
self.num = 0
self.c_wrapper = ApiCollectionWrapper()
def keep_running(self):
while self._running is True:
sleep(2)
collection, result = self.c_wrapper.init_collection(
name=cf.gen_unique_str(),
name=cf.gen_unique_str("CreateChecker_"),
schema=cf.gen_default_collection_schema(),
check_task="check_nothing"
)
@ -122,11 +120,3 @@ class QueryChecker(Checker):
def keep_running(self):
pass
class FlushChecker(Checker):
def __init__(self):
super().__init__()
def keep_running(self):
pass

View File

@ -0,0 +1,35 @@
# Shared constants for the chaos tests and the benchmark scheduler.

# MongoDB backing the benchmark job scheduler.
# MONGO_SERVER = 'mongodb://192.168.1.234:27017/'
MONGO_SERVER = 'mongodb://mongodb.test:27017/'
SCHEDULER_DB = "scheduler"
JOB_COLLECTION = "jobs"
# Milvus image registry and NAS share holding test data.
REGISTRY_URL = "registry.zilliz.com/milvus/milvus"
IDC_NAS_URL = "//172.16.70.249/test"
# Default milvus service endpoint and version.
SERVER_HOST_DEFAULT = "127.0.0.1"
SERVER_PORT_DEFAULT = 19530
SERVER_VERSION = "2.0"
HELM_NAMESPACE = "milvus"
BRANCH = "master"
DEFAULT_CPUS = 48
RAW_DATA_DIR = "/test/milvus/raw_data/"
# NAS path where benchmark logs are collected, per branch.
LOG_PATH = "/test/milvus/benchmark/logs/{}/".format(BRANCH)
DEFAULT_DEPLOY_MODE = "single"
# Kubernetes namespace and chaos-mesh CRD coordinates.
NAMESPACE = "default"
DEFAULT_API_VERSION = 'chaos-mesh.org/v1alpha1'
DEFAULT_GROUP = 'chaos-mesh.org'
DEFAULT_VERSION = 'v1alpha1'
# Expected-outcome labels used by testcases.yaml and assert_statistic().
SUCC = 'succ'
FAIL = 'fail'
# Rows inserted per InsertFlushChecker iteration.
DELTA_PER_INS = 10
CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH'  # env variable pointing at chaos yaml path
TESTS_CONFIG_LOCATION = 'chaos_objects/'

View File

@ -0,0 +1,158 @@
import pytest
from time import sleep
from pymilvus_orm import connections
from checker import CreateChecker, Op
from chaos_opt import ChaosOpt
from utils.util_log import test_log as log
from base.collection_wrapper import ApiCollectionWrapper
from common import common_func as cf
from chaos_commons import *
import constants
class TestChaosBase:
    """Shared expectation state and helpers for chaos test classes."""

    # Per-op expectations, defaulting to success; overwritten by
    # parser_testcase_config() for the testcase matching the chaos yaml.
    expect_create = constants.SUCC
    expect_insert = constants.SUCC
    expect_flush = constants.SUCC
    expect_index = constants.SUCC
    expect_search = constants.SUCC
    expect_query = constants.SUCC
    chaos_location = ''
    health_checkers = {}

    def parser_testcase_config(self, chaos_yaml):
        """Load testcases.yaml and apply the expectations of the testcase
        whose 'chaos' file matches `chaos_yaml`.

        Returns True when a matching entry was found, False otherwise.
        """
        tests_yaml = constants.TESTS_CONFIG_LOCATION + 'testcases.yaml'
        tests_config = gen_experiment_config(tests_yaml)
        test_collections = tests_config.get('Collections', None)
        for t in test_collections:
            test_chaos = t.get('testcase', {}).get('chaos', {})
            if test_chaos in chaos_yaml:
                expects = t.get('testcase', {}).get('expectation', {}) \
                    .get('cluster_1_node', {})
                # BUGFIX: yaml keys are plain strings ('create', 'insert', ...),
                # so look them up by the enum's .value — Op.create itself can
                # never match a str key, which made every expectation default
                # to SUCC.
                self.expect_create = expects.get(Op.create.value, constants.SUCC)
                self.expect_insert = expects.get(Op.insert.value, constants.SUCC)
                self.expect_flush = expects.get(Op.flush.value, constants.SUCC)
                self.expect_index = expects.get(Op.index.value, constants.SUCC)
                self.expect_search = expects.get(Op.search.value, constants.SUCC)
                self.expect_query = expects.get(Op.query.value, constants.SUCC)
                return True
        return False
class TestChaos(TestChaosBase):
    """Chaos test: keep milvus ops running in monitor threads while a chaos
    object is applied, then assert each op's success rate matches the
    testcase expectations before / during / after the chaos."""

    @pytest.fixture(scope="function", autouse=True)
    def connection(self):
        """Connect to the milvus service; raises when the connection fails."""
        # NOTE(review): host is hard-coded — consider moving it to an env
        # variable or config file.
        connections.add_connection(default={"host": "192.168.1.239", "port": 19530})
        conn = connections.connect(alias='default')
        if conn is None:
            raise Exception("no connections")
        return conn

    @pytest.fixture(scope="function")
    def collection_wrap_4_insert(self, connection):
        """Fresh collection used by the insert checker."""
        c_wrap = ApiCollectionWrapper()
        c_wrap.init_collection(name=cf.gen_unique_str("collection_4_insert"),
                               schema=cf.gen_default_collection_schema(),
                               check_task="check_nothing")
        return c_wrap

    @pytest.fixture(scope="function")
    def collection_wrap_4_flush(self, connection):
        """Fresh collection used by the flush checker."""
        c_wrap = ApiCollectionWrapper()
        # fixed copy-paste defect: the prefix was "collection_4_insert"
        c_wrap.init_collection(name=cf.gen_unique_str("collection_4_flush"),
                               schema=cf.gen_default_collection_schema(),
                               check_task="check_nothing")
        return c_wrap

    @pytest.fixture(scope="function")
    def collection_wrap_4_search(self, connection):
        """Collection pre-loaded with data for the search checker."""
        c_wrap = ApiCollectionWrapper()
        c_wrap.init_collection(name=cf.gen_unique_str("collection_4_search_"),
                               schema=cf.gen_default_collection_schema(),
                               check_task="check_nothing")
        c_wrap.insert(data=cf.gen_default_dataframe_data(nb=10000))
        return c_wrap

    @pytest.fixture(scope="function", autouse=True)
    def init_health_checkers(self, connection, collection_wrap_4_insert,
                             collection_wrap_4_flush, collection_wrap_4_search):
        """Build the Op -> Checker map monitored during the chaos run.

        Only the create checker is enabled for now; the others are kept
        commented out until they are stable.
        """
        checkers = {}
        # search_ch = SearchChecker(collection_wrap=collection_wrap_4_search)
        # checkers[Op.search] = search_ch
        # insert_ch = InsertFlushChecker(connection=connection,
        #                                collection_wrap=collection_wrap_4_insert)
        # checkers[Op.insert] = insert_ch
        # flush_ch = InsertFlushChecker(connection=connection,
        #                               collection_wrap=collection_wrap_4_flush,
        #                               do_flush=True)
        # checkers[Op.flush] = flush_ch
        create_ch = CreateChecker()
        checkers[Op.create] = create_ch
        self.health_checkers = checkers

    def teardown(self):
        """Stop all monitor threads after each test."""
        for ch in self.health_checkers.values():
            ch.terminate()

    @pytest.mark.parametrize('chaos_yaml', get_chaos_yamls())
    def test_chaos(self, chaos_yaml):
        """Run monitor threads, (optionally) apply the chaos object, and
        assert expected success rates at each phase."""
        # start the monitor threads to check the milvus ops
        start_monitor_threads(self.health_checkers)
        # parse chaos object
        print("test.start")
        chaos_config = gen_experiment_config(chaos_yaml)
        log.debug(chaos_config)
        # parse the test expectations in testcases.yaml
        self.parser_testcase_config(chaos_yaml)
        # wait 120s
        sleep(1)
        # assert statistic: all ops 100% succ
        assert_statistic(self.health_checkers)
        # reset counting
        reset_counting(self.health_checkers)
        # apply chaos object (disabled until the chaos cluster is wired up)
        # chaos_opt = ChaosOpt(chaos_config['kind'])
        # chaos_opt.create_chaos_object(chaos_config)
        # wait 120s
        sleep(1)
        # assert statistic against the per-op expectations
        assert_statistic(self.health_checkers, expectations={Op.create: self.expect_create,
                                                             Op.insert: self.expect_insert,
                                                             Op.flush: self.expect_flush,
                                                             Op.index: self.expect_index,
                                                             Op.search: self.expect_search,
                                                             Op.query: self.expect_query
                                                             })
        # delete chaos
        # meta_name = chaos_config.get('metadata', None).get('name', None)
        # chaos_opt.delete_chaos_object(meta_name)
        # reset counting again
        reset_counting(self.health_checkers)
        # wait 300s (varies by feature)
        sleep(1)
        # assert statistic: all ops success again
        assert_statistic(self.health_checkers)
        # terminate monitor threads
        for ch in self.health_checkers.values():
            ch.terminate()

View File

@ -130,7 +130,7 @@ class TestConnectionOperation(TestcaseBase):
"""
@pytest.mark.xfail(reason="#5684")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_add_wrong_format(self):
"""
target: test add_connection, regardless of whether the connection exists
@ -159,7 +159,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias="testing", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"": ""}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_add_more(self):
"""
target: test add_connection passes in multiple parameters
@ -188,7 +188,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias="alias2", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "192.168.1.1", "port": "123"}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_add_single_more(self):
"""
target: test add connections separately
@ -214,7 +214,7 @@ class TestConnectionOperation(TestcaseBase):
check_items={
"dict_content": {"host": "192.168.1.1", "port": "123"}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L0)
def test_connection_add_default(self):
"""
target: add_connection passes default params successfully
@ -234,7 +234,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': 'localhost', 'port': '19530'}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L0)
def test_connection_add_cover_default(self):
"""
target: add a connection to override the default connection
@ -257,7 +257,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': '192.168.1.1', 'port': '12345'}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_get_addr_not_exist(self):
"""
target: get addr of alias that is not exist and return {}
@ -270,13 +270,14 @@ class TestConnectionOperation(TestcaseBase):
check_items={"dict_content": {}})
@pytest.mark.skip("The maximum number of add_connection is not set")
@pytest.mark.tags(CaseLabel.L2)
def test_connection_add_max(self):
"""
The maximum number of add_connection is not set
"""
pass
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_add_after_connect(self, host, port):
"""
target: add_connect passes different params after normal connect
@ -297,7 +298,7 @@ class TestConnectionOperation(TestcaseBase):
# add connection with the same params
self.connection_wrap.add_connection(test_alias_name={"host": host, "port": port})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_add_after_default_connect(self, host, port):
"""
target: add_connect passes different params after normal connect passes default alias
@ -318,7 +319,7 @@ class TestConnectionOperation(TestcaseBase):
# add connection with the same params
self.connection_wrap.add_connection(test_alias_name={"host": host, "port": port})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_add_after_disconnect(self, host, port):
"""
target: add_connect after normal connectdisconnect
@ -343,7 +344,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "1"}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_add_after_remove(self, host, port):
"""
target: add_connect after normal connectremove_connection
@ -368,7 +369,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "1"}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_connect_alias_not_exist(self):
"""
target: connect passes alias that is not exist and raise error
@ -389,7 +390,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': "localhost", 'port': "19530"}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_connect_default_alias_invalid(self, port):
"""
target: connect passes configure is not exist and raise error
@ -413,7 +414,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': "host", 'port': port}})
@ pytest.mark.tags(CaseLabel.L3)
@ pytest.mark.tags(CaseLabel.L0)
def test_connection_connect_default_alias_effective(self, host, port):
"""
target: connect passes useful configure that adds by add_connect
@ -436,7 +437,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING])
def test_connection_connect_repeat(self, host, port, connect_name):
"""
@ -472,7 +473,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.connect(alias=connect_name, host="host", port=port,
check_task=CheckTasks.err_res, check_items={"err_code": -1, "err_msg": err_msg})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_nme"])
def test_connection_connect_params(self, host, port, connect_name):
"""
@ -545,7 +546,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "19530"}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L0)
def test_connection_disconnect_after_default_connect(self, host, port):
"""
target: disconnect default connect and check result
@ -581,7 +582,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_disconnect_after_connect(self, host, port):
"""
target: disconnect test connect and check result
@ -618,7 +619,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.get_connection_addr(alias=test_alias_name, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_remove_connection_not_exist(self):
"""
target: remove connection that is not exist and check result
@ -633,7 +634,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_connection_remove_default_alias(self):
"""
target: remove default alias connect and check result
@ -648,7 +649,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": []})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_name"])
def test_connection_remove_after_connect(self, host, port, connect_name):
"""
@ -674,7 +675,7 @@ class TestConnectionOperation(TestcaseBase):
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": list_content})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_name"])
def test_connection_remove_after_disconnect(self, host, port, connect_name):
"""