[test]Refine health checker in test (#26920)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/25762/head
zhuwenxing 2023-09-08 10:09:16 +08:00 committed by GitHub
parent 9166011c4a
commit 0f4475e5e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 6 deletions

View File

@ -37,10 +37,22 @@ def gen_experiment_config(yaml):
def start_monitor_threads(checkers={}):
"""start the threads by checkers"""
tasks = []
for k, ch in checkers.items():
ch._keep_running = True
t = threading.Thread(target=ch.keep_running, args=(), name=k, daemon=True)
t.start()
tasks.append(t)
return tasks
def check_thread_status(tasks):
"""check the status of all threads"""
for t in tasks:
if t.is_alive():
log.info(f"thread {t.name} is still running")
else:
log.info(f"thread {t.name} is not running")
def get_env_variable_by_name(name):

View File

@ -531,6 +531,7 @@ class InsertChecker(Checker):
self.scale = 1 * 10 ** 6
self.start_time_stamp = int(time.time() * self.scale) # us
self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"
@trace()
def insert(self):
@ -548,6 +549,7 @@ class InsertChecker(Checker):
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
if result:
# TODO: persist data to file
self.inserted_data.extend(ts_data)
return res, result
@ -812,12 +814,24 @@ class DropChecker(Checker):
if collection_name is None:
collection_name = cf.gen_unique_str("DropChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.collection_pool = []
self.gen_collection_pool(schema=self.schema)
def gen_collection_pool(self, pool_size=50, schema=None):
for i in range(pool_size):
collection_name = cf.gen_unique_str("DropChecker_")
res, result = self.c_wrap.init_collection(name=collection_name, schema=schema)
if result:
self.collection_pool.append(collection_name)
@trace()
def drop(self):
res, result = self.c_wrap.drop()
if result:
self.collection_pool.remove(self.c_wrap.name)
return res, result
@exception_handler()
def run_task(self):
res, result = self.drop()
return res, result
@ -826,11 +840,16 @@ class DropChecker(Checker):
while self._keep_running:
res, result = self.run_task()
if result:
self.c_wrap.init_collection(
name=cf.gen_unique_str("DropChecker_"),
schema=cf.gen_default_collection_schema(),
timeout=timeout,
check_task=CheckTasks.check_nothing)
try:
if len(self.collection_pool) <= 10:
self.gen_collection_pool(schema=self.schema)
except Exception as e:
log.error(f"Failed to generate collection pool: {e}")
try:
c_name = self.collection_pool[0]
self.c_wrap.init_collection(name=c_name)
except Exception as e:
log.error(f"Failed to init new collection: {e}")
sleep(constants.WAIT_PER_OP / 10)

View File

@ -82,7 +82,7 @@ class TestOperations(TestBase):
event_records.insert("init_health_checkers", "start")
self.init_health_checkers(collection_name=c_name)
event_records.insert("init_health_checkers", "finished")
cc.start_monitor_threads(self.health_checkers)
tasks = cc.start_monitor_threads(self.health_checkers)
log.info("*********************Load Start**********************")
# wait request_duration
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
@ -102,6 +102,7 @@ class TestOperations(TestBase):
# wait all pod ready
wait_pods_ready(self.milvus_ns, f"app.kubernetes.io/instance={self.release_name}")
time.sleep(60)
cc.check_thread_status(tasks)
for k, v in self.health_checkers.items():
v.pause()
ra = ResultAnalyzer()