[test]Sync chaos dir from master branch (#22454)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/22464/head
zhuwenxing 2023-02-28 12:03:47 +08:00 committed by GitHub
parent dfa407ed38
commit 4b521bef3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 214 additions and 131 deletions

View File

@ -87,7 +87,7 @@ def reconnect(connections, alias='default', timeout=360):
return connections.connect(alias)
def assert_statistic(checkers, expectations={}):
def assert_statistic(checkers, expectations={}, succ_rate_threshold=0.95, fail_rate_threshold=0.49):
for k in checkers.keys():
# expect succ if no expectations
succ_rate = checkers[k].succ_rate()
@ -95,9 +95,9 @@ def assert_statistic(checkers, expectations={}):
average_time = checkers[k].average_time
if expectations.get(k, '') == constants.FAIL:
log.info(f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate < 0.49 or total < 2,
expect(succ_rate < fail_rate_threshold or total < 2,
f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
else:
log.info(f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate > 0.90 and total > 2,
expect(succ_rate > succ_rate_threshold and total > 2,
f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")

View File

@ -1,15 +1,17 @@
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
name: test-datacoord-pod-failure
name: test-querynode-pod-failure
namespace: chaos-testing
spec:
selector:
pods:
chaos-testing:
- datacoord-standby-test-milvus-datacoord-b664b98df-c42d4
- milvus-multi-querynode-querynode-bcdc595d9-7vmcj
- milvus-multi-querynode-querynode-bcdc595d9-ccxls
- milvus-multi-querynode-querynode-bcdc595d9-dpwgp
mode: all
action: pod-failure
duration: 3m
duration: 2m
gracePeriod: 0

View File

@ -39,25 +39,29 @@ def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True):
def decorate(func):
@functools.wraps(func)
def inner_wrapper(self, *args, **kwargs):
start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
t0 = time.perf_counter()
res, result = func(self, *args, **kwargs)
elapsed = time.perf_counter() - t0
end_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
end_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
operation_name = func.__name__
if flag:
collection_name = self.c_wrap.name
operation_name = func.__name__
log_str = f"[{prefix}]" + fmt.format(**locals())
# TODO: add report function in this place, like uploading to influxdb
# it is better a async way to do this, in case of blocking the request processing
log.debug(log_str)
log.info(log_str)
if result:
self.rsp_times.append(elapsed)
self.average_time = (
elapsed + self.average_time * self._succ) / (self._succ + 1)
self._succ += 1
if len(self.fail_records) > 0 and self.fail_records[-1][0] == "failure" and \
self._succ + self._fail == self.fail_records[-1][1] + 1:
self.fail_records.append(("success", self._succ + self._fail, start_time))
else:
self._fail += 1
self.fail_records.append(("failure", self._succ + self._fail, start_time))
return res, result
return inner_wrapper
return decorate
@ -75,7 +79,7 @@ def exception_handler():
e_str = str(e)
log_e = e_str[0:log_row_length] + \
'......' if len(e_str) > log_row_length else e_str
log.error(f"class: {self.__class__.__name__}, func name: {func.__name__}, error: {log_e}")
log.error(log_e)
return Error(e), False
return inner_wrapper
return wrapper
@ -88,9 +92,10 @@ class Checker:
b. count operations and success rate
"""
def __init__(self, collection_name=None, shards_num=2):
def __init__(self, collection_name=None, shards_num=2, dim=ct.default_dim):
self._succ = 0
self._fail = 0
self.fail_records = []
self._keep_running = True
self.rsp_times = []
self.average_time = 0
@ -98,11 +103,12 @@ class Checker:
c_name = collection_name if collection_name is not None else cf.gen_unique_str(
'Checker_')
self.c_wrap.init_collection(name=c_name,
schema=cf.gen_default_collection_schema(),
schema=cf.gen_default_collection_schema(dim=dim),
shards_num=shards_num,
timeout=timeout,
# active_trace=True,
enable_traceback=enable_traceback)
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH, dim=dim),
timeout=timeout,
enable_traceback=enable_traceback)
self.initial_entities = self.c_wrap.num_entities # do as a flush
@ -124,6 +130,9 @@ class Checker:
checker_name = self.__class__.__name__
checkers_result = f"{checker_name}, succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}"
log.info(checkers_result)
log.info(f"{checker_name} rsp times: {self.rsp_times}")
if len(self.fail_records) > 0:
log.info(f"{checker_name} failed at {self.fail_records}")
return checkers_result
def terminate(self):
@ -146,7 +155,7 @@ class SearchChecker(Checker):
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str('index_'),
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -318,6 +327,7 @@ class IndexChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name)
self.index_name = cf.gen_unique_str('index_')
self.c_wrap.insert(data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH),
timeout=timeout, enable_traceback=enable_traceback)
# do as a flush before indexing
@ -327,8 +337,7 @@ class IndexChecker(Checker):
def create_index(self):
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str(
'index_'),
index_name=self.index_name,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -356,7 +365,7 @@ class QueryChecker(Checker):
super().__init__(collection_name=collection_name, shards_num=shards_num)
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str(
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
@ -395,7 +404,7 @@ class LoadChecker(Checker):
self.replica_number = replica_number
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str(
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
@ -428,7 +437,7 @@ class DeleteChecker(Checker):
super().__init__(collection_name=collection_name)
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str(
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
@ -468,7 +477,7 @@ class CompactChecker(Checker):
self.ut = ApiUtilityWrapper()
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str(
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
@ -532,7 +541,7 @@ class LoadBalanceChecker(Checker):
self.utility_wrap = ApiUtilityWrapper()
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str(
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
@ -578,16 +587,26 @@ class LoadBalanceChecker(Checker):
class BulkInsertChecker(Checker):
"""check bulk load operations in a dependent thread"""
def __init__(self, collection_name=None, files=[]):
def __init__(self, collection_name=None, files=[], use_one_collection=False, dim=ct.default_dim, create_index=True):
if collection_name is None:
collection_name = cf.gen_unique_str("BulkLoadChecker_")
super().__init__(collection_name=collection_name)
collection_name = cf.gen_unique_str("BulkInsertChecker_")
super().__init__(collection_name=collection_name, dim=dim)
self.create_index = create_index
if self.create_index:
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.utility_wrap = ApiUtilityWrapper()
self.schema = cf.gen_default_collection_schema()
self.files = files
self.recheck_failed_task = False
self.failed_tasks = []
self.c_name = None
self.use_one_collection = use_one_collection # if True, all tasks will use one collection to bulk insert
self.c_name = collection_name
def update(self, files=None, schema=None):
if files is not None:
@ -597,20 +616,30 @@ class BulkInsertChecker(Checker):
@trace()
def bulk_insert(self):
task_ids, result = self.utility_wrap.bulk_insert(collection_name=self.c_name,
files=self.files)
completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=task_ids, timeout=60)
log.info(f"bulk insert collection name: {self.c_name}")
task_ids, result = self.utility_wrap.do_bulk_insert(collection_name=self.c_name,
files=self.files)
completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=120)
return task_ids, completed
@exception_handler()
def run_task(self):
if self.recheck_failed_task and self.failed_tasks:
self.c_name = self.failed_tasks.pop(0)
log.debug(f"check failed task: {self.c_name}")
else:
self.c_name = cf.gen_unique_str("BulkLoadChecker_")
if not self.use_one_collection:
if self.recheck_failed_task and self.failed_tasks:
self.c_name = self.failed_tasks.pop(0)
log.debug(f"check failed task: {self.c_name}")
else:
self.c_name = cf.gen_unique_str("BulkInsertChecker_")
self.c_wrap.init_collection(name=self.c_name, schema=self.schema)
# import data
if self.create_index:
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
# bulk insert data
task_ids, completed = self.bulk_insert()
if not completed:
self.failed_tasks.append(self.c_name)

View File

@ -1,37 +1,13 @@
cluster:
enabled: true
log:
level: debug
image:
all:
repository: milvusdb/milvus
tag: master-latest
pullPolicy: IfNotPresent
rootCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for root coordinator
queryCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for root coordinator
dataCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for root coordinator
indexCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for root coordinator
etcd:
replicaCount: 3
image:

View File

@ -3,8 +3,8 @@ import pytest
def pytest_addoption(parser):
parser.addoption("--chaos_type", action="store", default="pod_kill", help="chaos_type")
parser.addoption("--role_type", action="store", default="activated", help="role_type")
parser.addoption("--target_component", action="store", default="querynode", help="target_component")
parser.addoption("--target_number", action="store", default="1", help="target_number")
parser.addoption("--chaos_duration", action="store", default="1m", help="chaos_duration")
parser.addoption("--chaos_interval", action="store", default="10s", help="chaos_interval")
parser.addoption("--request_duration", action="store", default="3m", help="request_duration")
@ -17,13 +17,13 @@ def chaos_type(request):
@pytest.fixture
def role_type(request):
return request.config.getoption("--role_type")
def target_component(request):
return request.config.getoption("--target_component")
@pytest.fixture
def target_component(request):
return request.config.getoption("--target_component")
def target_number(request):
return request.config.getoption("--target_number")
@pytest.fixture

View File

@ -12,7 +12,7 @@ CHAOS_GROUP = 'chaos-mesh.org' # chao mesh group
CHAOS_VERSION = 'v1alpha1' # chao mesh version
SUCC = 'succ'
FAIL = 'fail'
DELTA_PER_INS = 300 # entities per insert
DELTA_PER_INS = 10 # entities per insert
ENTITIES_FOR_SEARCH = 3000 # entities for search_collection
CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chao path

View File

@ -1,8 +1,17 @@
release=${1:-"milvs-chaos"}
ns=${2:-"chaos-testing"}
milvus_mode=${2:-"cluster"}
ns=${3:-"chaos-testing"}
bash uninstall_milvus.sh ${release} ${ns}|| true
helm repo add milvus https://milvus-io.github.io/milvus-helm/
helm repo update
helm install --wait --timeout 360s ${release} milvus/milvus -f ../cluster-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
if [[ ${milvus_mode} == "cluster" ]];
then
helm install --wait --timeout 360s ${release} milvus/milvus -f ../cluster-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
fi
if [[ ${milvus_mode} == "standalone" ]];
then
helm install --wait --timeout 360s ${release} milvus/milvus -f ../standalone-values.yaml --set metrics.serviceMonitor.enabled=true -n=${ns}
fi

View File

@ -0,0 +1,30 @@
# Exit immediately for non zero status
set -e
release=${1:-"milvus-chaos"}
ns=${2:-"chaos-testing"}
kubectl delete milvus ${release} -n=${ns} || echo "delete milvus ${release} failed"
# uninstall helm release
helm_release_list=('minio' 'etcd' 'kafka' 'pulsar')
for helm_release in ${helm_release_list[*]}; do
echo "unistall helm release ${release}-${helm_release}"
helm uninstall ${release}-${helm_release} -n=${ns} || echo "delete helm release ${release}-${helm_release} failed"
done
# delete pvc for storage
pvc_list=('minio')
for pvc in ${pvc_list[*]}; do
echo "delete pvc with label release=${release}-${pvc}"
kubectl delete pvc -l release=${release}-${pvc} -n=${ns} || echo "delete pvc with label release=${release}-${pvc} failed"
done
# delete pvc of etcd and message queue
pvc_list=('etcd' 'kafka' 'pulsar')
for pvc in ${pvc_list[*]}; do
echo "delete pvc with label app.kubernetes.io/instance=${release}-${pvc}"
kubectl delete pvc -l app.kubernetes.io/instance=${release}-${pvc} -n=${ns} || echo "delete pvc with label release=${release}-${pvc} failed"
done

View File

@ -1,7 +1,9 @@
cluster:
enabled: false
log:
level: debug
image:
all:
repository: milvusdb/milvus

View File

@ -54,7 +54,7 @@ class TestChaosApply:
chaos_res.delete(meta_name, raise_ex=False)
sleep(2)
def test_chaos_apply(self, chaos_type, target_component, chaos_duration, chaos_interval):
def test_chaos_apply(self, chaos_type, target_component, target_number, chaos_duration, chaos_interval):
# start the monitor threads to check the milvus ops
log.info("*********************Chaos Test Start**********************")
log.info(connections.get_connection_addr('default'))
@ -67,6 +67,7 @@ class TestChaosApply:
update_key_value(chaos_config, "release", release_name)
update_key_value(chaos_config, "app.kubernetes.io/instance", release_name)
update_key_value(chaos_config, "namespaces", [self.milvus_ns])
update_key_value(chaos_config, "value", target_number)
self.chaos_config = chaos_config
if "s" in chaos_interval:
schedule = f"*/{chaos_interval[:-1]} * * * * *"
@ -76,6 +77,7 @@ class TestChaosApply:
# update chaos_duration from string to int with unit second
chaos_duration = chaos_duration.replace('h', '*3600+').replace('m', '*60+').replace('s', '*1+') + '+0'
chaos_duration = eval(chaos_duration)
update_key_value(chaos_config, "duration", f"{chaos_duration//60}m")
if self.deploy_by == "milvus-operator":
update_key_name(chaos_config, "component", "app.kubernetes.io/component")
self._chaos_config = chaos_config # cache the chaos config for tear down
@ -111,12 +113,24 @@ class TestChaosApply:
sleep(2)
# wait all pods ready
t0 = time.time()
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={meta_name}")
wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={meta_name}")
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={meta_name}")
wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={meta_name}")
log.info("all pods are ready")
pods_ready_time = time.time() - t0
log.info(f"pods ready time: {pods_ready_time}")
# reconnect to test the service healthy
sleep(20)
self.reconnect()
start_time = time.time()
end_time = start_time + 120
while time.time() < end_time:
try:
self.reconnect()
break
except Exception as e:
log.error(e)
sleep(2)
recovery_time = time.time() - start_time
log.info(f"recovery time: {recovery_time}")
log.info("*********************Chaos Test Completed**********************")

View File

@ -119,15 +119,16 @@ class TestChaos(TestChaosBase):
entity = dict(zip(fields_name, entity_value))
entities.append(entity)
data_dict = {"rows": entities}
file_name = "/tmp/ci_logs/bulk_insert_data_source.json"
files = [file_name]
data_source = "/tmp/ci_logs/bulk_insert_data_source.json"
file_name = "bulk_insert_data_source.json"
files = ["bulk_insert_data_source.json"]
#TODO: npy file type is not supported so far
log.info("generate bulk load file")
with open(file_name, "w") as f:
with open(data_source, "w") as f:
f.write(json.dumps(data_dict, indent=4))
log.info("upload file to minio")
client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
client.fput_object(bucket_name, file_name, file_name)
client.fput_object(bucket_name, file_name, data_source)
self.health_checkers[Op.bulk_insert].update(schema=schema, files=files)
log.info("prepare data for bulk load done")

View File

@ -214,7 +214,7 @@ class TestChaos(TestChaosBase):
sleep(2)
# wait all pods ready
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={release_name}")
ready_1 = wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={release_name}")
ready_1 = wait_pods_ready(constants.CHAOS_NAMESPACE,f"app.kubernetes.io/instance={release_name}")
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={release_name}")
ready_2 = wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={release_name}")
if ready_1 and ready_2:

View File

@ -69,7 +69,6 @@ class TestChaos(TestChaosBase):
Op.load_balance: LoadBalanceChecker()
}
self.health_checkers = checkers
ms = MilvusSys()
self.prepare_bulk_insert()
def prepare_bulk_insert(self, nb=30000, row_based=True):

View File

@ -8,8 +8,10 @@ from common.common_type import CaseLabel
from utils.util_log import test_log as log
from utils.util_common import get_collections
class TestAllCollection(TestcaseBase):
""" Test case of end to end"""
@pytest.fixture(scope="function", params=get_collections())
def collection_name(self, request):
if request.param == [] or request.param == "":
@ -22,7 +24,6 @@ class TestAllCollection(TestcaseBase):
method.__name__)
log.info("skip drop collection")
@pytest.mark.tags(CaseLabel.L1)
def test_milvus_default(self, collection_name):
# create
@ -31,6 +32,11 @@ class TestAllCollection(TestcaseBase):
collection_w = self.init_collection_wrap(name=name, active_trace=True)
tt = time.time() - t0
assert collection_w.name == name
# compact collection before getting num_entities
collection_w.flush(timeout=180)
collection_w.compact()
collection_w.wait_for_compaction_completed(timeout=720)
entities = collection_w.num_entities
log.info(f"assert create collection: {tt}, init_entities: {entities}")
@ -51,15 +57,26 @@ class TestAllCollection(TestcaseBase):
entities = collection_w.num_entities
log.info(f"assert flush: {tt}, entities: {entities}")
# search
_index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
t0 = time.time()
index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
index_params=_index_params,
name=cf.gen_unique_str())
tt = time.time() - t0
log.info(f"assert index: {tt}")
# create index if not have
index_infos = [index.to_dict() for index in collection_w.indexes]
index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
if len(index_infos) == 0:
log.info("collection {name} does not have index, create index for it")
t0 = time.time()
index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
index_params=index_params,
index_name=cf.gen_unique_str())
tt = time.time() - t0
log.info(f"assert index: {tt}")
# show index infos
index_infos = [index.to_dict() for index in collection_w.indexes]
log.info(f"index info: {index_infos}")
# load
collection_w.load()
# search
search_vectors = cf.gen_vectors(1, ct.default_dim)
search_params = {"metric_type": "L2", "params": {"ef": 64}}
t0 = time.time()
@ -71,35 +88,33 @@ class TestAllCollection(TestcaseBase):
assert len(res_1) == 1
collection_w.release()
# index
# insert data
d = cf.gen_default_list_data()
collection_w.insert(d)
log.info(f"assert index entities: {collection_w.num_entities}")
_index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
t0 = time.time()
index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
index_params=_index_params,
name=cf.gen_unique_str())
tt = time.time() - t0
log.info(f"assert index: {tt}")
assert len(collection_w.indexes) == 1
# search
# load
t0 = time.time()
collection_w.load()
tt = time.time() - t0
log.info(f"assert load: {tt}")
search_vectors = cf.gen_vectors(1, ct.default_dim)
# search
nq = 5
topk = 5
search_vectors = cf.gen_vectors(nq, ct.default_dim)
t0 = time.time()
res_1, _ = collection_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
param=search_params, limit=1)
res, _ = collection_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
param=search_params, limit=topk)
tt = time.time() - t0
log.info(f"assert search: {tt}")
assert len(res) == nq
assert len(res[0]) <= topk
# query
term_expr = f'{ct.default_int64_field_name} in [1001,1201,4999,2999]'
term_expr = f'{ct.default_int64_field_name} in [1, 2, 3, 4]'
t0 = time.time()
res, _ = collection_w.query(term_expr)
tt = time.time() - t0
log.info(f"assert query result {len(res)}: {tt}")
assert len(res) >= 4

View File

@ -112,7 +112,7 @@ class TestOperations(TestBase):
request_duration = eval(request_duration)
for i in range(10):
sleep(request_duration//10)
for k,v in self.health_checkers.items():
for k, v in self.health_checkers.items():
v.check_result()
# log.info(v.check_result())
if is_check:

View File

@ -8,8 +8,9 @@ from common.common_type import CaseLabel
from utils.util_log import test_log as log
class TestDataPersistence(TestcaseBase):
class TestDataPersistence(TestcaseBase):
""" Test case of end to end"""
def teardown_method(self, method):
log.info(("*" * 35) + " teardown " + ("*" * 35))
log.info("[teardown_method] Start teardown test case %s..." %
@ -44,15 +45,28 @@ class TestDataPersistence(TestcaseBase):
entities = collection_w.num_entities
log.info(f"assert flush: {tt}, entities: {entities}")
# search
_index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
t0 = time.time()
index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
index_params=_index_params,
name=cf.gen_unique_str())
tt = time.time() - t0
log.info(f"assert index: {tt}")
# create index if not have
index_infos = [index.to_dict() for index in collection_w.indexes]
index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
if len(index_infos) == 0:
log.info("collection {name} does not have index, create index for it")
t0 = time.time()
index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
index_params=index_params,
index_name=cf.gen_unique_str())
index, _ = collection_w.create_index(field_name=ct.default_string_field_name,
index_params={},
index_name=cf.gen_unique_str())
tt = time.time() - t0
log.info(f"assert index: {tt}")
# show index infos
index_infos = [index.to_dict() for index in collection_w.indexes]
log.info(f"index info: {index_infos}")
# load
collection_w.load()
# search
search_vectors = cf.gen_vectors(1, ct.default_dim)
search_params = {"metric_type": "L2", "params": {"ef": 64}}
t0 = time.time()
@ -64,20 +78,12 @@ class TestDataPersistence(TestcaseBase):
assert len(res_1) == 1
collection_w.release()
# index
# insert data
d = cf.gen_default_list_data()
collection_w.insert(d)
log.info(f"assert index entities: {collection_w.num_entities}")
_index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
t0 = time.time()
index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
index_params=_index_params,
name=cf.gen_unique_str())
tt = time.time() - t0
log.info(f"assert index: {tt}")
assert len(collection_w.indexes) == 1
log.info(f"assert entities: {collection_w.num_entities}")
# search
# load and search
t0 = time.time()
collection_w.load()
tt = time.time() - t0
@ -95,4 +101,4 @@ class TestDataPersistence(TestcaseBase):
t0 = time.time()
res, _ = collection_w.query(term_expr)
tt = time.time() - t0
log.info(f"assert query result {len(res)}: {tt}")
log.info(f"assert query result {len(res)}: {tt}")

View File

@ -13,13 +13,12 @@ from utils.util_log import test_log as log
class TestGetCollections(TestcaseBase):
""" Test case of getting all collections """
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.tags(CaseLabel.L1)
def test_get_collections_by_prefix(self,):
self._connect()
all_collections = self.utility_wrap.list_collections()[0]
all_collections = [c_name for c_name in all_collections if "Checker" in c_name]
log.info(f"all_collections: {all_collections}")
selected_collections_map = {}
for c_name in all_collections:
prefix = c_name.split("_")[0]
@ -32,7 +31,6 @@ class TestGetCollections(TestcaseBase):
selected_collections = []
for value in selected_collections_map.values():
selected_collections.extend(value)
assert len(selected_collections) > 0
log.info(f"find {len(selected_collections)} collections:")
log.info(selected_collections)
data = {

View File

@ -1,7 +1,8 @@
import pytest
from time import sleep
from pymilvus import connections
from chaos.checker import (InsertChecker,
from chaos.checker import (CreateChecker,
InsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
@ -48,6 +49,7 @@ class TestOperations(TestBase):
def init_health_checkers(self, collection_name=None):
c_name = collection_name
checkers = {
Op.create: CreateChecker(collection_name=c_name),
Op.insert: InsertChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.index: IndexChecker(collection_name=c_name),
@ -67,15 +69,15 @@ class TestOperations(TestBase):
cc.start_monitor_threads(self.health_checkers)
log.info("*********************Load Start**********************")
# wait request_duration
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","")
if request_duration[-1] == "+":
request_duration = request_duration[:-1]
request_duration = eval(request_duration)
for i in range(10):
sleep(request_duration//10)
for k, v in self.health_checkers.items():
for k,v in self.health_checkers.items():
v.check_result()
if is_check:
assert_statistic(self.health_checkers)
assert_statistic(self.health_checkers, succ_rate_threshold=0.98)
assert_expectations()
log.info("*********************Chaos Test Completed**********************")