[skip ci]Refector custom resource definition class for test framework (#8065)

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
pull/8072/head
yanliang567 2021-09-16 16:35:53 +08:00 committed by GitHub
parent ec89f2506f
commit 59a65e787e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 44 deletions

View File

@ -67,6 +67,6 @@ def get_chaos_yamls():
return glob.glob(constants.TESTS_CONFIG_LOCATION + constants.ALL_CHAOS_YAMLS)
def reconnect(conn, host, port):
conn.add_connection(default={"host": host, "port": port})
return conn.connect(alias='default')
def reconnect(connections, alias='default'):
# conn.add_connection(default={"host": host, "port": port})
return connections.connect(alias=alias)

View File

@ -13,5 +13,3 @@ spec:
app.kubernetes.io/instance: milvus-chaos
app.kubernetes.io/name: milvus
component: standalone
scheduler:
cron: '@every 5s'

View File

@ -14,10 +14,10 @@ RAW_DATA_DIR = "/test/milvus/raw_data/"
DEFAULT_DEPLOY_MODE = "single"
NAMESPACE = "chaos-testing"
DEFAULT_API_VERSION = 'chaos-mesh.org/v1alpha1'
DEFAULT_GROUP = 'chaos-mesh.org'
DEFAULT_VERSION = 'v1alpha1'
CHAOS_NAMESPACE = "chaos-testing"
CHAOS_API_VERSION = 'chaos-mesh.org/v1alpha1'
CHAOS_GROUP = 'chaos-mesh.org'
CHAOS_VERSION = 'v1alpha1'
SUCC = 'succ'
FAIL = 'fail'
DELTA_PER_INS = 10

View File

@ -4,7 +4,7 @@ from time import sleep
from pymilvus import connections
from checker import CreateChecker, InsertFlushChecker, \
SearchChecker, QueryChecker, IndexChecker, Op
from chaos_opt import ChaosOpt
from common.cus_resource_opts import CustomResourceOperations as CusResource
from utils.util_log import test_log as log
from chaos_commons import *
from common.common_type import CaseLabel
@ -56,8 +56,8 @@ class TestChaosBase:
self.expect_search = expects.get(Op.search.value, constants.SUCC)
self.expect_query = expects.get(Op.query.value, constants.SUCC)
log.info(f"self.expects: create:{self.expect_create}, insert:{self.expect_insert}, "
f"flush:{self.expect_flush}, index:{self.expect_index}, "
f"search:{self.expect_search}, query:{self.expect_query}")
f"flush:{self.expect_flush}, index:{self.expect_index}, "
f"search:{self.expect_search}, query:{self.expect_query}")
return True
return False
@ -88,9 +88,12 @@ class TestChaos(TestChaosBase):
self.health_checkers = checkers
def teardown(self):
chaos_opt = ChaosOpt(self._chaos_config['kind'])
chaos_res = CusResource(kind=self._chaos_config['kind'],
group=constants.CHAOS_GROUP,
version=constants.CHAOS_VERSION,
namespace=constants.CHAOS_NAMESPACE)
meta_name = self._chaos_config.get('metadata', None).get('name', None)
chaos_opt.delete_chaos_object(meta_name, raise_ex=False)
chaos_res.delete(meta_name, raise_ex=False)
for k, ch in self.health_checkers.items():
ch.terminate()
log.info(f"tear down: checker {k} terminated")
@ -124,8 +127,11 @@ class TestChaos(TestChaosBase):
assert_statistic(self.health_checkers)
# apply chaos object
chaos_opt = ChaosOpt(chaos_config['kind'])
chaos_opt.create_chaos_object(chaos_config)
chaos_res = CusResource(kind=chaos_config['kind'],
group=constants.CHAOS_GROUP,
version=constants.CHAOS_VERSION,
namespace=constants.CHAOS_NAMESPACE)
chaos_res.create(chaos_config)
log.info("chaos injected")
sleep(constants.WAIT_PER_OP * 2.1)
# reset counting
@ -150,7 +156,7 @@ class TestChaos(TestChaosBase):
# delete chaos
meta_name = chaos_config.get('metadata', None).get('name', None)
chaos_opt.delete_chaos_object(meta_name)
chaos_res.delete(meta_name)
log.info("chaos deleted")
for k, t in self.checker_threads.items():
log.info(f"Thread {k} is_alive(): {t.is_alive()}")

View File

