diff --git a/tests/python_client/chaos/chaos_commons.py b/tests/python_client/chaos/chaos_commons.py index 732a9b6a5f..1c97ed66bd 100644 --- a/tests/python_client/chaos/chaos_commons.py +++ b/tests/python_client/chaos/chaos_commons.py @@ -4,7 +4,7 @@ import glob from chaos import constants from yaml import full_load from utils.util_log import test_log as log - +from delayed_assert import expect def check_config(chaos_config): if not chaos_config.get('kind', None): @@ -72,3 +72,19 @@ def reconnect(connections, alias='default'): res = connections.get_connection_addr(alias) connections.remove_connection(alias) return connections.connect(alias, host=res["host"], port=res["port"]) + + +def assert_statistic(checkers, expectations={}): + for k in checkers.keys(): + # expect succ if no expectations + succ_rate = checkers[k].succ_rate() + total = checkers[k].total() + average_time = checkers[k].average_time + if expectations.get(k, '') == constants.FAIL: + log.info(f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") + expect(succ_rate < 0.49 or total < 2, + f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") + else: + log.info(f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") + expect(succ_rate > 0.90 and total > 2, + f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") \ No newline at end of file diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 360ef9d5d7..572c369a55 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -15,15 +15,19 @@ from utils.util_log import test_log as log class Op(Enum): - create = 'create' - insert = 'insert' - flush = 'flush' - index = 'index' - search = 'search' - query = 'query' - bulk_load = 'bulk_load' + create = "create" + insert = "insert" + flush = "flush" + index = "index" + search = "search" + query = "query" + delete = "delete" + compact = "compact" + drop = "drop" + load_balance = "load_balance" + bulk_load = "bulk_load" - unknown = 'unknown' + unknown = "unknown" timeout = 20 @@ -43,16 +47,21 @@ class Checker: self.rsp_times = [] self.average_time = 0 self.c_wrap = ApiCollectionWrapper() - c_name = collection_name if collection_name is not None else cf.gen_unique_str('Checker_') - self.c_wrap.init_collection(name=c_name, - schema=cf.gen_default_collection_schema(), - shards_num=shards_num, - timeout=timeout, - enable_traceback=enable_traceback) - self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH), - timeout=timeout, - enable_traceback=enable_traceback) + c_name = collection_name if collection_name is not None else cf.gen_unique_str("Checker_") + self.c_wrap.init_collection( + name=c_name, + schema=cf.gen_default_collection_schema(), + shards_num=shards_num, + timeout=timeout, + enable_traceback=enable_traceback, + ) + self.c_wrap.insert( + data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH), + timeout=timeout, + enable_traceback=enable_traceback, + ) self.initial_entities = self.c_wrap.num_entities # do as a flush + self.c_wrap.release() def total(self): return self._succ + self._fail @@ -81,6 +90,8 @@ class SearchChecker(Checker): """check search operations in a dependent thread""" def __init__(self, collection_name=None, shards_num=2, replica_number=1): + if collection_name is None: + collection_name = cf.gen_unique_str("SearchChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num) self.c_wrap.load(replica_number=replica_number) # do load before search @@ -92,16 +103,21 @@ class SearchChecker(Checker): data=search_vec, anns_field=ct.default_float_vec_field_name, param={"nprobe": 32}, - limit=1, timeout=timeout, + limit=1, + timeout=timeout, enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing + check_task=CheckTasks.check_nothing, ) t1 = time.time() if result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"search success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"search success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) else: self._fail += 1 sleep(constants.WAIT_PER_OP / 10) @@ -111,6 +127,11 @@ class InsertFlushChecker(Checker): """check Insert and flush operations in a dependent thread""" def __init__(self, collection_name=None, flush=False, shards_num=2): + if collection_name is None: + if flush: + collection_name = cf.gen_unique_str("FlushChecker_") + else: + collection_name = cf.gen_unique_str("InsertChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num) self._flush = flush self.initial_entities = self.c_wrap.num_entities @@ -118,18 +139,23 @@ class InsertFlushChecker(Checker): def keep_running(self): while True: t0 = time.time() - _, insert_result = \ - self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) + _, insert_result = self.c_wrap.insert( + data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing, + ) t1 = time.time() if not self._flush: if insert_result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) else: self._fail += 1 sleep(constants.WAIT_PER_OP / 10) @@ -140,9 +166,13 @@ class InsertFlushChecker(Checker): t1 = time.time() if num_entities == (self.initial_entities + constants.DELTA_PER_INS): self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) self.initial_entities += constants.DELTA_PER_INS else: self._fail += 1 @@ -151,8 +181,10 @@ class InsertFlushChecker(Checker): class CreateChecker(Checker): """check create operations in a dependent thread""" - def __init__(self): - super().__init__() + def __init__(self, collection_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("CreateChecker_") + super().__init__(collection_name=collection_name) def keep_running(self): while True: @@ -162,13 +194,18 @@ class CreateChecker(Checker): schema=cf.gen_default_collection_schema(), timeout=timeout, enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) + check_task=CheckTasks.check_nothing, + ) t1 = time.time() if result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"create success, time: {t1 - t0:.4f}, average_time: {self.average_time:4f}") + log.debug( + f"create success, time: {t1 - t0:.4f}, average_time: {self.average_time:4f}" + ) self.c_wrap.drop(timeout=timeout) else: @@ -179,27 +216,40 @@ class CreateChecker(Checker): class IndexChecker(Checker): """check Insert operations in a dependent thread""" - def __init__(self): - super().__init__() - self.c_wrap.insert(data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH), - timeout=timeout, enable_traceback=enable_traceback) - log.debug(f"Index ready entities: {self.c_wrap.num_entities}") # do as a flush before indexing + def __init__(self, collection_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("IndexChecker_") + super().__init__(collection_name=collection_name) + self.c_wrap.insert( + data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH), + timeout=timeout, + enable_traceback=enable_traceback, + ) + log.debug( + f"Index ready entities: {self.c_wrap.num_entities}" + ) # do as a flush before indexing def keep_running(self): while True: t0 = time.time() - _, result = self.c_wrap.create_index(ct.default_float_vec_field_name, - constants.DEFAULT_INDEX_PARAM, - name=cf.gen_unique_str('index_'), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) + _, result = self.c_wrap.create_index( + ct.default_float_vec_field_name, + constants.DEFAULT_INDEX_PARAM, + name=cf.gen_unique_str("index_"), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing, + ) t1 = time.time() if result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"index success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"index success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) self.c_wrap.drop_index(timeout=timeout) else: self._fail += 1 @@ -209,6 +259,8 @@ class QueryChecker(Checker): """check query operations in a dependent thread""" def __init__(self, collection_name=None, shards_num=2, replica_number=1): + if collection_name is None: + collection_name = cf.gen_unique_str("QueryChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num) self.c_wrap.load(replica_number=replica_number) # do load before query @@ -217,17 +269,24 @@ class QueryChecker(Checker): int_values = [] for _ in range(5): int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH)) - term_expr = f'{ct.default_int64_field_name} in {int_values}' + term_expr = f"{ct.default_int64_field_name} in {int_values}" t0 = time.time() - _, result = self.c_wrap.query(term_expr, timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) + _, result = self.c_wrap.query( + term_expr, + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing, + ) t1 = time.time() if result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"query success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"query success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) else: self._fail += 1 sleep(constants.WAIT_PER_OP / 10) @@ -237,24 +296,32 @@ class DeleteChecker(Checker): """check delete operations in a dependent thread""" def __init__(self, collection_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("DeleteChecker_") super().__init__(collection_name=collection_name) self.c_wrap.load() # load before query def keep_running(self): while True: - term_expr = f'{ct.default_int64_field_name} > 0' - res, _ = self.c_wrap.query(term_expr, output_fields=[ct.default_int64_field_name]) + term_expr = f"{ct.default_int64_field_name} > 0" + res, _ = self.c_wrap.query( + term_expr, output_fields=[ct.default_int64_field_name] + ) ids = [r[ct.default_int64_field_name] for r in res] delete_ids = random.sample(ids, 2) - expr = f'{ct.default_int64_field_name} in {delete_ids}' + expr = f"{ct.default_int64_field_name} in {delete_ids}" t0 = time.time() _, result = self.c_wrap.delete(expr=expr, timeout=timeout) tt = time.time() - t0 if result: self.rsp_times.append(tt) - self.average_time = (tt + self.average_time * self._succ) / (self._succ + 1) + self.average_time = (tt + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"delete success, time: {tt:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"delete success, time: {tt:.4f}, average_time: {self.average_time:.4f}" + ) else: self._fail += 1 sleep(constants.WAIT_PER_OP / 10) @@ -264,6 +331,8 @@ class CompactChecker(Checker): """check compact operations in a dependent thread""" def __init__(self, collection_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("CompactChecker_") super().__init__(collection_name=collection_name) self.ut = ApiUtilityWrapper() self.c_wrap.load(enable_traceback=enable_traceback) # load before compact @@ -279,9 +348,13 @@ class CompactChecker(Checker): t1 = time.time() if result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"compact success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"compact success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) else: self._fail += 1 sleep(constants.WAIT_PER_OP / 10) @@ -291,6 +364,8 @@ class DropChecker(Checker): """check drop operations in a dependent thread""" def __init__(self, collection_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("DropChecker_") super().__init__(collection_name=collection_name) # self.c_wrap.load(enable_traceback=enable_traceback) # load before compact @@ -301,9 +376,13 @@ class DropChecker(Checker): t1 = time.time() if result: self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"drop success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"drop success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) else: self._fail += 1 sleep(constants.WAIT_PER_OP / 10) @@ -313,6 +392,8 @@ class LoadBalanceChecker(Checker): """check loadbalance operations in a dependent thread""" def __init__(self, collection_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("LoadBalanceChecker_") super().__init__(collection_name=collection_name) self.utility_wrap = ApiUtilityWrapper() self.c_wrap.load(enable_traceback=enable_traceback) @@ -335,14 +416,23 @@ class LoadBalanceChecker(Checker): sealed_segment_ids = segment_distribution[src_node_id]["sealed"] # load balance t0 = time.time() - _, result = self.utility_wrap.load_balance(c_name, src_node_id, dst_node_ids, sealed_segment_ids) + _, result = self.utility_wrap.load_balance( + c_name, src_node_id, dst_node_ids, sealed_segment_ids + ) t1 = time.time() # get segments distribution after load balance time.sleep(3) res, _ = self.utility_wrap.get_query_segment_info(c_name) segment_distribution = cf.get_segment_distribution(res) - sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id]["sealed"] - check_1 = len(set(sealed_segment_ids) & set(sealed_segment_ids_after_load_banalce)) == 0 + sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id][ + "sealed" + ] + check_1 = ( + len( + set(sealed_segment_ids) & set(sealed_segment_ids_after_load_banalce) + ) + == 0 + ) des_sealed_segment_ids = [] for des_node_id in dst_node_ids: des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"] @@ -351,9 +441,13 @@ class LoadBalanceChecker(Checker): if result and (check_1 and check_2): self.rsp_times.append(t1 - t0) - self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1) + self.average_time = ((t1 - t0) + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.debug(f"load balance success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}") + log.debug( + f"load balance success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}" + ) else: self._fail += 1 sleep(10) @@ -362,8 +456,10 @@ class LoadBalanceChecker(Checker): class BulkLoadChecker(Checker): """check bulk load operations in a dependent thread""" - def __init__(self, flush=False): - super().__init__() + def __init__(self, collection_name=None, flush=False): + if collection_name is None: + collection_name = cf.gen_unique_str("BulkLoadChecker_") + super().__init__(collection_name=collection_name) self.utility_wrap = ApiUtilityWrapper() self.schema = cf.gen_default_collection_schema() self.flush = flush @@ -395,18 +491,24 @@ class BulkLoadChecker(Checker): log.info(f"flush before bulk load, cost time: {tt:.4f}") # import data t0 = time.time() - task_ids, res_1 = self.utility_wrap.bulk_load(collection_name=c_name, - row_based=self.row_based, - files=self.files) + task_ids, res_1 = self.utility_wrap.bulk_load( + collection_name=c_name, row_based=self.row_based, files=self.files + ) log.info(f"bulk load task ids:{task_ids}") - completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, timeout=30) + completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed( + task_ids=task_ids, timeout=30 + ) tt = time.time() - t0 # added_num = sum(res_2[task_id].row_count for task_id in task_ids) if completed: self.rsp_times.append(tt) - self.average_time = (tt + self.average_time * self._succ) / (self._succ + 1) + self.average_time = (tt + self.average_time * self._succ) / ( + self._succ + 1 + ) self._succ += 1 - log.info(f"bulk load success for collection {c_name}, time: {tt:.4f}, average_time: {self.average_time:4f}") + log.info( + f"bulk load success for collection {c_name}, time: {tt:.4f}, average_time: {self.average_time:4f}" + ) if self.flush: t0 = time.time() cur_entities_num = self.c_wrap.num_entities @@ -416,7 +518,9 @@ class BulkLoadChecker(Checker): self._fail += 1 # if the task failed, store the failed collection name for further checking after chaos self.failed_tasks.append(c_name) - log.info(f"bulk load failed for collection {c_name} time: {tt:.4f}, average_time: {self.average_time:4f}") + log.info( + f"bulk load failed for collection {c_name} time: {tt:.4f}, average_time: {self.average_time:4f}" + ) sleep(constants.WAIT_PER_OP / 10) @@ -427,11 +531,14 @@ def assert_statistic(checkers, expectations={}): total = checkers[k].total() checker_result = k.check_result() - if expectations.get(k, '') == constants.FAIL: + if expectations.get(k, "") == constants.FAIL: log.info(f"Expect Fail: {str(k)} {checker_result}") - expect(succ_rate < 0.49 or total < 2, - f"Expect Fail: {str(k)} {checker_result}") + expect( + succ_rate < 0.49 or total < 2, f"Expect Fail: {str(k)} {checker_result}" + ) else: log.info(f"Expect Succ: {str(k)} {checker_result}") - expect(succ_rate > 0.90 or total > 2, - f"Expect Succ: {str(k)} {checker_result}") + expect( + succ_rate > 0.90 or total > 2, f"Expect Succ: {str(k)} {checker_result}" + ) + diff --git a/tests/python_client/chaos/scripts/get_all_collections.py b/tests/python_client/chaos/scripts/get_all_collections.py new file mode 100644 index 0000000000..62091dc811 --- /dev/null +++ b/tests/python_client/chaos/scripts/get_all_collections.py @@ -0,0 +1,36 @@ +from collections import defaultdict +import json +import argparse +from pymilvus import connections, list_collections +TIMEOUT = 120 + + +def save_all_checker_collections(host="127.0.0.1", prefix="Checker"): + # create connection + connections.connect(host=host, port="19530") + all_collections = list_collections() + if prefix is None: + all_collections = [c_name for c_name in all_collections] + else: + all_collections = [c_name for c_name in all_collections if prefix in c_name] + m = defaultdict(list) + for c_name in all_collections: + prefix = c_name.split("_")[0] + if len(m[prefix]) <= 10: + m[prefix].append(c_name) + selected_collections = [] + for v in m.values(): + selected_collections.extend(v) + data = { + "all": selected_collections + } + print("selected_collections is") + print(selected_collections) + with open("/tmp/ci_logs/all_collections.json", "w") as f: + f.write(json.dumps(data)) + + +parser = argparse.ArgumentParser(description='host ip') +parser.add_argument('--host', type=str, default='127.0.0.1', help='host ip') +args = parser.parse_args() +save_all_checker_collections(args.host) \ No newline at end of file diff --git a/tests/python_client/chaos/scripts/verify_all_collections.py b/tests/python_client/chaos/scripts/verify_all_collections.py index d8818f1ea5..1d3c125001 100644 --- a/tests/python_client/chaos/scripts/verify_all_collections.py +++ b/tests/python_client/chaos/scripts/verify_all_collections.py @@ -9,7 +9,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. - +from collections import defaultdict import random import numpy as np import time @@ -110,15 +110,21 @@ args = parser.parse_args() print(f"\nStart time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}") # create connection connections.connect(host=args.host, port="19530") -print(f"\nList collections...") -collection_list = list_collections() -print(collection_list) -# keep 10 collections with prefix "CreateChecker_", others will be skiped +print("\nList collections...") +all_collections = list_collections() +print(all_collections) +all_collections = [c_name for c_name in all_collections if "Checker" in c_name] +m = defaultdict(list) +for c_name in all_collections: + prefix = c_name.split("_")[0] + if len(m[prefix]) <= 5: + m[prefix].append(c_name) +selected_collections = [] +for v in m.values(): + selected_collections.extend(v) +print("selected_collections is") +print(selected_collections) cnt = 0 -for collection_name in collection_list: - if collection_name.startswith("CreateChecker_"): - cnt += 1 - if collection_name.startswith("CreateChecker_") and cnt > 10: - continue +for collection_name in selected_collections: print(f"check collection {collection_name}") hello_milvus(collection_name) diff --git a/tests/python_client/chaos/test_chaos.py b/tests/python_client/chaos/test_chaos.py index 0ac0cd2e85..49636b8a9d 100644 --- a/tests/python_client/chaos/test_chaos.py +++ b/tests/python_client/chaos/test_chaos.py @@ -1,5 +1,4 @@ import threading - import pytest import os import time @@ -14,25 +13,10 @@ from utils.util_log import test_log as log from utils.util_k8s import wait_pods_ready, get_pod_list from utils.util_common import findkeys from chaos import chaos_commons as cc +from chaos.chaos_commons import assert_statistic from common.common_type import CaseLabel from chaos import constants -from delayed_assert import expect, assert_expectations - - -def assert_statistic(checkers, expectations={}): - for k in checkers.keys(): - # expect succ if no expectations - succ_rate = checkers[k].succ_rate() - total = checkers[k].total() - average_time = checkers[k].average_time - if expectations.get(k, '') == constants.FAIL: - log.info(f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - expect(succ_rate < 0.49 or total < 2, - f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - else: - log.info(f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - expect(succ_rate > 0.90 or total > 2, - f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") +from delayed_assert import assert_expectations def check_cluster_nodes(chaos_config): diff --git a/tests/python_client/chaos/testcases/test_concurrent_operation.py b/tests/python_client/chaos/testcases/test_concurrent_operation.py new file mode 100644 index 0000000000..75e02f9273 --- /dev/null +++ b/tests/python_client/chaos/testcases/test_concurrent_operation.py @@ -0,0 +1,106 @@ +import threading +import pytest +import json +from time import sleep +from pymilvus import connections +from chaos.checker import (InsertFlushChecker, + SearchChecker, + QueryChecker, + IndexChecker, + DeleteChecker, + Op) +from common.cus_resource_opts import CustomResourceOperations as CusResource +from utils.util_log import test_log as log +from chaos import chaos_commons as cc +from common.common_type import CaseLabel +from chaos import constants +from delayed_assert import expect, assert_expectations + + +def assert_statistic(checkers, expectations={}): + for k in checkers.keys(): + # expect succ if no expectations + succ_rate = checkers[k].succ_rate() + total = checkers[k].total() + average_time = checkers[k].average_time + if expectations.get(k, '') == constants.FAIL: + log.info( + f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") + expect(succ_rate < 0.49 or total < 2, + f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") + else: + log.info( + f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") + expect(succ_rate > 0.90 and total > 2, + f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") + + +def get_all_collections(): + try: + with open("/tmp/ci_logs/all_collections.json", "r") as f: + data = json.load(f) + all_collections = data["all"] + except Exception as e: + log.error(f"get_all_collections error: {e}") + return [] + return all_collections + + +class TestBase: + expect_create = constants.SUCC + expect_insert = constants.SUCC + expect_flush = constants.SUCC + expect_index = constants.SUCC + expect_search = constants.SUCC + expect_query = constants.SUCC + host = '127.0.0.1' + port = 19530 + _chaos_config = None + health_checkers = {} + + +class TestOperatiions(TestBase): + + @pytest.fixture(scope="function", autouse=True) + def connection(self, host, port): + connections.add_connection(default={"host": host, "port": port}) + connections.connect(alias='default') + + if connections.has_connection("default") is False: + raise Exception("no connections") + self.host = host + self.port = port + + def init_health_checkers(self, collection_name=None): + c_name = collection_name + checkers = { + Op.insert: InsertFlushChecker(collection_name=c_name), + Op.flush: InsertFlushChecker(collection_name=c_name, flush=True), + Op.index: IndexChecker(collection_name=c_name), + Op.search: SearchChecker(collection_name=c_name), + Op.query: QueryChecker(collection_name=c_name), + Op.delete: DeleteChecker(collection_name=c_name), + } + self.health_checkers = checkers + + @pytest.fixture(scope="function", params=get_all_collections()) + def collection_name(self, request): + if request.param == [] or request.param == "": + pytest.skip("The collection name is invalid") + yield request.param + + @pytest.mark.tags(CaseLabel.L3) + def test_operations(self, collection_name): + # start the monitor threads to check the milvus ops + log.info("*********************Test Start**********************") + log.info(connections.get_connection_addr('default')) + c_name = collection_name + self.init_health_checkers(collection_name=c_name) + cc.start_monitor_threads(self.health_checkers) + # wait 20s + sleep(constants.WAIT_PER_OP * 2) + # assert all expectations + assert_statistic(self.health_checkers) + assert_expectations() + + log.info("*********************Chaos Test Completed**********************")