[skip e2e] Add verification checker (#17786)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/17800/head
zhuwenxing 2022-06-25 15:38:15 +08:00 committed by GitHub
parent f748d6de26
commit fcfca9a712
6 changed files with 366 additions and 111 deletions

View File

@@ -4,7 +4,7 @@ import glob
from chaos import constants
from yaml import full_load
from utils.util_log import test_log as log
from delayed_assert import expect
def check_config(chaos_config):
if not chaos_config.get('kind', None):
@@ -72,3 +72,19 @@ def reconnect(connections, alias='default'):
res = connections.get_connection_addr(alias)
connections.remove_connection(alias)
return connections.connect(alias, host=res["host"], port=res["port"])
def assert_statistic(checkers, expectations={}):
for k in checkers.keys():
# expect succ if no expectations
succ_rate = checkers[k].succ_rate()
total = checkers[k].total()
average_time = checkers[k].average_time
if expectations.get(k, '') == constants.FAIL:
log.info(f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate < 0.49 or total < 2,
f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
else:
log.info(f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate > 0.90 and total > 2,
f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")

View File

@@ -15,15 +15,19 @@ from utils.util_log import test_log as log
class Op(Enum):
create = 'create'
insert = 'insert'
flush = 'flush'
index = 'index'
search = 'search'
query = 'query'
bulk_load = 'bulk_load'
create = "create"
insert = "insert"
flush = "flush"
index = "index"
search = "search"
query = "query"
delete = "delete"
compact = "compact"
drop = "drop"
load_balance = "load_balance"
bulk_load = "bulk_load"
unknown = 'unknown'
unknown = "unknown"
timeout = 20
@@ -43,16 +47,21 @@ class Checker:
self.rsp_times = []
self.average_time = 0
self.c_wrap = ApiCollectionWrapper()
c_name = collection_name if collection_name is not None else cf.gen_unique_str('Checker_')
self.c_wrap.init_collection(name=c_name,
schema=cf.gen_default_collection_schema(),
shards_num=shards_num,
timeout=timeout,
enable_traceback=enable_traceback)
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
timeout=timeout,
enable_traceback=enable_traceback)
c_name = collection_name if collection_name is not None else cf.gen_unique_str("Checker_")
self.c_wrap.init_collection(
name=c_name,
schema=cf.gen_default_collection_schema(),
shards_num=shards_num,
timeout=timeout,
enable_traceback=enable_traceback,
)
self.c_wrap.insert(
data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
timeout=timeout,
enable_traceback=enable_traceback,
)
self.initial_entities = self.c_wrap.num_entities # do as a flush
self.c_wrap.release()
def total(self):
return self._succ + self._fail
@@ -81,6 +90,8 @@ class SearchChecker(Checker):
"""check search operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1):
if collection_name is None:
collection_name = cf.gen_unique_str("SearchChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.load(replica_number=replica_number) # do load before search
@@ -92,16 +103,21 @@ class SearchChecker(Checker):
data=search_vec,
anns_field=ct.default_float_vec_field_name,
param={"nprobe": 32},
limit=1, timeout=timeout,
limit=1,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"search success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"search success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(constants.WAIT_PER_OP / 10)
@@ -111,6 +127,11 @@ class InsertFlushChecker(Checker):
"""check Insert and flush operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2):
if collection_name is None:
if flush:
collection_name = cf.gen_unique_str("FlushChecker_")
else:
collection_name = cf.gen_unique_str("InsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
self._flush = flush
self.initial_entities = self.c_wrap.num_entities
@@ -118,18 +139,23 @@ class InsertFlushChecker(Checker):
def keep_running(self):
while True:
t0 = time.time()
_, insert_result = \
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
_, insert_result = self.c_wrap.insert(
data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if not self._flush:
if insert_result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"insert success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(constants.WAIT_PER_OP / 10)
@@ -140,9 +166,13 @@ class InsertFlushChecker(Checker):
t1 = time.time()
if num_entities == (self.initial_entities + constants.DELTA_PER_INS):
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"flush success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
self.initial_entities += constants.DELTA_PER_INS
else:
self._fail += 1
@@ -151,8 +181,10 @@ class InsertFlushChecker(Checker):
class CreateChecker(Checker):
"""check create operations in a dependent thread"""
def __init__(self):
super().__init__()
def __init__(self, collection_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("CreateChecker_")
super().__init__(collection_name=collection_name)
def keep_running(self):
while True:
@@ -162,13 +194,18 @@ class CreateChecker(Checker):
schema=cf.gen_default_collection_schema(),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"create success, time: {t1 - t0:.4f}, average_time: {self.average_time:4f}")
log.debug(
f"create success, time: {t1 - t0:.4f}, average_time: {self.average_time:4f}"
)
self.c_wrap.drop(timeout=timeout)
else:
@@ -179,27 +216,40 @@ class CreateChecker(Checker):
class IndexChecker(Checker):
"""check Insert operations in a dependent thread"""
def __init__(self):
super().__init__()
self.c_wrap.insert(data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH),
timeout=timeout, enable_traceback=enable_traceback)
log.debug(f"Index ready entities: {self.c_wrap.num_entities}") # do as a flush before indexing
def __init__(self, collection_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name)
self.c_wrap.insert(
data=cf.gen_default_list_data(nb=5 * constants.ENTITIES_FOR_SEARCH),
timeout=timeout,
enable_traceback=enable_traceback,
)
log.debug(
f"Index ready entities: {self.c_wrap.num_entities}"
) # do as a flush before indexing
def keep_running(self):
while True:
t0 = time.time()
_, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str('index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
_, result = self.c_wrap.create_index(
ct.default_float_vec_field_name,
constants.DEFAULT_INDEX_PARAM,
name=cf.gen_unique_str("index_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"index success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"index success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
self.c_wrap.drop_index(timeout=timeout)
else:
self._fail += 1
@@ -209,6 +259,8 @@ class QueryChecker(Checker):
"""check query operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1):
if collection_name is None:
collection_name = cf.gen_unique_str("QueryChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.load(replica_number=replica_number) # do load before query
@@ -217,17 +269,24 @@ class QueryChecker(Checker):
int_values = []
for _ in range(5):
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
term_expr = f'{ct.default_int64_field_name} in {int_values}'
term_expr = f"{ct.default_int64_field_name} in {int_values}"
t0 = time.time()
_, result = self.c_wrap.query(term_expr, timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
_, result = self.c_wrap.query(
term_expr,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing,
)
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"query success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"query success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(constants.WAIT_PER_OP / 10)
@@ -237,24 +296,32 @@ class DeleteChecker(Checker):
"""check delete operations in a dependent thread"""
def __init__(self, collection_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DeleteChecker_")
super().__init__(collection_name=collection_name)
self.c_wrap.load() # load before query
def keep_running(self):
while True:
term_expr = f'{ct.default_int64_field_name} > 0'
res, _ = self.c_wrap.query(term_expr, output_fields=[ct.default_int64_field_name])
term_expr = f"{ct.default_int64_field_name} > 0"
res, _ = self.c_wrap.query(
term_expr, output_fields=[ct.default_int64_field_name]
)
ids = [r[ct.default_int64_field_name] for r in res]
delete_ids = random.sample(ids, 2)
expr = f'{ct.default_int64_field_name} in {delete_ids}'
expr = f"{ct.default_int64_field_name} in {delete_ids}"
t0 = time.time()
_, result = self.c_wrap.delete(expr=expr, timeout=timeout)
tt = time.time() - t0
if result:
self.rsp_times.append(tt)
self.average_time = (tt + self.average_time * self._succ) / (self._succ + 1)
self.average_time = (tt + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"delete success, time: {tt:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"delete success, time: {tt:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(constants.WAIT_PER_OP / 10)
@@ -264,6 +331,8 @@ class CompactChecker(Checker):
"""check compact operations in a dependent thread"""
def __init__(self, collection_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("CompactChecker_")
super().__init__(collection_name=collection_name)
self.ut = ApiUtilityWrapper()
self.c_wrap.load(enable_traceback=enable_traceback) # load before compact
@@ -279,9 +348,13 @@ class CompactChecker(Checker):
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"compact success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"compact success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(constants.WAIT_PER_OP / 10)
@@ -291,6 +364,8 @@ class DropChecker(Checker):
"""check drop operations in a dependent thread"""
def __init__(self, collection_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DropChecker_")
super().__init__(collection_name=collection_name)
# self.c_wrap.load(enable_traceback=enable_traceback) # load before compact
@@ -301,9 +376,13 @@ class DropChecker(Checker):
t1 = time.time()
if result:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"drop success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"drop success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(constants.WAIT_PER_OP / 10)
@@ -313,6 +392,8 @@ class LoadBalanceChecker(Checker):
"""check loadbalance operations in a dependent thread"""
def __init__(self, collection_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("LoadBalanceChecker_")
super().__init__(collection_name=collection_name)
self.utility_wrap = ApiUtilityWrapper()
self.c_wrap.load(enable_traceback=enable_traceback)
@@ -335,14 +416,23 @@ class LoadBalanceChecker(Checker):
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
t0 = time.time()
_, result = self.utility_wrap.load_balance(c_name, src_node_id, dst_node_ids, sealed_segment_ids)
_, result = self.utility_wrap.load_balance(
c_name, src_node_id, dst_node_ids, sealed_segment_ids
)
t1 = time.time()
# get segments distribution after load balance
time.sleep(3)
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids_after_load_balance = segment_distribution[src_node_id]["sealed"]
check_1 = len(set(sealed_segment_ids) & set(sealed_segment_ids_after_load_balance)) == 0
sealed_segment_ids_after_load_balance = segment_distribution[src_node_id][
"sealed"
]
check_1 = (
len(
set(sealed_segment_ids) & set(sealed_segment_ids_after_load_balance)
)
== 0
)
des_sealed_segment_ids = []
for des_node_id in dst_node_ids:
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
@@ -351,9 +441,13 @@ class LoadBalanceChecker(Checker):
if result and (check_1 and check_2):
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.debug(f"load balance success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
log.debug(
f"load balance success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}"
)
else:
self._fail += 1
sleep(10)
@@ -362,8 +456,10 @@ class LoadBalanceChecker(Checker):
class BulkLoadChecker(Checker):
"""check bulk load operations in a dependent thread"""
def __init__(self, flush=False):
super().__init__()
def __init__(self, collection_name=None, flush=False):
if collection_name is None:
collection_name = cf.gen_unique_str("BulkLoadChecker_")
super().__init__(collection_name=collection_name)
self.utility_wrap = ApiUtilityWrapper()
self.schema = cf.gen_default_collection_schema()
self.flush = flush
@@ -395,18 +491,24 @@ class BulkLoadChecker(Checker):
log.info(f"flush before bulk load, cost time: {tt:.4f}")
# import data
t0 = time.time()
task_ids, res_1 = self.utility_wrap.bulk_load(collection_name=c_name,
row_based=self.row_based,
files=self.files)
task_ids, res_1 = self.utility_wrap.bulk_load(
collection_name=c_name, row_based=self.row_based, files=self.files
)
log.info(f"bulk load task ids:{task_ids}")
completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids, timeout=30)
completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed(
task_ids=task_ids, timeout=30
)
tt = time.time() - t0
# added_num = sum(res_2[task_id].row_count for task_id in task_ids)
if completed:
self.rsp_times.append(tt)
self.average_time = (tt + self.average_time * self._succ) / (self._succ + 1)
self.average_time = (tt + self.average_time * self._succ) / (
self._succ + 1
)
self._succ += 1
log.info(f"bulk load success for collection {c_name}, time: {tt:.4f}, average_time: {self.average_time:4f}")
log.info(
f"bulk load success for collection {c_name}, time: {tt:.4f}, average_time: {self.average_time:4f}"
)
if self.flush:
t0 = time.time()
cur_entities_num = self.c_wrap.num_entities
@@ -416,7 +518,9 @@ class BulkLoadChecker(Checker):
self._fail += 1
# if the task failed, store the failed collection name for further checking after chaos
self.failed_tasks.append(c_name)
log.info(f"bulk load failed for collection {c_name} time: {tt:.4f}, average_time: {self.average_time:4f}")
log.info(
f"bulk load failed for collection {c_name} time: {tt:.4f}, average_time: {self.average_time:4f}"
)
sleep(constants.WAIT_PER_OP / 10)
@@ -427,11 +531,14 @@ def assert_statistic(checkers, expectations={}):
total = checkers[k].total()
checker_result = k.check_result()
if expectations.get(k, '') == constants.FAIL:
if expectations.get(k, "") == constants.FAIL:
log.info(f"Expect Fail: {str(k)} {checker_result}")
expect(succ_rate < 0.49 or total < 2,
f"Expect Fail: {str(k)} {checker_result}")
expect(
succ_rate < 0.49 or total < 2, f"Expect Fail: {str(k)} {checker_result}"
)
else:
log.info(f"Expect Succ: {str(k)} {checker_result}")
expect(succ_rate > 0.90 or total > 2,
f"Expect Succ: {str(k)} {checker_result}")
expect(
succ_rate > 0.90 or total > 2, f"Expect Succ: {str(k)} {checker_result}"
)
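The succ_rate(), total(), and average_time values that assert_statistic reads come from the same bookkeeping each checker above repeats: success/failure counters plus an incremental running mean of the latency of successful operations. A condensed, self-contained sketch of that bookkeeping (class and method names are illustrative, not the exact Checker API):

```python
# Condensed sketch of the per-checker statistics shown above (illustrative names).
class MiniChecker:
    def __init__(self):
        self._succ = 0
        self._fail = 0
        self.average_time = 0.0
        self.rsp_times = []

    def record(self, ok: bool, elapsed: float):
        if ok:
            self.rsp_times.append(elapsed)
            # incremental running mean over successful operations only
            self.average_time = (elapsed + self.average_time * self._succ) / (self._succ + 1)
            self._succ += 1
        else:
            self._fail += 1

    def total(self):
        return self._succ + self._fail

    def succ_rate(self):
        return self._succ / self.total() if self.total() else 0.0


checker = MiniChecker()
checker.record(True, 0.12)
checker.record(False, 0.0)
print(checker.succ_rate(), checker.average_time)  # 0.5 0.12
```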

View File

@@ -0,0 +1,36 @@
from collections import defaultdict
import json
import argparse
from pymilvus import connections, list_collections
TIMEOUT = 120
def save_all_checker_collections(host="127.0.0.1", prefix="Checker"):
# create connection
connections.connect(host=host, port="19530")
all_collections = list_collections()
if prefix is None:
all_collections = [c_name for c_name in all_collections]
else:
all_collections = [c_name for c_name in all_collections if prefix in c_name]
m = defaultdict(list)
for c_name in all_collections:
prefix = c_name.split("_")[0]
if len(m[prefix]) <= 10:
m[prefix].append(c_name)
selected_collections = []
for v in m.values():
selected_collections.extend(v)
data = {
"all": selected_collections
}
print("selected_collections is")
print(selected_collections)
with open("/tmp/ci_logs/all_collections.json", "w") as f:
f.write(json.dumps(data))
parser = argparse.ArgumentParser(description='host ip')
parser.add_argument('--host', type=str, default='127.0.0.1', help='host ip')
args = parser.parse_args()
save_all_checker_collections(args.host)
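The JSON written here is what the new parametrized test further below reads back through get_all_collections(). A hedged sketch of inspecting that file (the collection names shown are made up):

```python
# Illustrative read-back of the file written above; the names in the comment are made up.
import json

with open("/tmp/ci_logs/all_collections.json") as f:
    data = json.load(f)
print(data)  # e.g. {"all": ["SearchChecker_x7Yq", "QueryChecker_p2Kd"]}
```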

View File

@@ -9,7 +9,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
from collections import defaultdict
import random
import numpy as np
import time
@@ -110,15 +110,21 @@ args = parser.parse_args()
print(f"\nStart time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}")
# create connection
connections.connect(host=args.host, port="19530")
print(f"\nList collections...")
collection_list = list_collections()
print(collection_list)
# keep 10 collections with prefix "CreateChecker_", others will be skipped
print("\nList collections...")
all_collections = list_collections()
print(all_collections)
all_collections = [c_name for c_name in all_collections if "Checker" in c_name]
m = defaultdict(list)
for c_name in all_collections:
prefix = c_name.split("_")[0]
if len(m[prefix]) <= 5:
m[prefix].append(c_name)
selected_collections = []
for v in m.values():
selected_collections.extend(v)
print("selected_collections is")
print(selected_collections)
cnt = 0
for collection_name in collection_list:
if collection_name.startswith("CreateChecker_"):
cnt += 1
if collection_name.startswith("CreateChecker_") and cnt > 10:
continue
for collection_name in selected_collections:
print(f"check collection {collection_name}")
hello_milvus(collection_name)
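The selection above caps how many collections of each checker type get verified. A tiny standalone demonstration of that prefix-grouping rule (sample names are made up):

```python
# Standalone demo of the prefix-grouping rule used above (sample names are made up).
from collections import defaultdict

names = ["SearchChecker_a", "SearchChecker_b", "QueryChecker_c", "DeleteChecker_d"]
m = defaultdict(list)
for c_name in names:
    prefix = c_name.split("_")[0]
    if len(m[prefix]) <= 5:  # keep only a handful per checker type
        m[prefix].append(c_name)

selected = [c for group in m.values() for c in group]
print(selected)  # ['SearchChecker_a', 'SearchChecker_b', 'QueryChecker_c', 'DeleteChecker_d']
```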

View File

@@ -1,5 +1,4 @@
import threading
import pytest
import os
import time
@@ -14,25 +13,10 @@ from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, get_pod_list
from utils.util_common import findkeys
from chaos import chaos_commons as cc
from chaos.chaos_commons import assert_statistic
from common.common_type import CaseLabel
from chaos import constants
from delayed_assert import expect, assert_expectations
def assert_statistic(checkers, expectations={}):
for k in checkers.keys():
# expect succ if no expectations
succ_rate = checkers[k].succ_rate()
total = checkers[k].total()
average_time = checkers[k].average_time
if expectations.get(k, '') == constants.FAIL:
log.info(f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate < 0.49 or total < 2,
f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
else:
log.info(f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate > 0.90 or total > 2,
f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
from delayed_assert import assert_expectations
def check_cluster_nodes(chaos_config):

View File

@@ -0,0 +1,106 @@
import threading
import pytest
import json
from time import sleep
from pymilvus import connections
from chaos.checker import (InsertFlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
DeleteChecker,
Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from chaos import constants
from delayed_assert import expect, assert_expectations
def assert_statistic(checkers, expectations={}):
for k in checkers.keys():
# expect succ if no expectations
succ_rate = checkers[k].succ_rate()
total = checkers[k].total()
average_time = checkers[k].average_time
if expectations.get(k, '') == constants.FAIL:
log.info(
f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate < 0.49 or total < 2,
f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
else:
log.info(
f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate > 0.90 and total > 2,
f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
def get_all_collections():
try:
with open("/tmp/ci_logs/all_collections.json", "r") as f:
data = json.load(f)
all_collections = data["all"]
except Exception as e:
log.error(f"get_all_collections error: {e}")
return []
return all_collections
class TestBase:
expect_create = constants.SUCC
expect_insert = constants.SUCC
expect_flush = constants.SUCC
expect_index = constants.SUCC
expect_search = constants.SUCC
expect_query = constants.SUCC
host = '127.0.0.1'
port = 19530
_chaos_config = None
health_checkers = {}
class TestOperations(TestBase):
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port):
connections.add_connection(default={"host": host, "port": port})
connections.connect(alias='default')
if connections.has_connection("default") is False:
raise Exception("no connections")
self.host = host
self.port = port
def init_health_checkers(self, collection_name=None):
c_name = collection_name
checkers = {
Op.insert: InsertFlushChecker(collection_name=c_name),
Op.flush: InsertFlushChecker(collection_name=c_name, flush=True),
Op.index: IndexChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
}
self.health_checkers = checkers
@pytest.fixture(scope="function", params=get_all_collections())
def collection_name(self, request):
if request.param == [] or request.param == "":
pytest.skip("The collection name is invalid")
yield request.param
@pytest.mark.tags(CaseLabel.L3)
def test_operations(self, collection_name):
# start the monitor threads to check the milvus ops
log.info("*********************Test Start**********************")
log.info(connections.get_connection_addr('default'))
c_name = collection_name
self.init_health_checkers(collection_name=c_name)
cc.start_monitor_threads(self.health_checkers)
# wait 20s
sleep(constants.WAIT_PER_OP * 2)
# assert all expectations
assert_statistic(self.health_checkers)
assert_expectations()
log.info("*********************Chaos Test Completed**********************")