diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 641bf74e8f..26c63d4f1a 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -12,7 +12,9 @@ from datetime import datetime from prettytable import PrettyTable import functools from time import sleep +from base.database_wrapper import ApiDatabaseWrapper from base.collection_wrapper import ApiCollectionWrapper +from base.partition_wrapper import ApiPartitionWrapper from base.utility_wrapper import ApiUtilityWrapper from common import common_func as cf from common import common_type as ct @@ -195,15 +197,30 @@ class ResultAnalyzer: class Op(Enum): - create = 'create' + create = 'create' # short name for create collection + create_db = 'create_db' + create_collection = 'create_collection' + create_partition = 'create_partition' insert = 'insert' + upsert = 'upsert' flush = 'flush' index = 'index' + create_index = 'create_index' + drop_index = 'drop_index' + load = 'load' + load_collection = 'load_collection' + load_partition = 'load_partition' + release = 'release' + release_collection = 'release_collection' + release_partition = 'release_partition' search = 'search' query = 'query' delete = 'delete' compact = 'compact' - drop = 'drop' + drop = 'drop' # short name for drop collection + drop_db = 'drop_db' + drop_collection = 'drop_collection' + drop_partition = 'drop_partition' load_balance = 'load_balance' bulk_insert = 'bulk_insert' unknown = 'unknown' @@ -288,7 +305,8 @@ class Checker: b. count operations and success rate """ - def __init__(self, collection_name=None, shards_num=2, dim=ct.default_dim, insert_data=True, schema=None): + def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim=ct.default_dim, insert_data=True, + schema=None): self.recovery_time = 0 self._succ = 0 self._fail = 0 @@ -299,11 +317,16 @@ class Checker: self.files = [] self.ms = MilvusSys() self.bucket_name = self.ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"] + self.db_wrap = ApiDatabaseWrapper() self.c_wrap = ApiCollectionWrapper() + self.p_wrap = ApiPartitionWrapper() self.utility_wrap = ApiUtilityWrapper() c_name = collection_name if collection_name is not None else cf.gen_unique_str( 'Checker_') self.c_name = c_name + p_name = partition_name if partition_name is not None else "_default" + self.p_name = p_name + self.p_names = [self.p_name] if partition_name is not None else None schema = cf.gen_default_collection_schema(dim=dim) if schema is None else schema self.schema = schema self.dim = cf.get_dim_by_schema(schema=schema) @@ -314,17 +337,27 @@ class Checker: shards_num=shards_num, timeout=timeout, enable_traceback=enable_traceback) + self.p_wrap.init_partition(self.c_name, self.p_name) if insert_data: log.info(f"collection {c_name} created, start to insert data") t0 = time.perf_counter() self.c_wrap.insert( data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0), + partition_name=self.p_name, timeout=timeout, enable_traceback=enable_traceback) log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s") self.initial_entities = self.c_wrap.num_entities # do as a flush + def insert_data(self, nb=constants.ENTITIES_FOR_SEARCH, partition_name=None): + partition_name = self.p_name if partition_name is None else partition_name + self.c_wrap.insert( + data=cf.get_column_data_by_schema(nb=nb, schema=self.schema, start=0), + partition_name=partition_name, + timeout=timeout, + 
enable_traceback=enable_traceback)
+
     def total(self):
         return self._succ + self._fail
 
@@ -407,6 +440,140 @@ class Checker:
         return task_ids, completed
 
 
+class CollectionLoadChecker(Checker):
+    """check collection load operations in a dependent thread"""
+
+    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
+        self.replica_number = replica_number
+        if collection_name is None:
+            collection_name = cf.gen_unique_str("CollectionLoadChecker_")
+        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
+        self.c_wrap.create_index(self.float_vector_field_name,
+                                 constants.DEFAULT_INDEX_PARAM,
+                                 index_name=cf.gen_unique_str('index_'),
+                                 timeout=timeout,
+                                 enable_traceback=enable_traceback,
+                                 check_task=CheckTasks.check_nothing)
+
+    @trace()
+    def load_collection(self):
+        res, result = self.c_wrap.load(replica_number=self.replica_number)
+        return res, result
+
+    @exception_handler()
+    def run_task(self):
+        res, result = self.load_collection()
+        if result:
+            self.c_wrap.release()
+        return res, result
+
+    def keep_running(self):
+        while self._keep_running:
+            self.run_task()
+            sleep(constants.WAIT_PER_OP)
+
+
+class CollectionReleaseChecker(Checker):
+    """check collection release operations in a dependent thread"""
+
+    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
+        self.replica_number = replica_number
+        if collection_name is None:
+            collection_name = cf.gen_unique_str("CollectionReleaseChecker_")
+        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
+        self.c_wrap.create_index(self.float_vector_field_name,
+                                 constants.DEFAULT_INDEX_PARAM,
+                                 index_name=cf.gen_unique_str('index_'),
+                                 timeout=timeout,
+                                 enable_traceback=enable_traceback,
+                                 check_task=CheckTasks.check_nothing)
+        self.c_wrap.load(replica_number=self.replica_number)
+
+    @trace()
+    def release_collection(self):
+        res, result = self.c_wrap.release()
+        return res, result
+
+    @exception_handler()
+    def run_task(self):
+        res, result = self.release_collection()
+        if result:
+            self.c_wrap.load(replica_number=self.replica_number)
+        return res, result
+
+    def keep_running(self):
+        while self._keep_running:
+            self.run_task()
+            sleep(constants.WAIT_PER_OP)
+
+
+class PartitionLoadChecker(Checker):
+    """check partition load operations in a dependent thread"""
+
+    def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
+        self.replica_number = replica_number
+        if collection_name is None:
+            collection_name = cf.gen_unique_str("PartitionLoadChecker_")
+        super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
+        self.c_wrap.create_index(self.float_vector_field_name,
+                                 constants.DEFAULT_INDEX_PARAM,
+                                 index_name=cf.gen_unique_str('index_'),
+                                 timeout=timeout,
+                                 enable_traceback=enable_traceback,
+                                 check_task=CheckTasks.check_nothing)
+
+    @trace()
+    def load_partition(self):
+        res, result = self.p_wrap.load(replica_number=self.replica_number)
+        return res, result
+
+    @exception_handler()
+    def run_task(self):
+        res, result = self.load_partition()
+        if result:
+            self.p_wrap.release()
+        return res, result
+
+    def keep_running(self):
+        while self._keep_running:
+            self.run_task()
+            sleep(constants.WAIT_PER_OP)
+
+
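+# Note: each load checker releases after a successful load, and each release
+# checker re-loads after a successful release, so that every iteration of
+# keep_running() exercises the operation under test from a clean state.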
cf.gen_unique_str("LoadChecker_") + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) + self.c_wrap.create_index(self.float_vector_field_name, + constants.DEFAULT_INDEX_PARAM, + index_name=cf.gen_unique_str('index_'), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + self.p_wrap.load(replica_number=self.replica_number) + + @trace() + def release_partition(self): + res, result = self.p_wrap.release() + return res, result + + @exception_handler() + def run_task(self): + res, result = self.release_partition() + if result: + self.p_wrap.load(replica_number=self.replica_number) + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP) + + class SearchChecker(Checker): """check search operations in a dependent thread""" @@ -422,6 +589,7 @@ class SearchChecker(Checker): check_task=CheckTasks.check_nothing) # do load before search self.c_wrap.load(replica_number=replica_number) + self.insert_data() @trace() def search(self): @@ -430,6 +598,7 @@ class SearchChecker(Checker): anns_field=self.float_vector_field_name, param=constants.DEFAULT_SEARCH_PARAM, limit=1, + partition_names=self.p_names, timeout=search_timeout, check_task=CheckTasks.check_nothing ) @@ -525,7 +694,7 @@ class FlushChecker(Checker): class InsertChecker(Checker): - """check flush operations in a dependent thread""" + """check insert operations in a dependent thread""" def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None): if collection_name is None: @@ -540,7 +709,7 @@ class InsertChecker(Checker): self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet" @trace() - def insert(self): + def insert_entities(self): data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema) ts_data = [] for i in range(constants.DELTA_PER_INS): @@ -551,6 +720,7 @@ class InsertChecker(Checker): data[0] = ts_data # set timestamp (ms) as int64 log.debug(f"insert data: {ts_data}") res, result = self.c_wrap.insert(data=data, + partition_names=self.p_names, timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -561,7 +731,7 @@ class InsertChecker(Checker): @exception_handler() def run_task(self): - res, result = self.insert() + res, result = self.insert_entities() return res, result def keep_running(self): @@ -599,8 +769,36 @@ class InsertChecker(Checker): pytest.assume(set(data_in_server) == set(data_in_client)) -class CreateChecker(Checker): - """check create operations in a dependent thread""" +class UpsertChecker(Checker): + """check upsert operations in a dependent thread""" + + def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None): + if collection_name is None: + collection_name = cf.gen_unique_str("InsertChecker_") + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) + + @trace() + def upsert_entities(self): + data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema) + res, result = self.c_wrap.upsert(data=data, + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.upsert_entities() + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP / 10) + + +class CollectionCreateChecker(Checker): + """check collection create 
operations in a dependent thread""" def __init__(self, collection_name=None, schema=None): if collection_name is None: @@ -630,8 +828,180 @@ class CreateChecker(Checker): sleep(constants.WAIT_PER_OP) -class IndexChecker(Checker): - """check Insert operations in a dependent thread""" +class CollectionDropChecker(Checker): + """check collection drop operations in a dependent thread""" + + def __init__(self, collection_name=None, schema=None): + if collection_name is None: + collection_name = cf.gen_unique_str("DropChecker_") + super().__init__(collection_name=collection_name, schema=schema) + self.collection_pool = [] + self.gen_collection_pool(schema=self.schema) + + def gen_collection_pool(self, pool_size=50, schema=None): + for i in range(pool_size): + collection_name = cf.gen_unique_str("DropChecker_") + res, result = self.c_wrap.init_collection(name=collection_name, schema=schema) + if result: + self.collection_pool.append(collection_name) + + @trace() + def drop_collection(self): + res, result = self.c_wrap.drop() + if result: + self.collection_pool.remove(self.c_wrap.name) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.drop_collection() + return res, result + + def keep_running(self): + while self._keep_running: + res, result = self.run_task() + if result: + try: + if len(self.collection_pool) <= 10: + self.gen_collection_pool(schema=self.schema) + except Exception as e: + log.error(f"Failed to generate collection pool: {e}") + try: + c_name = self.collection_pool[0] + self.c_wrap.init_collection(name=c_name) + except Exception as e: + log.error(f"Failed to init new collection: {e}") + sleep(constants.WAIT_PER_OP) + + +class PartitionCreateChecker(Checker): + """check partition create operations in a dependent thread""" + + def __init__(self, collection_name=None, schema=None, partition_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("PartitionCreateChecker_") + super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name) + + @trace() + def create_partition(self): + res, result = self.p_wrap.init_partition(collection=self.c_name, + name=cf.gen_unique_str("PartitionCreateChecker_"), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing + ) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.create_partition() + if result: + self.p_wrap.drop(timeout=timeout) + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP) + + +class PartitionDropChecker(Checker): + """check partition drop operations in a dependent thread""" + + def __init__(self, collection_name=None, schema=None, partition_name=None): + if collection_name is None: + collection_name = cf.gen_unique_str("PartitionDropChecker_") + super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name) + self.p_wrap.init_partition(collection=self.c_name, + name=cf.gen_unique_str("PartitionDropChecker_"), + timeout=timeout, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing + ) + + @trace() + def drop_partition(self): + res, result = self.p_wrap.drop() + return res, result + + @exception_handler() + def run_task(self): + res, result = self.drop_partition() + if result: + self.p_wrap.init_partition(collection=self.c_name, + name=cf.gen_unique_str("PartitionDropChecker_"), + timeout=timeout, + enable_traceback=enable_traceback, + 
check_task=CheckTasks.check_nothing + ) + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP) + + +class DatabaseCreateChecker(Checker): + """check create database operations in a dependent thread""" + + def __init__(self, collection_name=None, schema=None): + if collection_name is None: + collection_name = cf.gen_unique_str("DatabaseChecker_") + super().__init__(collection_name=collection_name, schema=schema) + self.db_name = None + + @trace() + def init_db(self): + db_name = cf.gen_unique_str("db_") + res, result = self.db_wrap.create_database(db_name) + self.db_name = db_name + return res, result + + @exception_handler() + def run_task(self): + res, result = self.init_db() + if result: + self.db_wrap.drop_database(self.db_name) + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP) + + +class DatabaseDropChecker(Checker): + """check drop database operations in a dependent thread""" + + def __init__(self, collection_name=None, schema=None): + if collection_name is None: + collection_name = cf.gen_unique_str("DatabaseChecker_") + super().__init__(collection_name=collection_name, schema=schema) + self.db_name = cf.gen_unique_str("db_") + self.db_wrap.create_database(self.db_name) + + @trace() + def drop_db(self): + res, result = self.db_wrap.drop_database(self.db_name) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.drop_db() + if result: + self.db_name = cf.gen_unique_str("db_") + self.db_wrap.create_database(self.db_name) + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP) + + +class IndexCreateChecker(Checker): + """check index create operations in a dependent thread""" def __init__(self, collection_name=None, schema=None): if collection_name is None: @@ -666,6 +1036,47 @@ class IndexChecker(Checker): sleep(constants.WAIT_PER_OP * 6) +class IndexDropChecker(Checker): + """check index drop operations in a dependent thread""" + + def __init__(self, collection_name=None, schema=None): + if collection_name is None: + collection_name = cf.gen_unique_str("IndexChecker_") + super().__init__(collection_name=collection_name, schema=schema) + self.index_name = cf.gen_unique_str('index_') + for i in range(5): + self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), + timeout=timeout, enable_traceback=enable_traceback) + # do as a flush before indexing + log.debug(f"Index ready entities: {self.c_wrap.num_entities}") + self.c_wrap.create_index(self.float_vector_field_name, + constants.DEFAULT_INDEX_PARAM, + index_name=self.index_name, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + + @trace() + def drop_index(self): + res, result = self.c_wrap.drop_index(timeout=timeout) + return res, result + + @exception_handler() + def run_task(self): + res, result = self.drop_index() + if result: + self.c_wrap.create_index(self.float_vector_field_name, + constants.DEFAULT_INDEX_PARAM, + index_name=self.index_name, + enable_traceback=enable_traceback, + check_task=CheckTasks.check_nothing) + return res, result + + def keep_running(self): + while self._keep_running: + self.run_task() + sleep(constants.WAIT_PER_OP * 6) + + class QueryChecker(Checker): """check query operations in a dependent thread""" @@ -681,6 +1092,7 @@ class QueryChecker(Checker): 
enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) self.c_wrap.load(replica_number=replica_number) # do load before query + self.insert_data() self.term_expr = None @trace() @@ -704,40 +1116,6 @@ class QueryChecker(Checker): sleep(constants.WAIT_PER_OP / 10) -class LoadChecker(Checker): - """check load operations in a dependent thread""" - - def __init__(self, collection_name=None, replica_number=1, schema=None): - if collection_name is None: - collection_name = cf.gen_unique_str("LoadChecker_") - super().__init__(collection_name=collection_name, schema=schema) - self.replica_number = replica_number - res, result = self.c_wrap.create_index(self.float_vector_field_name, - constants.DEFAULT_INDEX_PARAM, - index_name=cf.gen_unique_str( - 'index_'), - timeout=timeout, - enable_traceback=enable_traceback, - check_task=CheckTasks.check_nothing) - - @trace() - def load(self): - res, result = self.c_wrap.load(replica_number=self.replica_number, timeout=timeout) - return res, result - - @exception_handler() - def run_task(self): - res, result = self.load() - if result: - self.c_wrap.release() - return res, result - - def keep_running(self): - while self._keep_running: - self.run_task() - sleep(constants.WAIT_PER_OP / 10) - - class DeleteChecker(Checker): """check delete operations in a dependent thread""" @@ -753,6 +1131,7 @@ class DeleteChecker(Checker): enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) self.c_wrap.load() # load before query + self.insert_data() term_expr = f'{self.int64_field_name} > 0' res, _ = self.c_wrap.query(term_expr, output_fields=[ self.int64_field_name]) @@ -760,7 +1139,7 @@ class DeleteChecker(Checker): self.expr = None @trace() - def delete(self): + def delete_entities(self): res, result = self.c_wrap.delete(expr=self.expr, timeout=timeout) return res, result @@ -768,7 +1147,7 @@ class DeleteChecker(Checker): def run_task(self): delete_ids = self.ids.pop() self.expr = f'{self.int64_field_name} in {[delete_ids]}' - res, result = self.delete() + res, result = self.delete_entities() return res, result def keep_running(self): @@ -812,54 +1191,8 @@ class CompactChecker(Checker): sleep(constants.WAIT_PER_OP / 10) -class DropChecker(Checker): - """check drop operations in a dependent thread""" - - def __init__(self, collection_name=None, schema=None): - if collection_name is None: - collection_name = cf.gen_unique_str("DropChecker_") - super().__init__(collection_name=collection_name, schema=schema) - self.collection_pool = [] - self.gen_collection_pool(schema=self.schema) - - def gen_collection_pool(self, pool_size=50, schema=None): - for i in range(pool_size): - collection_name = cf.gen_unique_str("DropChecker_") - res, result = self.c_wrap.init_collection(name=collection_name, schema=schema) - if result: - self.collection_pool.append(collection_name) - - @trace() - def drop(self): - res, result = self.c_wrap.drop() - if result: - self.collection_pool.remove(self.c_wrap.name) - return res, result - - @exception_handler() - def run_task(self): - res, result = self.drop() - return res, result - - def keep_running(self): - while self._keep_running: - res, result = self.run_task() - if result: - try: - if len(self.collection_pool) <= 10: - self.gen_collection_pool(schema=self.schema) - except Exception as e: - log.error(f"Failed to generate collection pool: {e}") - try: - c_name = self.collection_pool[0] - self.c_wrap.init_collection(name=c_name) - except Exception as e: - log.error(f"Failed to init new collection: {e}") - 
sleep(constants.WAIT_PER_OP)
-
-
 class LoadBalanceChecker(Checker):
-    """check loadbalance operations in a dependent thread"""
+    """check load balance operations in a dependent thread"""
 
     def __init__(self, collection_name=None, schema=None):
         if collection_name is None:
@@ -912,7 +1245,7 @@ class LoadBalanceChecker(Checker):
 
 
 class BulkInsertChecker(Checker):
-    """check bulk load operations in a dependent thread"""
+    """check bulk insert operations in a dependent thread"""
 
     def __init__(self, collection_name=None, files=[], use_one_collection=False, dim=ct.default_dim,
                  schema=None, insert_data=False):
diff --git a/tests/python_client/chaos/test_chaos.py b/tests/python_client/chaos/test_chaos.py
index e4363f59cf..867460da36 100644
--- a/tests/python_client/chaos/test_chaos.py
+++ b/tests/python_client/chaos/test_chaos.py
@@ -6,8 +6,8 @@ import json
 from time import sleep
 
 from pymilvus import connections
-from chaos.checker import (CreateChecker, InsertChecker, FlushChecker,
-                           SearchChecker, QueryChecker, IndexChecker, DeleteChecker, Op)
+from chaos.checker import (CollectionCreateChecker, InsertChecker, FlushChecker,
+                           SearchChecker, QueryChecker, IndexCreateChecker, DeleteChecker, Op)
 from common.cus_resource_opts import CustomResourceOperations as CusResource
 from utils.util_log import test_log as log
 from utils.util_k8s import wait_pods_ready, get_pod_list
@@ -20,11 +20,11 @@ from delayed_assert import assert_expectations
 
 
 def check_cluster_nodes(chaos_config):
-    # if all pods will be effected, the expect is all fail. 
+    # if all pods will be affected, the expectation is that all requests fail.
     # Even though the replicas is greater than 1, it can not provide HA, so cluster_nodes is set as 1 for this situation.
     if "all" in chaos_config["metadata"]["name"]:
         return 1
-    
+
     selector = findkeys(chaos_config, "selector")
     selector = list(selector)
     log.info(f"chaos target selector: {selector}")
@@ -93,7 +93,7 @@ class TestChaos(TestChaosBase):
     def connection(self, host, port):
         connections.add_connection(default={"host": host, "port": port})
         connections.connect(alias='default')
-    
+
         if connections.has_connection("default") is False:
             raise Exception("no connections")
         self.host = host
@@ -102,10 +102,10 @@ class TestChaos(TestChaosBase):
     @pytest.fixture(scope="function", autouse=True)
     def init_health_checkers(self):
         checkers = {
-            Op.create: CreateChecker(),
+            Op.create: CollectionCreateChecker(),
             Op.insert: InsertChecker(),
             Op.flush: FlushChecker(),
-            Op.index: IndexChecker(),
+            Op.index: IndexCreateChecker(),
             Op.search: SearchChecker(),
             Op.query: QueryChecker(),
             Op.delete: DeleteChecker()
@@ -244,4 +244,4 @@ class TestChaos(TestChaosBase):
 
         # assert all expectations
         assert_expectations()
-        log.info("*********************Chaos Test Completed**********************")
\ No newline at end of file
+        log.info("*********************Chaos Test Completed**********************")
diff --git a/tests/python_client/chaos/test_chaos_memory_stress.py b/tests/python_client/chaos/test_chaos_memory_stress.py
index 0d8f12f55b..1c27827457 100644
--- a/tests/python_client/chaos/test_chaos_memory_stress.py
+++ b/tests/python_client/chaos/test_chaos_memory_stress.py
@@ -9,7 +9,7 @@ import datetime
 from pymilvus import connections
 from base.collection_wrapper import ApiCollectionWrapper
 from base.utility_wrapper import ApiUtilityWrapper
-from chaos.checker import Op, CreateChecker, InsertFlushChecker, IndexChecker, SearchChecker, QueryChecker
+from chaos.checker import Op, CollectionCreateChecker, InsertFlushChecker, IndexCreateChecker, 
SearchChecker, QueryChecker from common.cus_resource_opts import CustomResourceOperations as CusResource from common import common_func as cf from common import common_type as ct @@ -74,7 +74,7 @@ class TestChaosData: # wait memory stress sleep(constants.WAIT_PER_OP * 2) - # try to do release, load, query and serach in a duration time loop + # try to do release, load, query and search in a duration time loop try: start = time.time() while time.time() - start < eval(duration): @@ -215,10 +215,10 @@ class TestChaosData: expected: Verify milvus operation succ rate """ mic_checkers = { - Op.create: CreateChecker(), + Op.create: CollectionCreateChecker(), Op.insert: InsertFlushChecker(), Op.flush: InsertFlushChecker(flush=True), - Op.index: IndexChecker(), + Op.index: IndexCreateChecker(), Op.search: SearchChecker(), Op.query: QueryChecker() } @@ -285,7 +285,7 @@ class TestMemoryStressReplica: @pytest.mark.skip(reason="https://github.com/milvus-io/milvus/issues/16887") @pytest.mark.tags(CaseLabel.L3) - def test_memory_stress_replicas_befor_load(self, prepare_collection): + def test_memory_stress_replicas_before_load(self, prepare_collection): """ target: test querynode group load with insufficient memory method: 1.Limit querynode memory ? 2Gi @@ -353,7 +353,7 @@ class TestMemoryStressReplica: def test_memory_stress_replicas_group_insufficient(self, prepare_collection, mode): """ target: test apply stress memory on different number querynodes and the group failed to load, - bacause of the memory is insufficient + because of the memory is insufficient method: 1.Limit querynodes memory 5Gi 2.Create collection and insert 1000,000 entities 3.Apply memory stress on querynodes and it's memory is not enough to load replicas @@ -529,7 +529,7 @@ class TestMemoryStressReplicaLoadBalance: chaos_res.delete(metadata_name=chaos_config.get('metadata', None).get('name', None)) - # Verfiy auto load loadbalance + # Verify auto load loadbalance seg_info_after, _ = utility_w.get_query_segment_info(collection_w.name) seg_distribution_after = cf.get_segment_distribution(seg_info_after) segments_num_after = len(seg_distribution_after[chaos_querynode_id]["sealed"]) @@ -549,7 +549,7 @@ class TestMemoryStressReplicaLoadBalance: method: 1.Limit all querynodes memory 6Gi 2.Create and insert 1000,000 entities 3.Load collection with two replicas - 4.Apply memory stress on one grooup 80% + 4.Apply memory stress on one group 80% expected: Verify that load balancing across groups is not occurring """ collection_w = prepare_collection @@ -586,7 +586,7 @@ class TestMemoryStressReplicaLoadBalance: chaos_res.delete(metadata_name=chaos_config.get('metadata', None).get('name', None)) - # Verfiy auto load loadbalance + # Verify auto load loadbalance seg_info_after, _ = utility_w.get_query_segment_info(collection_w.name) seg_distribution_before = cf.get_segment_distribution(seg_info_before) seg_distribution_after = cf.get_segment_distribution(seg_info_after) diff --git a/tests/python_client/chaos/test_load_with_checker.py b/tests/python_client/chaos/test_load_with_checker.py index 419a38b42b..724c467ba5 100644 --- a/tests/python_client/chaos/test_load_with_checker.py +++ b/tests/python_client/chaos/test_load_with_checker.py @@ -4,15 +4,15 @@ import json from time import sleep from minio import Minio from pymilvus import connections -from chaos.checker import (CreateChecker, +from chaos.checker import (CollectionCreateChecker, InsertChecker, FlushChecker, SearchChecker, QueryChecker, - IndexChecker, + IndexCreateChecker, DeleteChecker, 
CompactChecker, - DropChecker, + CollectionDropChecker, LoadBalanceChecker, BulkInsertChecker, Op) @@ -56,15 +56,15 @@ class TestChaos(TestChaosBase): def init_health_checkers(self): c_name = cf.gen_unique_str("Checker_") checkers = { - # Op.create: CreateChecker(collection_name=c_name), + # Op.create: CollectionCreateChecker(collection_name=c_name), # Op.insert: InsertChecker(collection_name=c_name), # Op.flush: FlushChecker(collection_name=c_name), # Op.query: QueryChecker(collection_name=c_name), # Op.search: SearchChecker(collection_name=c_name), # Op.delete: DeleteChecker(collection_name=c_name), # Op.compact: CompactChecker(collection_name=c_name), - # Op.index: IndexChecker(), - # Op.drop: DropChecker(), + # Op.index: IndexCreateChecker(), + # Op.drop: CollectionDropChecker(), # Op.bulk_insert: BulkInsertChecker(), Op.load_balance: LoadBalanceChecker() } diff --git a/tests/python_client/chaos/testcases/test_all_checker_operation.py b/tests/python_client/chaos/testcases/test_all_checker_operation.py new file mode 100644 index 0000000000..1c00febbe5 --- /dev/null +++ b/tests/python_client/chaos/testcases/test_all_checker_operation.py @@ -0,0 +1,133 @@ +import time + +import pytest +from time import sleep +from pymilvus import connections +from chaos.checker import ( + DatabaseCreateChecker, + DatabaseDropChecker, + CollectionCreateChecker, + CollectionDropChecker, + PartitionCreateChecker, + PartitionDropChecker, + CollectionLoadChecker, + CollectionReleaseChecker, + PartitionLoadChecker, + PartitionReleaseChecker, + IndexCreateChecker, + IndexDropChecker, + InsertChecker, + UpsertChecker, + DeleteChecker, + FlushChecker, + SearchChecker, + QueryChecker, + Op, + EventRecords, + ResultAnalyzer +) +from utils.util_log import test_log as log +from utils.util_k8s import wait_pods_ready, get_milvus_instance_name +from chaos import chaos_commons as cc +from common.common_type import CaseLabel +from common.milvus_sys import MilvusSys +from chaos.chaos_commons import assert_statistic +from chaos import constants +from delayed_assert import assert_expectations + + +class TestBase: + expect_create = constants.SUCC + expect_insert = constants.SUCC + expect_flush = constants.SUCC + expect_index = constants.SUCC + expect_search = constants.SUCC + expect_query = constants.SUCC + host = '127.0.0.1' + port = 19530 + _chaos_config = None + health_checkers = {} + + +class TestOperations(TestBase): + + @pytest.fixture(scope="function", autouse=True) + def connection(self, host, port, user, password, milvus_ns): + if user and password: + # log.info(f"connect to {host}:{port} with user {user} and password {password}") + connections.connect('default', host=host, port=port, user=user, password=password, secure=True) + else: + connections.connect('default', host=host, port=port) + if connections.has_connection("default") is False: + raise Exception("no connections") + log.info("connect to milvus successfully") + self.host = host + self.port = port + self.user = user + self.password = password + self.milvus_sys = MilvusSys(alias='default') + self.milvus_ns = milvus_ns + self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys) + + def init_health_checkers(self, collection_name=None): + c_name = collection_name + checkers = { + Op.create_db: DatabaseCreateChecker(), + Op.create_collection: CollectionCreateChecker(collection_name=c_name), + Op.create_partition: PartitionCreateChecker(collection_name=c_name), + Op.drop_db: DatabaseDropChecker(), + Op.drop_collection: 
CollectionDropChecker(collection_name=c_name),
+            Op.drop_partition: PartitionDropChecker(collection_name=c_name),
+            Op.load_collection: CollectionLoadChecker(collection_name=c_name),
+            Op.load_partition: PartitionLoadChecker(collection_name=c_name),
+            Op.release_collection: CollectionReleaseChecker(collection_name=c_name),
+            Op.release_partition: PartitionReleaseChecker(collection_name=c_name),
+            Op.insert: InsertChecker(collection_name=c_name),
+            Op.upsert: UpsertChecker(collection_name=c_name),
+            Op.flush: FlushChecker(collection_name=c_name),
+            Op.create_index: IndexCreateChecker(collection_name=c_name),
+            Op.drop_index: IndexDropChecker(collection_name=c_name),
+            Op.search: SearchChecker(collection_name=c_name),
+            Op.query: QueryChecker(collection_name=c_name),
+            Op.delete: DeleteChecker(collection_name=c_name),
+            # Op.drop is the short name for Op.drop_collection, already covered above
+        }
+        self.health_checkers = checkers
+
+    @pytest.mark.tags(CaseLabel.L3)
+    def test_operations(self, request_duration, is_check):
+        # start the monitor threads to check the milvus ops
+        log.info("*********************Test Start**********************")
+        log.info(connections.get_connection_addr('default'))
+        event_records = EventRecords()
+        c_name = None
+        event_records.insert("init_health_checkers", "start")
+        self.init_health_checkers(collection_name=c_name)
+        event_records.insert("init_health_checkers", "finished")
+        tasks = cc.start_monitor_threads(self.health_checkers)
+        log.info("*********************Load Start**********************")
+        # wait request_duration
+        request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
+        if request_duration[-1] == "+":
+            request_duration = request_duration[:-1]
+        request_duration = eval(request_duration)
+        for i in range(10):
+            sleep(request_duration // 10)
+            # add an event so that the chaos can start to apply
+            if i == 3:
+                event_records.insert("init_chaos", "ready")
+            for k, v in self.health_checkers.items():
+                v.check_result()
+        if is_check:
+            assert_statistic(self.health_checkers, succ_rate_threshold=0.98)
+            assert_expectations()
+        # wait all pod ready
+        wait_pods_ready(self.milvus_ns, f"app.kubernetes.io/instance={self.release_name}")
+        time.sleep(60)
+        cc.check_thread_status(tasks)
+        for k, v in self.health_checkers.items():
+            v.pause()
+        ra = ResultAnalyzer()
+        ra.get_stage_success_rate()
+        ra.show_result_table()
+        log.info("*********************Chaos Test Completed**********************")
diff --git a/tests/python_client/chaos/testcases/test_concurrent_operation.py b/tests/python_client/chaos/testcases/test_concurrent_operation.py
index e72ddcfbdc..c582c5a803 100644
--- a/tests/python_client/chaos/testcases/test_concurrent_operation.py
+++ b/tests/python_client/chaos/testcases/test_concurrent_operation.py
@@ -4,6 +4,7 @@ import json
 from time import sleep
 from pymilvus import connections
 from chaos.checker import (InsertChecker,
+                           UpsertChecker,
                            FlushChecker,
                            SearchChecker,
                            QueryChecker,
@@ -70,6 +71,7 @@ class TestOperations(TestBase):
         c_name = collection_name
         checkers = {
             Op.insert: InsertChecker(collection_name=c_name),
+            Op.upsert: UpsertChecker(collection_name=c_name),
             Op.flush: FlushChecker(collection_name=c_name),
             Op.search: SearchChecker(collection_name=c_name),
             Op.query: QueryChecker(collection_name=c_name),
diff --git a/tests/python_client/chaos/testcases/test_single_request_operation.py b/tests/python_client/chaos/testcases/test_single_request_operation.py
index b7fa746ebb..e5f38afcb3 100644
--- 
a/tests/python_client/chaos/testcases/test_single_request_operation.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation.py @@ -3,14 +3,15 @@ import time import pytest from time import sleep from pymilvus import connections -from chaos.checker import (CreateChecker, +from chaos.checker import (CollectionCreateChecker, InsertChecker, + UpsertChecker, FlushChecker, SearchChecker, QueryChecker, - IndexChecker, + IndexCreateChecker, DeleteChecker, - DropChecker, + CollectionDropChecker, Op, EventRecords, ResultAnalyzer @@ -61,14 +62,15 @@ class TestOperations(TestBase): def init_health_checkers(self, collection_name=None): c_name = collection_name checkers = { - Op.create: CreateChecker(collection_name=c_name), + Op.create: CollectionCreateChecker(collection_name=c_name), Op.insert: InsertChecker(collection_name=c_name), + Op.upsert: UpsertChecker(collection_name=c_name), Op.flush: FlushChecker(collection_name=c_name), - Op.index: IndexChecker(collection_name=c_name), + Op.index: IndexCreateChecker(collection_name=c_name), Op.search: SearchChecker(collection_name=c_name), Op.query: QueryChecker(collection_name=c_name), Op.delete: DeleteChecker(collection_name=c_name), - Op.drop: DropChecker(collection_name=c_name) + Op.drop: CollectionDropChecker(collection_name=c_name) } self.health_checkers = checkers diff --git a/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py index 4b6ec7640c..1e241ee3ca 100644 --- a/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py @@ -6,14 +6,15 @@ from time import sleep from yaml import full_load from pymilvus import connections, utility -from chaos.checker import (CreateChecker, +from chaos.checker import (CollectionCreateChecker, InsertChecker, + UpsertChecker, FlushChecker, SearchChecker, QueryChecker, - IndexChecker, + IndexCreateChecker, DeleteChecker, - DropChecker, + CollectionDropChecker, Op) from utils.util_k8s import wait_pods_ready from utils.util_log import test_log as log @@ -61,14 +62,15 @@ class TestOperations(TestBase): schema = cf.gen_default_collection_schema(auto_id=False) checkers = { - Op.create: CreateChecker(collection_name=None, schema=schema), + Op.create: CollectionCreateChecker(collection_name=None, schema=schema), Op.insert: InsertChecker(collection_name=c_name, schema=schema), + Op.upsert: UpsertChecker(collection_name=c_name, schema=schema), Op.flush: FlushChecker(collection_name=c_name, schema=schema), - Op.index: IndexChecker(collection_name=None, schema=schema), + Op.index: IndexCreateChecker(collection_name=None, schema=schema), Op.search: SearchChecker(collection_name=c_name, schema=schema), Op.query: QueryChecker(collection_name=c_name, schema=schema), Op.delete: DeleteChecker(collection_name=c_name, schema=schema), - Op.drop: DropChecker(collection_name=None, schema=schema) + Op.drop: CollectionDropChecker(collection_name=None, schema=schema) } self.health_checkers = checkers @@ -132,9 +134,9 @@ class TestOperations(TestBase): v.pause() for k, v in self.health_checkers.items(): v.check_result() - for k, v in self.health_checkers.items(): + for k, v in self.health_checkers.items(): log.info(f"{k} failed request: {v.fail_records}") - for k, v in self.health_checkers.items(): + for k, v in self.health_checkers.items(): log.info(f"{k} rto: {v.get_rto()}") if 
is_check: assert_statistic(self.health_checkers, succ_rate_threshold=0.98) diff --git a/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py b/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py index ea82d14b81..c2b6e8313c 100644 --- a/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation_for_standby.py @@ -2,12 +2,12 @@ import pytest import threading from time import sleep from pymilvus import connections -from chaos.checker import (CreateChecker, +from chaos.checker import (CollectionCreateChecker, InsertChecker, FlushChecker, SearchChecker, QueryChecker, - IndexChecker, + IndexCreateChecker, DeleteChecker, Op) from utils.util_log import test_log as log @@ -60,10 +60,10 @@ class TestOperations(TestBase): def init_health_checkers(self, collection_name=None): c_name = collection_name checkers = { - Op.create: CreateChecker(collection_name=c_name), + Op.create: CollectionCreateChecker(collection_name=c_name), Op.insert: InsertChecker(collection_name=c_name), Op.flush: FlushChecker(collection_name=c_name), - Op.index: IndexChecker(collection_name=c_name), + Op.index: IndexCreateChecker(collection_name=c_name), Op.search: SearchChecker(collection_name=c_name), Op.query: QueryChecker(collection_name=c_name), Op.delete: DeleteChecker(collection_name=c_name), @@ -102,4 +102,4 @@ class TestOperations(TestBase): rto = v.get_rto() pytest.assume(rto < 30, f"{k} rto expect 30s but get {rto}s") # rto should be less than 30s - log.info("*********************Chaos Test Completed**********************") \ No newline at end of file + log.info("*********************Chaos Test Completed**********************") diff --git a/tests/python_client/chaos/testcases/test_verify_all_collections.py b/tests/python_client/chaos/testcases/test_verify_all_collections.py index 3d1315f2f3..38d420ec95 100644 --- a/tests/python_client/chaos/testcases/test_verify_all_collections.py +++ b/tests/python_client/chaos/testcases/test_verify_all_collections.py @@ -3,10 +3,11 @@ from time import sleep from collections import defaultdict from pymilvus import connections from chaos.checker import (InsertChecker, - FlushChecker, + UpsertChecker, + FlushChecker, SearchChecker, QueryChecker, - IndexChecker, + IndexCreateChecker, DeleteChecker, Op) from utils.util_log import test_log as log @@ -67,14 +68,15 @@ class TestOperations(TestBase): self.host = host self.port = port self.user = user - self.password = password + self.password = password def init_health_checkers(self, collection_name=None): c_name = collection_name checkers = { Op.insert: InsertChecker(collection_name=c_name), + Op.upsert: UpsertChecker(collection_name=c_name), Op.flush: FlushChecker(collection_name=c_name), - Op.index: IndexChecker(collection_name=c_name), + Op.index: IndexCreateChecker(collection_name=c_name), Op.search: SearchChecker(collection_name=c_name), Op.query: QueryChecker(collection_name=c_name), Op.delete: DeleteChecker(collection_name=c_name), diff --git a/tests/python_client/loadbalance/test_auto_load_balance.py b/tests/python_client/loadbalance/test_auto_load_balance.py index 739d995068..cb6b1b5ec1 100644 --- a/tests/python_client/loadbalance/test_auto_load_balance.py +++ b/tests/python_client/loadbalance/test_auto_load_balance.py @@ -1,7 +1,7 @@ from time import sleep from pymilvus import connections, list_collections, utility -from chaos.checker import (CreateChecker, InsertFlushChecker, - 
SearchChecker, QueryChecker, IndexChecker, Op) +from chaos.checker import (CollectionCreateChecker, InsertFlushChecker, + SearchChecker, QueryChecker, IndexCreateChecker, Op) from common.milvus_sys import MilvusSys from utils.util_log import test_log as log from chaos import chaos_commons as cc @@ -74,15 +74,15 @@ class TestAutoLoadBalance(object): conn = connections.connect("default", host=host, port=port) assert conn is not None self.health_checkers = { - Op.create: CreateChecker(), + Op.create: CollectionCreateChecker(), Op.insert: InsertFlushChecker(), Op.flush: InsertFlushChecker(flush=True), - Op.index: IndexChecker(), + Op.index: IndexCreateChecker(), Op.search: SearchChecker(), Op.query: QueryChecker() } cc.start_monitor_threads(self.health_checkers) - # wait + # wait sleep(constants.WAIT_PER_OP * 10) all_collections = list_collections() for c in all_collections:
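
All of the checkers touched by this patch share one driver pattern: build a dict of one checker per Op, start them with chaos_commons.start_monitor_threads, let the workload run, then pause the checkers and read the statistics. A minimal standalone sketch of that loop (the 127.0.0.1:19530 endpoint is an assumption for a local deployment; every other name comes from the modules changed above):

    from time import sleep
    from pymilvus import connections
    from chaos import chaos_commons as cc
    from chaos import constants
    from chaos.checker import Op, InsertChecker, UpsertChecker, ResultAnalyzer

    # endpoint is an assumption for a local Milvus instance
    connections.connect('default', host='127.0.0.1', port=19530)
    # one checker per operation; start_monitor_threads runs each
    # checker's keep_running() loop in its own thread
    checkers = {
        Op.insert: InsertChecker(),
        Op.upsert: UpsertChecker(),
    }
    cc.start_monitor_threads(checkers)
    sleep(constants.WAIT_PER_OP * 10)   # let the workload run for a while
    for checker in checkers.values():
        checker.pause()                 # stop the keep_running() loops
    ra = ResultAnalyzer()
    ra.get_stage_success_rate()         # success rate per operation and stage
    ra.show_result_table()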