mirror of https://github.com/milvus-io/milvus.git
[skip e2e]Add checker for chaos test (#17170)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/17179/head
parent d6d9c9126c
commit 2c15cd3203

@@ -1,6 +1,7 @@
|
|||
from enum import Enum
|
||||
from random import randint
|
||||
import time
|
||||
import random
|
||||
from time import sleep
|
||||
from delayed_assert import expect
|
||||
from base.collection_wrapper import ApiCollectionWrapper
|
||||
|
@@ -65,7 +66,7 @@ class Checker:
        rsp_times = self.rsp_times
        average_time = 0 if len(rsp_times) == 0 else sum(rsp_times) / len(rsp_times)
        max_time = 0 if len(rsp_times) == 0 else max(rsp_times)
        min_time = 0 if len(rsp_times) == 0 else min(rsp_times)
        checkers_result = f"succ_rate: {succ_rate:.2f}, total: {total:03d}, average_time: {average_time:.4f}, max_time: {max_time:.4f}, min_time: {min_time:.4f}"
        return checkers_result
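
For reference, with the format string above a checker's summary line renders like this (numbers are purely illustrative, not taken from a real run):

succ_rate: 0.95, total: 100, average_time: 0.0213, max_time: 0.1342, min_time: 0.0021
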
@@ -232,13 +233,140 @@ class QueryChecker(Checker):
            sleep(constants.WAIT_PER_OP / 10)

class DeleteChecker(Checker):
    """check delete operations in a dependent thread"""

    def __init__(self, collection_name=None):
        super().__init__(collection_name=collection_name)
        self.c_wrap.load()  # load before query

    def keep_running(self):
        while True:
            term_expr = f'{ct.default_int64_field_name} > 0'
            res, _ = self.c_wrap.query(term_expr, output_fields=[ct.default_int64_field_name])
            ids = [r[ct.default_int64_field_name] for r in res]
            delete_ids = random.sample(ids, 2)
            expr = f'{ct.default_int64_field_name} in {delete_ids}'
            t0 = time.time()
            _, result = self.c_wrap.delete(expr=expr, timeout=timeout)
            tt = time.time() - t0
            if result:
                self.rsp_times.append(tt)
                self.average_time = (tt + self.average_time * self._succ) / (self._succ + 1)
                self._succ += 1
                log.debug(f"delete success, time: {tt:.4f}, average_time: {self.average_time:.4f}")
            else:
                self._fail += 1
            sleep(constants.WAIT_PER_OP / 10)
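
The checkers are meant to be driven from their own threads while chaos is injected; the harness itself is not part of this diff, so the following is only a minimal sketch with assumed names (the collection name and observation window are placeholders):

import threading

# Hypothetical harness sketch, not part of this diff: run a checker in a
# background thread while chaos is injected, then read its counters.
checker = DeleteChecker(collection_name="chaos_delete_checker_demo")  # placeholder name
t = threading.Thread(target=checker.keep_running, daemon=True)
t.start()
# ... inject chaos here ...
time.sleep(60)  # arbitrary observation window
log.info(f"delete checker: succ={checker._succ}, fail={checker._fail}")
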
class CompactChecker(Checker):
    """check compact operations in a dependent thread"""

    def __init__(self, collection_name=None):
        super().__init__(collection_name=collection_name)
        self.ut = ApiUtilityWrapper()
        self.c_wrap.load(enable_traceback=enable_traceback)  # load before compact

    def keep_running(self):
        while True:
            seg_info = self.ut.get_query_segment_info(self.c_wrap.name)
            t0 = time.time()
            res, result = self.c_wrap.compact(timeout=timeout)
            print(f"compact done: res {res}")
            self.c_wrap.wait_for_compaction_completed()
            self.c_wrap.get_compaction_plans()
            t1 = time.time()
            if result:
                self.rsp_times.append(t1 - t0)
                self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
                self._succ += 1
                log.debug(f"compact success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
            else:
                self._fail += 1
            sleep(constants.WAIT_PER_OP / 10)
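
All of the checkers above update their running average with the same incremental-mean formula, avg_new = (x + avg_old * n) / (n + 1). A quick standalone check of that identity with made-up timings (purely illustrative, not from the diff):

# Incremental mean as used by the checkers: equivalent to the plain mean.
times = [0.2, 0.4, 0.6]
avg, n = 0.0, 0
for x in times:
    avg = (x + avg * n) / (n + 1)
    n += 1
assert abs(avg - sum(times) / len(times)) < 1e-9  # both give 0.4
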
class DropChecker(Checker):
    """check drop operations in a dependent thread"""

    def __init__(self, collection_name=None):
        super().__init__(collection_name=collection_name)
        # self.c_wrap.load(enable_traceback=enable_traceback)  # load before drop

    def keep_running(self):
        while True:
            t0 = time.time()
            _, result = self.c_wrap.drop()
            t1 = time.time()
            if result:
                self.rsp_times.append(t1 - t0)
                self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
                self._succ += 1
                log.debug(f"drop success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
            else:
                self._fail += 1
            sleep(constants.WAIT_PER_OP / 10)

class LoadBalanceChecker(Checker):
    """check loadbalance operations in a dependent thread"""

    def __init__(self, collection_name=None):
        super().__init__(collection_name=collection_name)
        self.utility_wrap = ApiUtilityWrapper()
        self.c_wrap.load(enable_traceback=enable_traceback)

    def keep_running(self):
        while True:
            c_name = self.c_wrap.name
            res, _ = self.c_wrap.get_replicas()
            # prepare load balance params
            # find a group which has multi nodes
            group_nodes = []
            for g in res.groups:
                if len(g.group_nodes) >= 2:
                    group_nodes = list(g.group_nodes)
                    break
            src_node_id = group_nodes[0]
            dst_node_ids = group_nodes[1:]
            res, _ = self.utility_wrap.get_query_segment_info(c_name)
            segment_distribution = cf.get_segment_distribution(res)
            sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
            # load balance
            t0 = time.time()
            _, result = self.utility_wrap.load_balance(c_name, src_node_id, dst_node_ids, sealed_segment_ids)
            t1 = time.time()
            # get segments distribution after load balance
            time.sleep(3)
            res, _ = self.utility_wrap.get_query_segment_info(c_name)
            segment_distribution = cf.get_segment_distribution(res)
            sealed_segment_ids_after_load_balance = segment_distribution[src_node_id]["sealed"]
            check_1 = len(set(sealed_segment_ids) & set(sealed_segment_ids_after_load_balance)) == 0
            des_sealed_segment_ids = []
            for des_node_id in dst_node_ids:
                des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
            # assert sealed_segment_ids is subset of des_sealed_segment_ids
            check_2 = set(sealed_segment_ids).issubset(set(des_sealed_segment_ids))

            if result and (check_1 and check_2):
                self.rsp_times.append(t1 - t0)
                self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
                self._succ += 1
                log.debug(f"load balance success, time: {t1 - t0:.4f}, average_time: {self.average_time:.4f}")
            else:
                self._fail += 1
            sleep(10)
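
The two post-conditions above (check_1 and check_2) boil down to plain set algebra on sealed segment IDs. A standalone illustration with made-up IDs, not taken from the diff:

# Made-up segment IDs, for illustration only.
sealed_before = {101, 102, 103}              # sealed segments on the source node before balancing
sealed_on_src_after = {104}                  # sealed segments left on the source node afterwards
sealed_on_dst_after = {101, 102, 103, 105}   # union of sealed segments across the destination nodes

check_1 = len(sealed_before & sealed_on_src_after) == 0   # nothing that was balanced stayed behind
check_2 = sealed_before.issubset(sealed_on_dst_after)     # everything that was balanced arrived
assert check_1 and check_2
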
class BulkLoadChecker(Checker):
    """check bulk load operations in a dependent thread"""

    def __init__(self, flush=False):
        super().__init__()
        self.utility_wrap = ApiUtilityWrapper()
        self.schema = cf.gen_default_collection_schema()
        self.flush = flush
        self.files = ["bulk_load_data_source.json"]
        self.row_based = True
        self.recheck_failed_task = False

@@ -256,11 +384,15 @@ class BulkLoadChecker(Checker):
        while True:
            if self.recheck_failed_task and self.failed_tasks:
                c_name = self.failed_tasks.pop(0)
                log.info(f"check failed task: {c_name}")
            else:
                c_name = cf.gen_unique_str("BulkLoadChecker_")
            self.c_wrap.init_collection(name=c_name, schema=self.schema)
            # pre_entities_num = self.c_wrap.num_entities
            if self.flush:
                t0 = time.time()
                pre_entities_num = self.c_wrap.num_entities
                tt = time.time() - t0
                log.info(f"flush before bulk load, cost time: {tt:.4f}")
            # import data
            t0 = time.time()
            task_ids, res_1 = self.utility_wrap.bulk_load(collection_name=c_name,

@@ -275,6 +407,11 @@ class BulkLoadChecker(Checker):
                self.average_time = (tt + self.average_time * self._succ) / (self._succ + 1)
                self._succ += 1
                log.info(f"bulk load success for collection {c_name}, time: {tt:.4f}, average_time: {self.average_time:.4f}")
                if self.flush:
                    t0 = time.time()
                    cur_entities_num = self.c_wrap.num_entities
                    tt = time.time() - t0
                    log.info(f"flush after bulk load, cost time: {tt:.4f}")
            else:
                self._fail += 1
                # if the task failed, store the failed collection name for further checking after chaos
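
The diff is truncated at this point. Based on the recheck_failed_task handling at the top of the loop, the failure branch presumably records the collection name for a later retry; an assumed continuation, not shown in this diff:

                # assumed continuation: remember the failed collection so a later
                # pass with recheck_failed_task enabled can retry it
                self.failed_tasks.append(c_name)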