diff --git a/tests/benchmark/requirements.txt b/tests/benchmark/requirements.txt index 8d11793735..2fe4f4d03e 100644 --- a/tests/benchmark/requirements.txt +++ b/tests/benchmark/requirements.txt @@ -2,12 +2,12 @@ # --extra-index-url https://test.pypi.org/simple/ # pymilvus==2.0.0rc3.dev8 -grpcio==1.37.1 +grpcio==1.53.0 grpcio-testing==1.37.1 grpcio-tools==1.37.1 pandas==1.1.5 -scipy==1.3.1 +scipy==1.10.0 scikit-learn==0.19.1 h5py==2.7.1 # influxdb==5.2.2 diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index b7e9a3b61e..e0efaaace0 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -1,8 +1,15 @@ import pytest +import unittest from enum import Enum from random import randint import time +import threading +import os +import uuid +import json +import pandas as pd from datetime import datetime +from prettytable import PrettyTable import functools from time import sleep from base.collection_wrapper import ApiCollectionWrapper @@ -16,6 +23,167 @@ from common.common_type import CheckTasks from utils.util_log import test_log as log from utils.api_request import Error +lock = threading.Lock() + + +def get_chaos_info(): + try: + with open(constants.CHAOS_INFO_SAVE_PATH, 'r') as f: + chaos_info = json.load(f) + except Exception as e: + log.error(f"get_chaos_info error: {e}") + return None + return chaos_info + + +class Singleton(type): + instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls.instances: + cls.instances[cls] = super().__call__(*args, **kwargs) + return cls.instances[cls] + + +class EventRecords(metaclass=Singleton): + + def __init__(self): + self.file_name = f"/tmp/ci_logs/event_records_{uuid.uuid4()}.parquet" + self.created_file = False + + def insert(self, event_name, event_status, ts=None): + insert_ts = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') if ts is None else ts + data = { + "event_name": [event_name], + "event_status": [event_status], + "event_ts": [insert_ts] + } + df = pd.DataFrame(data) + if not self.created_file: + with lock: + df.to_parquet(self.file_name, engine='fastparquet') + self.created_file = True + else: + with lock: + df.to_parquet(self.file_name, engine='fastparquet', append=True) + + def get_records_df(self): + df = pd.read_parquet(self.file_name) + return df + + +class RequestRecords(metaclass=Singleton): + + def __init__(self): + self.file_name = f"/tmp/ci_logs/request_records_{uuid.uuid4()}.parquet" + self.buffer = [] + self.created_file = False + + def insert(self, operation_name, collection_name, start_time, time_cost, result): + data = { + "operation_name": operation_name, + "collection_name": collection_name, + "start_time": start_time, + "time_cost": time_cost, + "result": result + } + self.buffer.append(data) + if len(self.buffer) > 100: + df = pd.DataFrame(self.buffer) + if not self.created_file: + with lock: + df.to_parquet(self.file_name, engine='fastparquet') + self.created_file = True + else: + with lock: + df.to_parquet(self.file_name, engine='fastparquet', append=True) + self.buffer = [] + + def sink(self): + df = pd.DataFrame(self.buffer) + if not self.created_file: + with lock: + df.to_parquet(self.file_name, engine='fastparquet') + self.created_file = True + else: + with lock: + df.to_parquet(self.file_name, engine='fastparquet', append=True) + + def get_records_df(self): + self.sink() + df = pd.read_parquet(self.file_name) + return df + + +class ResultAnalyzer: + + def __init__(self): + rr = RequestRecords() + df = rr.get_records_df() + df["start_time"] = pd.to_datetime(df["start_time"]) + df = df.sort_values(by='start_time') + self.df = df + self.chaos_info = get_chaos_info() + + def get_stage_success_rate(self): + df = self.df + window = pd.offsets.Milli(1000) + + result = df.groupby([pd.Grouper(key='start_time', freq=window), 'operation_name']).apply(lambda x: pd.Series({ + 'success_count': x[x['result'] == 'True'].shape[0], + 'failed_count': x[x['result'] == 'False'].shape[0] + })) + data = result.reset_index() + data['success_rate'] = data['success_count'] / (data['success_count'] + data['failed_count']).replace(0, 1) + grouped_data = data.groupby('operation_name') + if self.chaos_info is None: + chaos_start_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') + chaos_end_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') + recovery_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') + else: + chaos_start_time = self.chaos_info['create_time'] + chaos_end_time = self.chaos_info['delete_time'] + recovery_time = self.chaos_info['recovery_time'] + stage_success_rate = {} + for name, group in grouped_data: + log.info(f"operation_name: {name}") + # spilt data to 3 parts by chaos start time and chaos end time and aggregate the success rate + data_before_chaos = group[group['start_time'] < chaos_start_time].agg( + {'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'}) + data_during_chaos = group[ + (group['start_time'] >= chaos_start_time) & (group['start_time'] <= recovery_time)].agg( + {'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'}) + data_after_chaos = group[group['start_time'] > recovery_time].agg( + {'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'}) + stage_success_rate[name] = { + 'before_chaos': f"{data_before_chaos['success_rate']}({data_before_chaos['success_count']}/{data_before_chaos['success_count'] + data_before_chaos['failed_count']})" if not data_before_chaos.empty else "no data", + 'during_chaos': f"{data_during_chaos['success_rate']}({data_during_chaos['success_count']}/{data_during_chaos['success_count'] + data_during_chaos['failed_count']})" if not data_during_chaos.empty else "no data", + 'after_chaos': f"{data_after_chaos['success_rate']}({data_after_chaos['success_count']}/{data_after_chaos['success_count'] + data_after_chaos['failed_count']})" if not data_after_chaos.empty else "no data", + } + log.info(f"stage_success_rate: {stage_success_rate}") + return stage_success_rate + + def get_realtime_success_rate(self, interval=10): + df = self.df + window = pd.offsets.Second(interval) + result = df.groupby([pd.Grouper(key='start_time', freq=window), 'operation_name']).apply(lambda x: pd.Series({ + 'success_count': x[x['result'] == 'True'].shape[0], + 'failed_count': x[x['result'] == 'False'].shape[0] + })) + data = result.reset_index() + data['success_rate'] = data['success_count'] / (data['success_count'] + data['failed_count']).replace(0, 1) + grouped_data = data.groupby('operation_name') + return grouped_data + + def show_result_table(self): + table = PrettyTable() + table.field_names = ['operation_name', 'before_chaos', 'during_chaos', 'after_chaos'] + data = self.get_stage_success_rate() + for operation, values in data.items(): + row = [operation, values['before_chaos'], values['during_chaos'], values['after_chaos']] + table.add_row(row) + log.info(f"succ rate for operations in different stage\n{table}") + class Op(Enum): create = 'create' @@ -39,28 +207,36 @@ query_timeout = 10 enable_traceback = False DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}' +request_records = RequestRecords() + def trace(fmt=DEFAULT_FMT, prefix='test', flag=True): def decorate(func): @functools.wraps(func) def inner_wrapper(self, *args, **kwargs): - start_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') + start_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') start_time_ts = time.time() t0 = time.perf_counter() res, result = func(self, *args, **kwargs) elapsed = time.perf_counter() - t0 - end_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') + end_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') operation_name = func.__name__ if flag: collection_name = self.c_wrap.name log_str = f"[{prefix}]" + fmt.format(**locals()) # TODO: add report function in this place, like uploading to influxdb - # it is better a async way to do this, in case of blocking the request processing + try: + t0 = time.perf_counter() + request_records.insert(operation_name, collection_name, start_time, elapsed, str(result)) + tt = time.perf_counter() - t0 + log.debug(f"insert request record cost {tt}s") + except Exception as e: + log.error(e) log.info(log_str) if result: self.rsp_times.append(elapsed) self.average_time = ( - elapsed + self.average_time * self._succ) / (self._succ + 1) + elapsed + self.average_time * self._succ) / (self._succ + 1) self._succ += 1 # add first success record if there is no success record before if len(self.fail_records) > 0 and self.fail_records[-1][0] == "failure" and \ @@ -70,7 +246,9 @@ def trace(fmt=DEFAULT_FMT, prefix='test', flag=True): self._fail += 1 self.fail_records.append(("failure", self._succ + self._fail, start_time, start_time_ts)) return res, result + return inner_wrapper + return decorate @@ -85,10 +263,12 @@ def exception_handler(): log_row_length = 300 e_str = str(e) log_e = e_str[0:log_row_length] + \ - '......' if len(e_str) > log_row_length else e_str + '......' if len(e_str) > log_row_length else e_str log.error(log_e) return Error(e), False + return inner_wrapper + return wrapper @@ -128,9 +308,10 @@ class Checker: if insert_data: log.info(f"collection {c_name} created, start to insert data") t0 = time.perf_counter() - self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0), - timeout=timeout, - enable_traceback=enable_traceback) + self.c_wrap.insert( + data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0), + timeout=timeout, + enable_traceback=enable_traceback) log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s") self.initial_entities = self.c_wrap.num_entities # do as a flush @@ -220,7 +401,7 @@ class Checker: class SearchChecker(Checker): """check search operations in a dependent thread""" - def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None,): + def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ): if collection_name is None: collection_name = cf.gen_unique_str("SearchChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) @@ -229,7 +410,7 @@ class SearchChecker(Checker): index_name=cf.gen_unique_str('index_'), timeout=timeout, enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) + check_task=CheckTasks.check_nothing) # do load before search self.c_wrap.load(replica_number=replica_number) @@ -268,10 +449,11 @@ class InsertFlushChecker(Checker): while True: t0 = time.time() _, insert_result = \ - self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) + self.c_wrap.insert( + data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) t1 = time.time() if not self._flush: if insert_result: @@ -318,10 +500,11 @@ class FlushChecker(Checker): @exception_handler() def run_task(self): - _, result = self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) + _, result = self.c_wrap.insert( + data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) res, result = self.flush() return res, result @@ -341,8 +524,8 @@ class InsertChecker(Checker): self._flush = flush self.initial_entities = self.c_wrap.num_entities self.inserted_data = [] - self.scale = 1*10**6 - self.start_time_stamp = int(time.time()*self.scale) # us + self.scale = 1 * 10 ** 6 + self.start_time_stamp = int(time.time() * self.scale) # us self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}' @trace() @@ -351,7 +534,7 @@ class InsertChecker(Checker): ts_data = [] for i in range(constants.DELTA_PER_INS): time.sleep(0.001) - offset_ts = int(time.time()*self.scale) + offset_ts = int(time.time() * self.scale) ts_data.append(offset_ts) data[0] = ts_data # set timestamp (ms) as int64 @@ -385,7 +568,7 @@ class InsertChecker(Checker): except Exception as e: log.error(f"create index error: {e}") self.c_wrap.load() - end_time_stamp = int(time.time()*self.scale) + end_time_stamp = int(time.time() * self.scale) self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp} and ' \ f'{self.int64_field_name} <= {end_time_stamp}' data_in_client = [] @@ -561,7 +744,7 @@ class DeleteChecker(Checker): self.c_wrap.load() # load before query term_expr = f'{self.int64_field_name} > 0' res, _ = self.c_wrap.query(term_expr, output_fields=[ - self.int64_field_name]) + self.int64_field_name]) self.ids = [r[self.int64_field_name] for r in res] self.expr = None @@ -640,7 +823,7 @@ class DropChecker(Checker): res, result = self.run_task() if result: self.c_wrap.init_collection( - name=cf.gen_unique_str("CreateChecker_"), + name=cf.gen_unique_str("DropChecker_"), schema=cf.gen_default_collection_schema(), timeout=timeout, check_task=CheckTasks.check_nothing) @@ -754,4 +937,15 @@ class BulkInsertChecker(Checker): def keep_running(self): while self._keep_running: self.run_task() - sleep(constants.WAIT_PER_OP / 10) \ No newline at end of file + sleep(constants.WAIT_PER_OP / 10) + + +class TestResultAnalyzer(unittest.TestCase): + def test_get_stage_success_rate(self): + ra = ResultAnalyzer() + res = ra.get_stage_success_rate() + print(res) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/python_client/chaos/conftest.py b/tests/python_client/chaos/conftest.py index d419500764..2531c1746b 100644 --- a/tests/python_client/chaos/conftest.py +++ b/tests/python_client/chaos/conftest.py @@ -9,8 +9,9 @@ def pytest_addoption(parser): parser.addoption("--target_number", action="store", default="1", help="target_number") parser.addoption("--chaos_duration", action="store", default="1m", help="chaos_duration") parser.addoption("--chaos_interval", action="store", default="10s", help="chaos_interval") - parser.addoption("--request_duration", action="store", default="3m", help="request_duration") - parser.addoption("--is_check", action="store", type=bool, default=False, help="is_check") + parser.addoption("--request_duration", action="store", default="5m", help="request_duration") + parser.addoption("--is_check", action="store", type=bool, default=False, help="is_check") + parser.addoption("--wait_signal", action="store", type=bool, default=True, help="wait_signal") @pytest.fixture @@ -56,3 +57,8 @@ def request_duration(request): @pytest.fixture def is_check(request): return request.config.getoption("--is_check") + + +@pytest.fixture +def wait_signal(request): + return request.config.getoption("--wait_signal") diff --git a/tests/python_client/chaos/constants.py b/tests/python_client/chaos/constants.py index 951d8a3229..46509fde4e 100644 --- a/tests/python_client/chaos/constants.py +++ b/tests/python_client/chaos/constants.py @@ -22,4 +22,5 @@ RELEASE_NAME = 'test-allstandalone-pod-kill-19-25-26' WAIT_PER_OP = 10 # time to wait in seconds between operations CHAOS_DURATION = 120 # chaos duration time in seconds DEFAULT_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}} -DEFAULT_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}} \ No newline at end of file +DEFAULT_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}} +CHAOS_INFO_SAVE_PATH = "/tmp/ci_logs/chaos_info.json" diff --git a/tests/python_client/chaos/requirements.txt b/tests/python_client/chaos/requirements.txt new file mode 100644 index 0000000000..1b73d551d9 --- /dev/null +++ b/tests/python_client/chaos/requirements.txt @@ -0,0 +1,5 @@ + +# for test result anaylszer +prettytable==3.8.0 +pyarrow==11.0.0 +fastparquet==2023.7.0 \ No newline at end of file diff --git a/tests/python_client/chaos/test_chaos_apply.py b/tests/python_client/chaos/test_chaos_apply.py index 87218ab4c3..3ca587a4ed 100644 --- a/tests/python_client/chaos/test_chaos_apply.py +++ b/tests/python_client/chaos/test_chaos_apply.py @@ -3,12 +3,14 @@ import pytest import time from time import sleep from pathlib import Path +import json from pymilvus import connections from common.cus_resource_opts import CustomResourceOperations as CusResource from common.milvus_sys import MilvusSys -import logging as log +from utils.util_log import test_log as log +from datetime import datetime from utils.util_k8s import wait_pods_ready, get_milvus_instance_name, get_milvus_deploy_tool -from utils.util_common import update_key_value, update_key_name, gen_experiment_config +from utils.util_common import update_key_value, update_key_name, gen_experiment_config, wait_signal_to_apply_chaos import constants @@ -54,9 +56,17 @@ class TestChaosApply: chaos_res.delete(meta_name, raise_ex=False) sleep(2) - def test_chaos_apply(self, chaos_type, target_component, target_number, chaos_duration, chaos_interval): + def test_chaos_apply(self, chaos_type, target_component, target_number, chaos_duration, chaos_interval, wait_signal): # start the monitor threads to check the milvus ops log.info("*********************Chaos Test Start**********************") + if wait_signal: + log.info("need wait signal to start chaos") + ready_for_chaos = wait_signal_to_apply_chaos() + if not ready_for_chaos: + log.info("did not get the signal to apply chaos") + raise Exception + else: + log.info("get the signal to apply chaos") log.info(connections.get_connection_addr('default')) release_name = self.release_name chaos_config = gen_experiment_config( @@ -88,6 +98,7 @@ class TestChaosApply: version=constants.CHAOS_VERSION, namespace=constants.CHAOS_NAMESPACE) chaos_res.create(chaos_config) + create_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') log.info("chaos injected") res = chaos_res.list_all() chaos_list = [r['metadata']['name'] for r in res['items']] @@ -97,6 +108,7 @@ class TestChaosApply: sleep(chaos_duration) # delete chaos chaos_res.delete(meta_name) + delete_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') log.info("chaos deleted") res = chaos_res.list_all() chaos_list = [r['metadata']['name'] for r in res['items']] @@ -114,6 +126,18 @@ class TestChaosApply: log.info("all pods are ready") pods_ready_time = time.time() - t0 log.info(f"pods ready time: {pods_ready_time}") + recovery_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') + event_records = { + "chaos_type": chaos_type, + "target_component": target_component, + "meta_name": meta_name, + "create_time": create_time, + "delete_time": delete_time, + "recovery_time": recovery_time + } + # save event records to json file + with open(constants.CHAOS_INFO_SAVE_PATH, 'w') as f: + json.dump(event_records, f) # reconnect to test the service healthy start_time = time.time() end_time = start_time + 120 @@ -125,5 +149,6 @@ class TestChaosApply: log.error(e) sleep(2) recovery_time = time.time() - start_time - log.info(f"recovery time: {recovery_time}") + log.info(f"recovery time from pod ready to can be connected: {recovery_time}") + log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/chaos/testcases/test_concurrent_operation.py b/tests/python_client/chaos/testcases/test_concurrent_operation.py index a9ef5271e7..8e2892c1a0 100644 --- a/tests/python_client/chaos/testcases/test_concurrent_operation.py +++ b/tests/python_client/chaos/testcases/test_concurrent_operation.py @@ -1,4 +1,4 @@ -import threading +import time import pytest import json from time import sleep @@ -9,40 +9,20 @@ from chaos.checker import (InsertChecker, SearchChecker, QueryChecker, DeleteChecker, - Op) -from common.cus_resource_opts import CustomResourceOperations as CusResource + Op, + ResultAnalyzer + ) +from utils.util_k8s import wait_pods_ready, get_milvus_instance_name from utils.util_log import test_log as log from chaos import chaos_commons as cc from common import common_func as cf +from common.milvus_sys import MilvusSys from chaos.chaos_commons import assert_statistic from common.common_type import CaseLabel from chaos import constants from delayed_assert import expect, assert_expectations -def assert_statistic(checkers, expectations={}): - for k in checkers.keys(): - # expect succ if no expectations - succ_rate = checkers[k].succ_rate() - total = checkers[k].total() - average_time = checkers[k].average_time - if 'compact' in str(k): - log.info("skip compact check") - log.info( - f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - continue - if expectations.get(k, '') == constants.FAIL: - log.info( - f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - expect(succ_rate < 0.49 or total < 2, - f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - else: - log.info( - f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - expect(succ_rate > 0.90 and total > 2, - f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}") - - def get_all_collections(): try: with open("/tmp/ci_logs/all_collections.json", "r") as f: @@ -70,7 +50,7 @@ class TestBase: class TestOperations(TestBase): @pytest.fixture(scope="function", autouse=True) - def connection(self, host, port, user, password): + def connection(self, host, port, user, password, milvus_ns): if user and password: # log.info(f"connect to {host}:{port} with user {user} and password {password}") connections.connect('default', host=host, port=port, user=user, password=password, secure=True) @@ -82,7 +62,10 @@ class TestOperations(TestBase): self.host = host self.port = port self.user = user - self.password = password + self.password = password + self.milvus_sys = MilvusSys(alias='default') + self.milvus_ns = milvus_ns + self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys) def init_health_checkers(self, collection_name=None): c_name = collection_name @@ -91,7 +74,7 @@ class TestOperations(TestBase): Op.flush: FlushChecker(collection_name=c_name), Op.search: SearchChecker(collection_name=c_name), Op.query: QueryChecker(collection_name=c_name), - Op.compact:CompactChecker(collection_name=c_name), + Op.compact: CompactChecker(collection_name=c_name), Op.delete: DeleteChecker(collection_name=c_name), } self.health_checkers = checkers @@ -107,11 +90,14 @@ class TestOperations(TestBase): # start the monitor threads to check the milvus ops log.info("*********************Test Start**********************") log.info(connections.get_connection_addr('default')) + # event_records = EventRecords() c_name = collection_name if collection_name else cf.gen_unique_str("Checker_") + # event_records.insert("init_health_checkers", "start") self.init_health_checkers(collection_name=c_name) + # event_records.insert("init_health_checkers", "finished") cc.start_monitor_threads(self.health_checkers) log.info("*********************Load Start**********************") - request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","") + request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") if request_duration[-1] == "+": request_duration = request_duration[:-1] request_duration = eval(request_duration) @@ -120,7 +106,11 @@ class TestOperations(TestBase): for k, v in self.health_checkers.items(): v.check_result() # log.info(v.check_result()) + wait_pods_ready(self.milvus_ns, f"app.kubernetes.io/instance={self.release_name}") + time.sleep(60) + ra = ResultAnalyzer() + ra.get_stage_success_rate() if is_check: assert_statistic(self.health_checkers) - assert_expectations() + assert_expectations() log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/chaos/testcases/test_single_request_operation.py b/tests/python_client/chaos/testcases/test_single_request_operation.py index 26a7ce215a..70f61a043c 100644 --- a/tests/python_client/chaos/testcases/test_single_request_operation.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation.py @@ -11,11 +11,15 @@ from chaos.checker import (CreateChecker, IndexChecker, DeleteChecker, DropChecker, - Op) + Op, + EventRecords, + ResultAnalyzer + ) from utils.util_log import test_log as log from utils.util_k8s import wait_pods_ready, get_milvus_instance_name from chaos import chaos_commons as cc from common.common_type import CaseLabel +from common.milvus_sys import MilvusSys from chaos.chaos_commons import assert_statistic from chaos import constants from delayed_assert import assert_expectations @@ -50,6 +54,7 @@ class TestOperations(TestBase): self.port = port self.user = user self.password = password + self.milvus_sys = MilvusSys(alias='default') self.milvus_ns = milvus_ns self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys) @@ -72,8 +77,11 @@ class TestOperations(TestBase): # start the monitor threads to check the milvus ops log.info("*********************Test Start**********************") log.info(connections.get_connection_addr('default')) + event_records = EventRecords() c_name = None + event_records.insert("init_health_checkers", "start") self.init_health_checkers(collection_name=c_name) + event_records.insert("init_health_checkers", "finished") cc.start_monitor_threads(self.health_checkers) log.info("*********************Load Start**********************") # wait request_duration @@ -83,6 +91,9 @@ class TestOperations(TestBase): request_duration = eval(request_duration) for i in range(10): sleep(request_duration // 10) + # add an event so that the chaos can start to apply + if i == 3: + event_records.insert("init_chaos", "ready") for k, v in self.health_checkers.items(): v.check_result() if is_check: @@ -91,4 +102,9 @@ class TestOperations(TestBase): # wait all pod ready wait_pods_ready(self.milvus_ns, f"app.kubernetes.io/instance={self.release_name}") time.sleep(60) + for k, v in self.health_checkers.items(): + v.pause() + ra = ResultAnalyzer() + ra.get_stage_success_rate() + ra.show_result_table() log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/deploy/requirements.txt b/tests/python_client/deploy/requirements.txt index d4e76e10f6..554949809e 100644 --- a/tests/python_client/deploy/requirements.txt +++ b/tests/python_client/deploy/requirements.txt @@ -1,5 +1,10 @@ --extra-index-url https://test.pypi.org/simple/ docker==5.0.0 -grpcio==1.37.1 +grpcio==1.53.0 grpcio-tools==1.37.1 -pymilvus==2.0.0rc8 \ No newline at end of file +pymilvus==2.0.0rc8 + +# for test result anaylszer +prettytable==3.8.0 +pyarrow==11.0.0 +fastparquet==2023.7.0 \ No newline at end of file diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 3e08f967ac..598d9c1aa0 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -39,9 +39,15 @@ Faker==19.2.0 h5py==3.8.0 # for log -loguru==0.6.0 +loguru==0.7.0 # util psutil==5.9.4 +pandas==1.5.3 # for standby test etcd-sdk-python==0.0.2 + +# for test result anaylszer +prettytable==3.8.0 +pyarrow==11.0.0 +fastparquet==2023.7.0 \ No newline at end of file diff --git a/tests/python_client/utils/util_common.py b/tests/python_client/utils/util_common.py index 050db33e3f..08d79edd1c 100644 --- a/tests/python_client/utils/util_common.py +++ b/tests/python_client/utils/util_common.py @@ -1,5 +1,8 @@ +import glob +import time from yaml import full_load import json +import pandas as pd from utils.util_log import test_log as log def gen_experiment_config(yaml): @@ -62,6 +65,7 @@ def get_collections(file_name="all_collections.json"): return [] return collections + def get_deploy_test_collections(): try: with open("/tmp/ci_logs/deploy_test_all_collections.json", "r") as f: @@ -72,6 +76,7 @@ def get_deploy_test_collections(): return [] return collections + def get_chaos_test_collections(): try: with open("/tmp/ci_logs/chaos_test_all_collections.json", "r") as f: @@ -83,6 +88,23 @@ def get_chaos_test_collections(): return collections +def wait_signal_to_apply_chaos(): + all_db_file = glob.glob("/tmp/ci_logs/event_records*.parquet") + log.info(f"all files {all_db_file}") + ready_apply_chaos = True + timeout = 10*60 + t0 = time.time() + for f in all_db_file: + while True and (time.time() - t0 < timeout): + df = pd.read_parquet(f) + result = df[(df['event_name'] == 'init_chaos') & (df['event_status'] == 'ready')] + if len(result) > 0: + log.info(f"{f}: {result}") + ready_apply_chaos = True + break + else: + ready_apply_chaos = False + return ready_apply_chaos if __name__ == "__main__": diff --git a/tests/restful_client/requirements.txt b/tests/restful_client/requirements.txt index f243fe3ae9..d653cc1983 100644 --- a/tests/restful_client/requirements.txt +++ b/tests/restful_client/requirements.txt @@ -1,4 +1,4 @@ -requests~=2.26.0 +requests==2.31.0 urllib3==1.26.16 loguru~=0.5.3 pytest~=7.2.0