mirror of https://github.com/milvus-io/milvus.git
[test]Add delete checker for chaos test (#17965)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/17980/head
parent 6ab850bedf
commit 5083425291
@@ -35,6 +35,7 @@ def gen_experiment_config(yaml):
 def start_monitor_threads(checkers={}):
     """start the threads by checkers"""
     for k, ch in checkers.items():
+        ch._keep_running = True
         t = threading.Thread(target=ch.keep_running, args=(), name=k, daemon=True)
         t.start()

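Note: the restart path added in this commit relies on each checker exposing a keep_running loop governed by the _keep_running flag; setting the flag back to True in start_monitor_threads lets a previously terminated checker be relaunched in a fresh thread. A minimal sketch of that pattern, assuming the real Checker loop works roughly like this (class name, task body, and sleep interval are illustrative only):

    import threading
    import time

    class MinimalChecker:
        """Illustrative stand-in for a chaos checker, not the real Checker class."""

        def __init__(self):
            self._keep_running = True
            self._succ = 0
            self._fail = 0

        def run_task(self):
            # Placeholder for one health-check operation (insert, flush, delete, ...).
            self._succ += 1

        def keep_running(self):
            # Loop until terminate() flips the flag; a later call to
            # start_monitor_threads() sets it back to True and spawns a new thread.
            while self._keep_running:
                self.run_task()
                time.sleep(1)

        def terminate(self):
            self._keep_running = False
            self.reset()

        def reset(self):
            self._succ = 0
            self._fail = 0

    checker = MinimalChecker()
    t = threading.Thread(target=checker.keep_running, name="demo", daemon=True)
    t.start()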
@@ -128,6 +128,7 @@ class Checker:
     def terminate(self):
         self._keep_running = False
+        self.reset()

     def reset(self):
         self._succ = 0
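Note: calling reset() from terminate() clears the running counters, so the statistics asserted after the checkers are restarted cover only operations performed in the new monitoring window. A hedged usage sketch, reusing the MinimalChecker stand-in from above rather than the real test flow:

    c = MinimalChecker()
    c.run_task(); c.run_task()       # simulate some successful operations
    print(c._succ)                   # 2

    c.terminate()                    # stops the loop flag and zeroes the counters
    print(c._keep_running, c._succ)  # False 0

    # Restart, mirroring what start_monitor_threads() does for each checker:
    c._keep_running = True
    threading.Thread(target=c.keep_running, name="demo-restart", daemon=True).start()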
@@ -215,7 +216,7 @@ class FlushChecker(Checker):
     def __init__(self, collection_name=None, flush=False, shards_num=2):
         if collection_name is None:
-            collection_name = cf.gen_unique_str("InsertChecker_")
+            collection_name = cf.gen_unique_str("FlushChecker_")
         super().__init__(collection_name=collection_name, shards_num=shards_num)
         self._flush = flush
         self.initial_entities = self.c_wrap.num_entities

@@ -223,7 +224,7 @@ class FlushChecker(Checker):
     @trace()
     def flush(self):
         num_entities = self.c_wrap.num_entities
-        if num_entities == (self.initial_entities + constants.DELTA_PER_INS):
+        if num_entities >= (self.initial_entities + constants.DELTA_PER_INS):
             result = True
             self.initial_entities += constants.DELTA_PER_INS
         else:
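Note: relaxing the equality to >= makes the flush check tolerant of more than one insert batch landing between two polls of num_entities; the strict == would report a healthy flush as a failure in that case. A small illustration with made-up numbers (DELTA_PER_INS is the real constant's name, but its value here is an assumption):

    DELTA_PER_INS = 3000             # assumed value, for illustration only
    initial_entities = 10000
    num_entities = 16000             # two insert batches were flushed since the last check

    old_check = num_entities == (initial_entities + DELTA_PER_INS)   # False -> counted as a failure
    new_check = num_entities >= (initial_entities + DELTA_PER_INS)   # True  -> counted as a success
    print(old_check, new_check)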
@@ -6,8 +6,8 @@ import json
 from time import sleep

 from pymilvus import connections
-from chaos.checker import (CreateChecker, InsertFlushChecker,
-                           SearchChecker, QueryChecker, IndexChecker, Op)
+from chaos.checker import (CreateChecker, InsertChecker, FlushChecker,
+                           SearchChecker, QueryChecker, IndexChecker, DeleteChecker, Op)
 from common.cus_resource_opts import CustomResourceOperations as CusResource
 from utils.util_log import test_log as log
 from utils.util_k8s import wait_pods_ready, get_pod_list

@@ -20,7 +20,6 @@ from delayed_assert import assert_expectations


 def check_cluster_nodes(chaos_config):
-
     # if all pods will be effected, the expect is all fail.
     # Even though the replicas is greater than 1, it can not provide HA, so cluster_nodes is set as 1 for this situation.
     if "all" in chaos_config["metadata"]["name"]:
@@ -40,6 +39,7 @@ def check_cluster_nodes(chaos_config):
     pods = get_pod_list(namespace, labels_str)
     return len(pods)
+

 def record_results(checkers):
     res = ""
     for k in checkers.keys():
@@ -103,11 +103,12 @@ class TestChaos(TestChaosBase):
     def init_health_checkers(self):
         checkers = {
             Op.create: CreateChecker(),
-            Op.insert: InsertFlushChecker(),
-            Op.flush: InsertFlushChecker(flush=True),
+            Op.insert: InsertChecker(),
+            Op.flush: FlushChecker(),
             Op.index: IndexChecker(),
             Op.search: SearchChecker(),
-            Op.query: QueryChecker()
+            Op.query: QueryChecker(),
+            Op.delete: DeleteChecker()
         }
         self.health_checkers = checkers
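Note: DeleteChecker itself lives in chaos/checker.py and is not shown in this diff. A rough sketch of the shape such a checker presumably takes, following the pattern of the other checkers (the class body, method names, and the delete expression below are assumptions, not the actual implementation):

    # Hypothetical illustration only -- not the milvus DeleteChecker implementation.
    class DeleteCheckerSketch:
        """Periodically deletes previously inserted entities and records the outcome."""

        def __init__(self, collection):
            self.collection = collection     # assumed: a pymilvus Collection (or wrapper)
            self._succ = 0
            self._fail = 0
            self._keep_running = True

        def run_task(self):
            try:
                # Delete a small batch by primary key with a boolean expression,
                # as pymilvus Collection.delete(expr=...) accepts.
                self.collection.delete(expr="int64 in [0, 1, 2]")
                self._succ += 1
            except Exception:
                self._fail += 1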
@@ -230,6 +231,16 @@ class TestChaos(TestChaosBase):
                 f.write(record_results(self.health_checkers))
         except Exception as e:
             log.info(f"Fail to write the report: {e}")
+        # terminate and restart threads
+        for k, checker in self.health_checkers.items():
+            checker.terminate()
+        sleep(5)
+        log.info(f'Alive threads: {threading.enumerate()}')
+        cc.start_monitor_threads(self.health_checkers)
+        sleep(constants.WAIT_PER_OP * 2)
+        log.info("******4th assert after chaos deleted: ")
+        assert_statistic(self.health_checkers)
+
         # assert all expectations
         assert_expectations()