@ -4,11 +4,11 @@ from time import sleep
from pymilvus import connections, utility
from base.collection_wrapper import ApiCollectionWrapper
from chaos_opt import ChaosOpt
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common import common_func as cf
from common import common_type as ct
from chaos_commons import *
from common.common_type import CaseLabel, CheckTasks
from common.common_type import CaseLabel
from chaos import constants
@ -17,19 +17,20 @@ def reboot_pod(chaos_yaml):
chaos_config = gen_experiment_config(chaos_yaml)
log.debug(chaos_config)
# inject chaos
chaos_opt = ChaosOpt(chaos_config['kind'])
chaos_opt.create_chaos_object(chaos_config)
chaos_res = CusResource(kind=chaos_config['kind'],
group=constants.CHAOS_GROUP,
version=constants.CHAOS_VERSION,
namespace=constants.CHAOS_NAMESPACE)
chaos_res.create(chaos_config)
log.debug("chaos injected")
sleep(1)
# delete chaos
meta_name = chaos_config.get('metadata', None).get('name', None)
chaos_opt.delete_chaos_object(meta_name)
chaos_res.delete(meta_name)
log.debug("chaos deleted")
class TestChaosData:
host = 'localhost'
port = 19530
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port):
@ -37,8 +38,6 @@ class TestChaosData:
conn = connections.connect(alias='default')
if conn is None:
raise Exception("no connections")
self.host = host
self.port = port
return conn
@pytest.mark.tags(CaseLabel.L3)
@ -76,9 +75,10 @@ class TestChaosData:
collection_w.load()
search_vectors = cf.gen_vectors(1, ct.default_dim)
t0 = datetime.datetime.now()
search_params = {"metric_type": "L2", "params": {"nprobe": 16}}
search_res, _ = collection_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
param={"nprobe": 16}, limit=1)
param=search_params, limit=1)
tt = datetime.datetime.now() - t0
log.debug(f"assert search: {tt}")
assert len(search_res) == 1
@ -93,7 +93,7 @@ class TestChaosData:
assert len(collection_w.indexes) == 1
# query
term_expr = f'{ct.default_int64_field_name} in [3001,4001,4999,2999]'
term_expr = f'{ct.default_int64_field_name} in [1001,1201,999,99]'
t0 = datetime.datetime.now()
query_res, _ = collection_w.query(term_expr)
tt = datetime.datetime.now() - t0
@ -104,8 +104,8 @@ class TestChaosData:
reboot_pod(chaos_yaml)
# reconnect if needed
sleep(constants.WAIT_PER_OP * 4)
reconnect(connections, self.host, self.port)
sleep(constants.WAIT_PER_OP * 3)
reconnect(connections, alias='default')
# verify collection persists
assert utility.has_collection(c_name)
@ -119,9 +119,15 @@ class TestChaosData:
assert collection_w2.has_index(i_name)
log.debug("assert index persists")
# verify search results persist
collection_w2.load()
search_res, _ = collection_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
param=search_params, limit=1)
tt = datetime.datetime.now() - t0
log.debug(f"assert search: {tt}")
assert len(search_res) == 1
# verify query results persist
query_res2, _ = collection_w2.query(term_expr)
assert query_res2 == query_res
assert len(query_res2) == len(query_res)
log.debug("assert query result persists")

View File

@ -4,40 +4,44 @@ from kubernetes.client.rest import ApiException
from chaos import constants as cf
from utils.util_log import test_log as log
_GROUP = 'milvus.io'
_VERSION = 'v1alpha1'
_NAMESPACE = "default"
class ChaosOpt(object):
def __init__(self, kind, group=cf.DEFAULT_GROUP, version=cf.DEFAULT_VERSION, namespace=cf.NAMESPACE):
class CustomResourceOperations(object):
def __init__(self, kind, group=_GROUP, version=_VERSION, namespace=_NAMESPACE):
self.group = group
self.version = version
self.namespace = namespace
self.plural = kind.lower()
def create_chaos_object(self, body):
def create(self, body):
pretty = 'true'
config.load_kube_config()
api_instance = client.CustomObjectsApi()
try:
api_response = api_instance.create_namespaced_custom_object(self.group, self.version, self.namespace,
plural=self.plural, body=body, pretty=pretty)
log.debug(f"create chaos response: {api_response}")
log.debug(f"create custom resource response: {api_response}")
except ApiException as e:
log.error("Exception when calling CustomObjectsApi->create_namespaced_custom_object: %s\n" % e)
raise Exception(str(e))
def delete_chaos_object(self, metadata_name, raise_ex=True):
def delete(self, metadata_name, raise_ex=True):
print(metadata_name)
try:
config.load_kube_config()
api_instance = client.CustomObjectsApi()
data = api_instance.delete_namespaced_custom_object(self.group, self.version, self.namespace, self.plural,
metadata_name)
log.debug(f"delete chaos response: {data}")
log.debug(f"delete custom resource response: {data}")
except ApiException as e:
if raise_ex:
log.error("Exception when calling CustomObjectsApi->delete_namespaced_custom_object: %s\n" % e)
raise Exception(str(e))
def list_chaos_object(self):
def list_all(self):
try:
config.load_kube_config()
api_instance = client.CustomObjectsApi()
@ -48,10 +52,10 @@ class ChaosOpt(object):
raise Exception(str(e))
return data
def delete_all_chaos_object(self):
chaos_objects = self.list_chaos_object()
if len(chaos_objects["items"]) > 0:
for item in chaos_objects["items"]:
def delete_all(self):
cus_objects = self.list_all()
if len(cus_objects["items"]) > 0:
for item in cus_objects["items"]:
metadata_name = item["metadata"]["name"]
self.delete_chaos_object(metadata_name)
self.delete(metadata_name)