[test] Add method to analyze chaos test result (#26724)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2023-09-01 10:31:01 +08:00 committed by GitHub
parent 09218bfd3d
commit b3de99e336
12 changed files with 340 additions and 70 deletions

View File

@@ -2,12 +2,12 @@
# --extra-index-url https://test.pypi.org/simple/
# pymilvus==2.0.0rc3.dev8
grpcio==1.53.0
grpcio-testing==1.37.1
grpcio-tools==1.37.1
pandas==1.1.5
scipy==1.10.0
scikit-learn==0.19.1
h5py==2.7.1
# influxdb==5.2.2

View File

@@ -1,8 +1,15 @@
import pytest
import unittest
from enum import Enum
from random import randint
import time
import threading
import os
import uuid
import json
import pandas as pd
from datetime import datetime
from prettytable import PrettyTable
import functools
from time import sleep
from base.collection_wrapper import ApiCollectionWrapper
@@ -16,6 +23,167 @@ from common.common_type import CheckTasks
from utils.util_log import test_log as log
from utils.api_request import Error
lock = threading.Lock()
def get_chaos_info():
try:
with open(constants.CHAOS_INFO_SAVE_PATH, 'r') as f:
chaos_info = json.load(f)
except Exception as e:
log.error(f"get_chaos_info error: {e}")
return None
return chaos_info
class Singleton(type):
instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls.instances:
cls.instances[cls] = super().__call__(*args, **kwargs)
return cls.instances[cls]
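Because EventRecords and RequestRecords use this metaclass, every instantiation in a process returns the same object, so the checker threads and the trace decorator all append to one shared parquet file. A minimal sketch of the behavior (illustrative only):

a = EventRecords()
b = EventRecords()
assert a is b  # one shared instance, hence one shared file_name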
class EventRecords(metaclass=Singleton):
def __init__(self):
self.file_name = f"/tmp/ci_logs/event_records_{uuid.uuid4()}.parquet"
self.created_file = False
def insert(self, event_name, event_status, ts=None):
insert_ts = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f') if ts is None else ts
data = {
"event_name": [event_name],
"event_status": [event_status],
"event_ts": [insert_ts]
}
df = pd.DataFrame(data)
if not self.created_file:
with lock:
df.to_parquet(self.file_name, engine='fastparquet')
self.created_file = True
else:
with lock:
df.to_parquet(self.file_name, engine='fastparquet', append=True)
def get_records_df(self):
df = pd.read_parquet(self.file_name)
return df
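A minimal usage sketch (values assumed; the real callers are in the chaos test cases below):

records = EventRecords()
records.insert("init_chaos", "ready")  # event_ts defaults to the current time
df = records.get_records_df()          # reads back everything written so far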
class RequestRecords(metaclass=Singleton):
def __init__(self):
self.file_name = f"/tmp/ci_logs/request_records_{uuid.uuid4()}.parquet"
self.buffer = []
self.created_file = False
def insert(self, operation_name, collection_name, start_time, time_cost, result):
data = {
"operation_name": operation_name,
"collection_name": collection_name,
"start_time": start_time,
"time_cost": time_cost,
"result": result
}
self.buffer.append(data)
if len(self.buffer) > 100:
df = pd.DataFrame(self.buffer)
if not self.created_file:
with lock:
df.to_parquet(self.file_name, engine='fastparquet')
self.created_file = True
else:
with lock:
df.to_parquet(self.file_name, engine='fastparquet', append=True)
self.buffer = []
def sink(self):
df = pd.DataFrame(self.buffer)
if not self.created_file:
with lock:
df.to_parquet(self.file_name, engine='fastparquet')
self.created_file = True
else:
with lock:
df.to_parquet(self.file_name, engine='fastparquet', append=True)
def get_records_df(self):
self.sink()
df = pd.read_parquet(self.file_name)
return df
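Unlike EventRecords, RequestRecords buffers rows and only appends to the parquet file once more than 100 records have accumulated, keeping per-request overhead low; sink() flushes the remainder, and get_records_df() calls it first so the returned frame is complete. A usage sketch with assumed values:

rr = RequestRecords()
rr.insert("search", "Checker_abc", "2023-09-01 10:00:00.000000", 0.012, "True")
df = rr.get_records_df()  # sink() runs first, so the buffered row is included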
class ResultAnalyzer:
def __init__(self):
rr = RequestRecords()
df = rr.get_records_df()
df["start_time"] = pd.to_datetime(df["start_time"])
df = df.sort_values(by='start_time')
self.df = df
self.chaos_info = get_chaos_info()
def get_stage_success_rate(self):
df = self.df
window = pd.offsets.Milli(1000)
result = df.groupby([pd.Grouper(key='start_time', freq=window), 'operation_name']).apply(lambda x: pd.Series({
'success_count': x[x['result'] == 'True'].shape[0],
'failed_count': x[x['result'] == 'False'].shape[0]
}))
data = result.reset_index()
data['success_rate'] = data['success_count'] / (data['success_count'] + data['failed_count']).replace(0, 1)
grouped_data = data.groupby('operation_name')
if self.chaos_info is None:
chaos_start_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
chaos_end_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
recovery_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
else:
chaos_start_time = self.chaos_info['create_time']
chaos_end_time = self.chaos_info['delete_time']
recovery_time = self.chaos_info['recovery_time']
stage_success_rate = {}
for name, group in grouped_data:
log.info(f"operation_name: {name}")
# split the data into 3 parts by chaos start time and recovery time, then aggregate the success rate of each part
data_before_chaos = group[group['start_time'] < chaos_start_time].agg(
{'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'})
data_during_chaos = group[
(group['start_time'] >= chaos_start_time) & (group['start_time'] <= recovery_time)].agg(
{'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'})
data_after_chaos = group[group['start_time'] > recovery_time].agg(
{'success_rate': 'mean', 'failed_count': 'sum', 'success_count': 'sum'})
stage_success_rate[name] = {
'before_chaos': f"{data_before_chaos['success_rate']}({data_before_chaos['success_count']}/{data_before_chaos['success_count'] + data_before_chaos['failed_count']})" if not data_before_chaos.empty else "no data",
'during_chaos': f"{data_during_chaos['success_rate']}({data_during_chaos['success_count']}/{data_during_chaos['success_count'] + data_during_chaos['failed_count']})" if not data_during_chaos.empty else "no data",
'after_chaos': f"{data_after_chaos['success_rate']}({data_after_chaos['success_count']}/{data_after_chaos['success_count'] + data_after_chaos['failed_count']})" if not data_after_chaos.empty else "no data",
}
log.info(f"stage_success_rate: {stage_success_rate}")
return stage_success_rate
def get_realtime_success_rate(self, interval=10):
df = self.df
window = pd.offsets.Second(interval)
result = df.groupby([pd.Grouper(key='start_time', freq=window), 'operation_name']).apply(lambda x: pd.Series({
'success_count': x[x['result'] == 'True'].shape[0],
'failed_count': x[x['result'] == 'False'].shape[0]
}))
data = result.reset_index()
data['success_rate'] = data['success_count'] / (data['success_count'] + data['failed_count']).replace(0, 1)
grouped_data = data.groupby('operation_name')
return grouped_data
def show_result_table(self):
table = PrettyTable()
table.field_names = ['operation_name', 'before_chaos', 'during_chaos', 'after_chaos']
data = self.get_stage_success_rate()
for operation, values in data.items():
row = [operation, values['before_chaos'], values['during_chaos'], values['after_chaos']]
table.add_row(row)
log.info(f"succ rate for operations in different stage\n{table}")
class Op(Enum):
create = 'create'
@@ -39,28 +207,36 @@ query_timeout = 10
enable_traceback = False
DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}'
request_records = RequestRecords()
def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
def decorate(func):
@functools.wraps(func)
def inner_wrapper(self, *args, **kwargs):
start_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
start_time_ts = time.time()
t0 = time.perf_counter()
res, result = func(self, *args, **kwargs)
elapsed = time.perf_counter() - t0
end_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
operation_name = func.__name__
if flag:
collection_name = self.c_wrap.name
log_str = f"[{prefix}]" + fmt.format(**locals())
# TODO: add a report function here, e.g. uploading the records to influxdb;
# doing this asynchronously would be better, to avoid blocking request processing
try:
t0 = time.perf_counter()
request_records.insert(operation_name, collection_name, start_time, elapsed, str(result))
tt = time.perf_counter() - t0
log.debug(f"insert request record cost {tt}s")
except Exception as e:
log.error(e)
log.info(log_str)
if result:
self.rsp_times.append(elapsed)
self.average_time = (
elapsed + self.average_time * self._succ) / (self._succ + 1)
self._succ += 1
# add first success record if there is no success record before
if len(self.fail_records) > 0 and self.fail_records[-1][0] == "failure" and \
@@ -70,7 +246,9 @@ def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
self._fail += 1
self.fail_records.append(("failure", self._succ + self._fail, start_time, start_time_ts))
return res, result
return inner_wrapper
return decorate
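The decorator expects the wrapped method to return a (response, succeeded) pair and the instance to provide c_wrap, rsp_times, average_time, _succ, _fail and fail_records, i.e. the Checker interface defined below. A hypothetical checker method as a sketch:

class DemoChecker(Checker):
    @trace()
    def run_task(self):
        # trace() logs the call and appends one row to request_records
        res, result = self.c_wrap.query(self.term_expr)
        return res, result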
@@ -85,10 +263,12 @@ def exception_handler():
log_row_length = 300
e_str = str(e)
log_e = e_str[0:log_row_length] + \
'......' if len(e_str) > log_row_length else e_str
log.error(log_e)
return Error(e), False
return inner_wrapper
return wrapper
@@ -128,9 +308,10 @@ class Checker:
if insert_data:
log.info(f"collection {c_name} created, start to insert data")
t0 = time.perf_counter()
self.c_wrap.insert(
data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0),
timeout=timeout,
enable_traceback=enable_traceback)
log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s")
self.initial_entities = self.c_wrap.num_entities # do as a flush
@@ -220,7 +401,7 @@ class Checker:
class SearchChecker(Checker):
"""check search operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
if collection_name is None:
collection_name = cf.gen_unique_str("SearchChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
@@ -229,7 +410,7 @@ class SearchChecker(Checker):
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
# do load before search
self.c_wrap.load(replica_number=replica_number)
@@ -268,10 +449,11 @@ class InsertFlushChecker(Checker):
while True:
t0 = time.time()
_, insert_result = \
self.c_wrap.insert(
data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
t1 = time.time()
if not self._flush:
if insert_result:
@@ -318,10 +500,11 @@ class FlushChecker(Checker):
@exception_handler()
def run_task(self):
_, result = self.c_wrap.insert(
data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
res, result = self.flush()
return res, result
@@ -341,8 +524,8 @@ class InsertChecker(Checker):
self._flush = flush
self.initial_entities = self.c_wrap.num_entities
self.inserted_data = []
self.scale = 1 * 10 ** 6
self.start_time_stamp = int(time.time() * self.scale) # us
self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
@trace()
@@ -351,7 +534,7 @@ class InsertChecker(Checker):
ts_data = []
for i in range(constants.DELTA_PER_INS):
time.sleep(0.001)
offset_ts = int(time.time() * self.scale)
ts_data.append(offset_ts)
data[0] = ts_data # set timestamp (ms) as int64
@@ -385,7 +568,7 @@ class InsertChecker(Checker):
except Exception as e:
log.error(f"create index error: {e}")
self.c_wrap.load()
end_time_stamp = int(time.time() * self.scale)
self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp} and ' \
f'{self.int64_field_name} <= {end_time_stamp}'
data_in_client = []
@@ -561,7 +744,7 @@ class DeleteChecker(Checker):
self.c_wrap.load() # load before query
term_expr = f'{self.int64_field_name} > 0'
res, _ = self.c_wrap.query(term_expr, output_fields=[
self.int64_field_name])
self.ids = [r[self.int64_field_name] for r in res]
self.expr = None
@@ -640,7 +823,7 @@ class DropChecker(Checker):
res, result = self.run_task()
if result:
self.c_wrap.init_collection(
name=cf.gen_unique_str("CreateChecker_"),
name=cf.gen_unique_str("DropChecker_"),
schema=cf.gen_default_collection_schema(),
timeout=timeout,
check_task=CheckTasks.check_nothing)
@@ -754,4 +937,15 @@ class BulkInsertChecker(Checker):
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
class TestResultAnalyzer(unittest.TestCase):
def test_get_stage_success_rate(self):
ra = ResultAnalyzer()
res = ra.get_stage_success_rate()
print(res)
if __name__ == '__main__':
unittest.main()

View File

@@ -9,8 +9,9 @@ def pytest_addoption(parser):
parser.addoption("--target_number", action="store", default="1", help="target_number")
parser.addoption("--chaos_duration", action="store", default="1m", help="chaos_duration")
parser.addoption("--chaos_interval", action="store", default="10s", help="chaos_interval")
parser.addoption("--request_duration", action="store", default="3m", help="request_duration")
parser.addoption("--is_check", action="store", type=bool, default=False, help="is_check")
parser.addoption("--request_duration", action="store", default="5m", help="request_duration")
parser.addoption("--is_check", action="store", type=bool, default=False, help="is_check")
parser.addoption("--wait_signal", action="store", type=bool, default=True, help="wait_signal")
@pytest.fixture
@@ -56,3 +57,8 @@ def request_duration(request):
@pytest.fixture
def is_check(request):
return request.config.getoption("--is_check")
@pytest.fixture
def wait_signal(request):
return request.config.getoption("--wait_signal")

View File

@@ -22,4 +22,5 @@ RELEASE_NAME = 'test-allstandalone-pod-kill-19-25-26'
WAIT_PER_OP = 10 # time to wait in seconds between operations
CHAOS_DURATION = 120 # chaos duration time in seconds
DEFAULT_INDEX_PARAM = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
DEFAULT_SEARCH_PARAM = {"metric_type": "L2", "params": {"ef": 64}}
CHAOS_INFO_SAVE_PATH = "/tmp/ci_logs/chaos_info.json"

View File

@@ -0,0 +1,5 @@
# for test result analyzer
prettytable==3.8.0
pyarrow==11.0.0
fastparquet==2023.7.0

View File

@@ -3,12 +3,14 @@ import pytest
import time
from time import sleep
from pathlib import Path
import json
from pymilvus import connections
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
from datetime import datetime
from utils.util_k8s import wait_pods_ready, get_milvus_instance_name, get_milvus_deploy_tool
from utils.util_common import update_key_value, update_key_name, gen_experiment_config, wait_signal_to_apply_chaos
import constants
@@ -54,9 +56,17 @@ class TestChaosApply:
chaos_res.delete(meta_name, raise_ex=False)
sleep(2)
def test_chaos_apply(self, chaos_type, target_component, target_number, chaos_duration, chaos_interval, wait_signal):
# start the monitor threads to check the milvus ops
log.info("*********************Chaos Test Start**********************")
if wait_signal:
log.info("need wait signal to start chaos")
ready_for_chaos = wait_signal_to_apply_chaos()
if not ready_for_chaos:
log.info("did not get the signal to apply chaos")
raise Exception
else:
log.info("get the signal to apply chaos")
log.info(connections.get_connection_addr('default'))
release_name = self.release_name
chaos_config = gen_experiment_config(
@@ -88,6 +98,7 @@ class TestChaosApply:
version=constants.CHAOS_VERSION,
namespace=constants.CHAOS_NAMESPACE)
chaos_res.create(chaos_config)
create_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
log.info("chaos injected")
res = chaos_res.list_all()
chaos_list = [r['metadata']['name'] for r in res['items']]
@@ -97,6 +108,7 @@ class TestChaosApply:
sleep(chaos_duration)
# delete chaos
chaos_res.delete(meta_name)
delete_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
log.info("chaos deleted")
res = chaos_res.list_all()
chaos_list = [r['metadata']['name'] for r in res['items']]
@@ -114,6 +126,18 @@ class TestChaosApply:
log.info("all pods are ready")
pods_ready_time = time.time() - t0
log.info(f"pods ready time: {pods_ready_time}")
recovery_time = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S.%f')
event_records = {
"chaos_type": chaos_type,
"target_component": target_component,
"meta_name": meta_name,
"create_time": create_time,
"delete_time": delete_time,
"recovery_time": recovery_time
}
# save event records to json file
with open(constants.CHAOS_INFO_SAVE_PATH, 'w') as f:
json.dump(event_records, f)
# reconnect to test the service healthy
start_time = time.time()
end_time = start_time + 120
@@ -125,5 +149,6 @@ class TestChaosApply:
log.error(e)
sleep(2)
recovery_time = time.time() - start_time
log.info(f"recovery time: {recovery_time}")
log.info(f"recovery time from pod ready to can be connected: {recovery_time}")
log.info("*********************Chaos Test Completed**********************")

View File

@@ -1,4 +1,4 @@
import time
import pytest
import json
from time import sleep
@@ -9,40 +9,20 @@ from chaos.checker import (InsertChecker,
SearchChecker,
QueryChecker,
DeleteChecker,
Op,
ResultAnalyzer
)
from utils.util_k8s import wait_pods_ready, get_milvus_instance_name
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
from common import common_func as cf
from common.milvus_sys import MilvusSys
from chaos.chaos_commons import assert_statistic
from common.common_type import CaseLabel
from chaos import constants
from delayed_assert import expect, assert_expectations
def get_all_collections():
try:
with open("/tmp/ci_logs/all_collections.json", "r") as f:
@@ -70,7 +50,7 @@ class TestBase:
class TestOperations(TestBase):
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port, user, password, milvus_ns):
if user and password:
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
@@ -82,7 +62,10 @@ class TestOperations(TestBase):
self.host = host
self.port = port
self.user = user
self.password = password
self.milvus_sys = MilvusSys(alias='default')
self.milvus_ns = milvus_ns
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
def init_health_checkers(self, collection_name=None):
c_name = collection_name
@@ -91,7 +74,7 @@ class TestOperations(TestBase):
Op.flush: FlushChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.compact: CompactChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
}
self.health_checkers = checkers
@@ -107,11 +90,14 @@ class TestOperations(TestBase):
# start the monitor threads to check the milvus ops
log.info("*********************Test Start**********************")
log.info(connections.get_connection_addr('default'))
# event_records = EventRecords()
c_name = collection_name if collection_name else cf.gen_unique_str("Checker_")
# event_records.insert("init_health_checkers", "start")
self.init_health_checkers(collection_name=c_name)
# event_records.insert("init_health_checkers", "finished")
cc.start_monitor_threads(self.health_checkers)
log.info("*********************Load Start**********************")
request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","")
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
if request_duration[-1] == "+":
request_duration = request_duration[:-1]
request_duration = eval(request_duration)
@@ -120,7 +106,11 @@ class TestOperations(TestBase):
for k, v in self.health_checkers.items():
v.check_result()
# log.info(v.check_result())
wait_pods_ready(self.milvus_ns, f"app.kubernetes.io/instance={self.release_name}")
time.sleep(60)
ra = ResultAnalyzer()
ra.get_stage_success_rate()
if is_check:
assert_statistic(self.health_checkers)
assert_expectations()
log.info("*********************Chaos Test Completed**********************")

View File

@@ -11,11 +11,15 @@ from chaos.checker import (CreateChecker,
IndexChecker,
DeleteChecker,
DropChecker,
Op,
EventRecords,
ResultAnalyzer
)
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, get_milvus_instance_name
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common.milvus_sys import MilvusSys
from chaos.chaos_commons import assert_statistic
from chaos import constants
from delayed_assert import assert_expectations
@@ -50,6 +54,7 @@ class TestOperations(TestBase):
self.port = port
self.user = user
self.password = password
self.milvus_sys = MilvusSys(alias='default')
self.milvus_ns = milvus_ns
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
@@ -72,8 +77,11 @@ class TestOperations(TestBase):
# start the monitor threads to check the milvus ops
log.info("*********************Test Start**********************")
log.info(connections.get_connection_addr('default'))
event_records = EventRecords()
c_name = None
event_records.insert("init_health_checkers", "start")
self.init_health_checkers(collection_name=c_name)
event_records.insert("init_health_checkers", "finished")
cc.start_monitor_threads(self.health_checkers)
log.info("*********************Load Start**********************")
# wait request_duration
@@ -83,6 +91,9 @@ class TestOperations(TestBase):
request_duration = eval(request_duration)
for i in range(10):
sleep(request_duration // 10)
# record an event so the chaos-apply side knows the checkers are running and chaos can start
if i == 3:
event_records.insert("init_chaos", "ready")
for k, v in self.health_checkers.items():
v.check_result()
if is_check:
@@ -91,4 +102,9 @@ class TestOperations(TestBase):
# wait all pod ready
wait_pods_ready(self.milvus_ns, f"app.kubernetes.io/instance={self.release_name}")
time.sleep(60)
for k, v in self.health_checkers.items():
v.pause()
ra = ResultAnalyzer()
ra.get_stage_success_rate()
ra.show_result_table()
log.info("*********************Chaos Test Completed**********************")

View File

@@ -1,5 +1,10 @@
--extra-index-url https://test.pypi.org/simple/
docker==5.0.0
grpcio==1.53.0
grpcio-tools==1.37.1
pymilvus==2.0.0rc8
# for test result analyzer
prettytable==3.8.0
pyarrow==11.0.0
fastparquet==2023.7.0

View File

@@ -39,9 +39,15 @@ Faker==19.2.0
h5py==3.8.0
# for log
loguru==0.7.0
# util
psutil==5.9.4
pandas==1.5.3
# for standby test
etcd-sdk-python==0.0.2
# for test result analyzer
prettytable==3.8.0
pyarrow==11.0.0
fastparquet==2023.7.0

View File

@@ -1,5 +1,8 @@
import glob
import time
from yaml import full_load
import json
import pandas as pd
from utils.util_log import test_log as log
def gen_experiment_config(yaml):
@@ -62,6 +65,7 @@ def get_collections(file_name="all_collections.json"):
return []
return collections
def get_deploy_test_collections():
try:
with open("/tmp/ci_logs/deploy_test_all_collections.json", "r") as f:
@@ -72,6 +76,7 @@ def get_deploy_test_collections():
return []
return collections
def get_chaos_test_collections():
try:
with open("/tmp/ci_logs/chaos_test_all_collections.json", "r") as f:
@@ -83,6 +88,23 @@ def get_chaos_test_collections():
return collections
def wait_signal_to_apply_chaos():
all_db_file = glob.glob("/tmp/ci_logs/event_records*.parquet")
log.info(f"all files {all_db_file}")
ready_apply_chaos = True
timeout = 10*60
t0 = time.time()
for f in all_db_file:
while time.time() - t0 < timeout:
df = pd.read_parquet(f)
result = df[(df['event_name'] == 'init_chaos') & (df['event_status'] == 'ready')]
if len(result) > 0:
log.info(f"{f}: {result}")
ready_apply_chaos = True
break
else:
ready_apply_chaos = False
time.sleep(1)  # back off briefly instead of re-reading the parquet file in a tight loop
return ready_apply_chaos
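This is the other half of the handshake with the checker side: the request test records an ("init_chaos", "ready") event after roughly 3/10 of its request duration, and the chaos-apply test polls every event_records*.parquet file here, for up to 10 minutes, before injecting faults:

ready_for_chaos = wait_signal_to_apply_chaos()  # True once every event file shows the record, else False on timeout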
if __name__ == "__main__":

View File

@@ -1,4 +1,4 @@
requests==2.31.0
urllib3==1.26.16
loguru~=0.5.3
pytest~=7.2.0