test: add more request type checker for test (#29210)

add more request type checker for test
* partition 
* database
* upsert

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/29218/head
zhuwenxing 2023-12-14 19:38:45 +08:00 committed by GitHub
parent 8a63e53421
commit 6efb7afd3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 619 additions and 145 deletions

View File

@ -12,7 +12,9 @@ from datetime import datetime
from prettytable import PrettyTable
import functools
from time import sleep
from base.database_wrapper import ApiDatabaseWrapper
from base.collection_wrapper import ApiCollectionWrapper
from base.partition_wrapper import ApiPartitionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from common import common_func as cf
from common import common_type as ct
@ -195,15 +197,30 @@ class ResultAnalyzer:
class Op(Enum):
create = 'create'
create = 'create' # short name for create collection
create_db = 'create_db'
create_collection = 'create_collection'
create_partition = 'create_partition'
insert = 'insert'
upsert = 'upsert'
flush = 'flush'
index = 'index'
create_index = 'create_index'
drop_index = 'drop_index'
load = 'load'
load_collection = 'load_collection'
load_partition = 'load_partition'
release = 'release'
release_collection = 'release_collection'
release_partition = 'release_partition'
search = 'search'
query = 'query'
delete = 'delete'
compact = 'compact'
drop = 'drop'
drop = 'drop' # short name for drop collection
drop_db = 'drop_db'
drop_collection = 'drop_collection'
drop_partition = 'drop_partition'
load_balance = 'load_balance'
bulk_insert = 'bulk_insert'
unknown = 'unknown'
@ -288,7 +305,8 @@ class Checker:
b. count operations and success rate
"""
def __init__(self, collection_name=None, shards_num=2, dim=ct.default_dim, insert_data=True, schema=None):
def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim=ct.default_dim, insert_data=True,
schema=None):
self.recovery_time = 0
self._succ = 0
self._fail = 0
@ -299,11 +317,16 @@ class Checker:
self.files = []
self.ms = MilvusSys()
self.bucket_name = self.ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
self.db_wrap = ApiDatabaseWrapper()
self.c_wrap = ApiCollectionWrapper()
self.p_wrap = ApiPartitionWrapper()
self.utility_wrap = ApiUtilityWrapper()
c_name = collection_name if collection_name is not None else cf.gen_unique_str(
'Checker_')
self.c_name = c_name
p_name = partition_name if partition_name is not None else "_default"
self.p_name = p_name
self.p_names = [self.p_name] if partition_name is not None else None
schema = cf.gen_default_collection_schema(dim=dim) if schema is None else schema
self.schema = schema
self.dim = cf.get_dim_by_schema(schema=schema)
@ -314,17 +337,27 @@ class Checker:
shards_num=shards_num,
timeout=timeout,
enable_traceback=enable_traceback)
self.p_wrap.init_partition(self.c_name, self.p_name)
if insert_data:
log.info(f"collection {c_name} created, start to insert data")
t0 = time.perf_counter()
self.c_wrap.insert(
data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0),
partition_name=self.p_name,
timeout=timeout,
enable_traceback=enable_traceback)
log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s")
self.initial_entities = self.c_wrap.num_entities # do as a flush
def insert_data(self, nb=constants.ENTITIES_FOR_SEARCH, partition_name=None):
partition_name = self.p_name if partition_name is None else partition_name
self.c_wrap.insert(
data=cf.get_column_data_by_schema(nb=nb, schema=self.schema, start=0),
partition_name=partition_name,
timeout=timeout,
enable_traceback=enable_traceback)
def total(self):
return self._succ + self._fail
@ -407,6 +440,140 @@ class Checker:
return task_ids, completed
class CollectionLoadChecker(Checker):
"""check collection load operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
self.replica_number = replica_number
if collection_name is None:
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@trace()
def load_collection(self):
res, result = self.c_wrap.load(replica_number=self.replica_number)
return res, result
@exception_handler()
def run_task(self):
res, result = self.load_collection()
if result:
self.c_wrap.release()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class CollectionReleaseChecker(Checker):
"""check collection release operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
self.replica_number = replica_number
if collection_name is None:
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.load(replica_number=self.replica_number)
@trace()
def release_collection(self):
res, result = self.c_wrap.release()
return res, result
@exception_handler()
def run_task(self):
res, result = self.release_collection()
if result:
self.c_wrap.release()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class PartitionLoadChecker(Checker):
"""check partition load operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
self.replica_number = replica_number
if collection_name is None:
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@trace()
def load_partition(self):
res, result = self.p_wrap.load(replica_number=self.replica_number)
return res, result
@exception_handler()
def run_task(self):
res, result = self.load_partition()
if result:
self.p_wrap.release()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class PartitionReleaseChecker(Checker):
"""check partition release operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None, ):
self.replica_number = replica_number
if collection_name is None:
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.p_wrap.load(replica_number=self.replica_number)
@trace()
def release_partition(self):
res, result = self.p_wrap.release()
return res, result
@exception_handler()
def run_task(self):
res, result = self.release_partition()
if result:
self.p_wrap.load(replica_number=self.replica_number)
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class SearchChecker(Checker):
"""check search operations in a dependent thread"""
@ -422,6 +589,7 @@ class SearchChecker(Checker):
check_task=CheckTasks.check_nothing)
# do load before search
self.c_wrap.load(replica_number=replica_number)
self.insert_data()
@trace()
def search(self):
@ -430,6 +598,7 @@ class SearchChecker(Checker):
anns_field=self.float_vector_field_name,
param=constants.DEFAULT_SEARCH_PARAM,
limit=1,
partition_names=self.p_names,
timeout=search_timeout,
check_task=CheckTasks.check_nothing
)
@ -525,7 +694,7 @@ class FlushChecker(Checker):
class InsertChecker(Checker):
"""check flush operations in a dependent thread"""
"""check insert operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
if collection_name is None:
@ -540,7 +709,7 @@ class InsertChecker(Checker):
self.file_name = f"/tmp/ci_logs/insert_data_{uuid.uuid4()}.parquet"
@trace()
def insert(self):
def insert_entities(self):
data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
ts_data = []
for i in range(constants.DELTA_PER_INS):
@ -551,6 +720,7 @@ class InsertChecker(Checker):
data[0] = ts_data # set timestamp (ms) as int64
log.debug(f"insert data: {ts_data}")
res, result = self.c_wrap.insert(data=data,
partition_names=self.p_names,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@ -561,7 +731,7 @@ class InsertChecker(Checker):
@exception_handler()
def run_task(self):
res, result = self.insert()
res, result = self.insert_entities()
return res, result
def keep_running(self):
@ -599,8 +769,36 @@ class InsertChecker(Checker):
pytest.assume(set(data_in_server) == set(data_in_client))
class CreateChecker(Checker):
"""check create operations in a dependent thread"""
class UpsertChecker(Checker):
"""check upsert operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("InsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
@trace()
def upsert_entities(self):
data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
res, result = self.c_wrap.upsert(data=data,
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
return res, result
@exception_handler()
def run_task(self):
res, result = self.upsert_entities()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
class CollectionCreateChecker(Checker):
"""check collection create operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
@ -630,8 +828,180 @@ class CreateChecker(Checker):
sleep(constants.WAIT_PER_OP)
class IndexChecker(Checker):
"""check Insert operations in a dependent thread"""
class CollectionDropChecker(Checker):
"""check collection drop operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DropChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.collection_pool = []
self.gen_collection_pool(schema=self.schema)
def gen_collection_pool(self, pool_size=50, schema=None):
for i in range(pool_size):
collection_name = cf.gen_unique_str("DropChecker_")
res, result = self.c_wrap.init_collection(name=collection_name, schema=schema)
if result:
self.collection_pool.append(collection_name)
@trace()
def drop_collection(self):
res, result = self.c_wrap.drop()
if result:
self.collection_pool.remove(self.c_wrap.name)
return res, result
@exception_handler()
def run_task(self):
res, result = self.drop_collection()
return res, result
def keep_running(self):
while self._keep_running:
res, result = self.run_task()
if result:
try:
if len(self.collection_pool) <= 10:
self.gen_collection_pool(schema=self.schema)
except Exception as e:
log.error(f"Failed to generate collection pool: {e}")
try:
c_name = self.collection_pool[0]
self.c_wrap.init_collection(name=c_name)
except Exception as e:
log.error(f"Failed to init new collection: {e}")
sleep(constants.WAIT_PER_OP)
class PartitionCreateChecker(Checker):
"""check partition create operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None, partition_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("PartitionCreateChecker_")
super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
@trace()
def create_partition(self):
res, result = self.p_wrap.init_partition(collection=self.c_name,
name=cf.gen_unique_str("PartitionCreateChecker_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
)
return res, result
@exception_handler()
def run_task(self):
res, result = self.create_partition()
if result:
self.p_wrap.drop(timeout=timeout)
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class PartitionDropChecker(Checker):
"""check partition drop operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None, partition_name=None):
if collection_name is None:
collection_name = cf.gen_unique_str("PartitionDropChecker_")
super().__init__(collection_name=collection_name, schema=schema, partition_name=partition_name)
self.p_wrap.init_partition(collection=self.c_name,
name=cf.gen_unique_str("PartitionDropChecker_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
)
@trace()
def drop_partition(self):
res, result = self.p_wrap.drop()
return res, result
@exception_handler()
def run_task(self):
res, result = self.drop_partition()
if result:
self.p_wrap.init_partition(collection=self.c_name,
name=cf.gen_unique_str("PartitionDropChecker_"),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing
)
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class DatabaseCreateChecker(Checker):
"""check create database operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DatabaseChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.db_name = None
@trace()
def init_db(self):
db_name = cf.gen_unique_str("db_")
res, result = self.db_wrap.create_database(db_name)
self.db_name = db_name
return res, result
@exception_handler()
def run_task(self):
res, result = self.init_db()
if result:
self.db_wrap.drop_database(self.db_name)
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class DatabaseDropChecker(Checker):
"""check drop database operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DatabaseChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.db_name = cf.gen_unique_str("db_")
self.db_wrap.create_database(self.db_name)
@trace()
def drop_db(self):
res, result = self.db_wrap.drop_database(self.db_name)
return res, result
@exception_handler()
def run_task(self):
res, result = self.drop_db()
if result:
self.db_name = cf.gen_unique_str("db_")
self.db_wrap.create_database(self.db_name)
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP)
class IndexCreateChecker(Checker):
"""check index create operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
@ -666,6 +1036,47 @@ class IndexChecker(Checker):
sleep(constants.WAIT_PER_OP * 6)
class IndexDropChecker(Checker):
"""check index drop operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.index_name = cf.gen_unique_str('index_')
for i in range(5):
self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout, enable_traceback=enable_traceback)
# do as a flush before indexing
log.debug(f"Index ready entities: {self.c_wrap.num_entities}")
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@trace()
def drop_index(self):
res, result = self.c_wrap.drop_index(timeout=timeout)
return res, result
@exception_handler()
def run_task(self):
res, result = self.drop_index()
if result:
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP * 6)
class QueryChecker(Checker):
"""check query operations in a dependent thread"""
@ -681,6 +1092,7 @@ class QueryChecker(Checker):
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.load(replica_number=replica_number) # do load before query
self.insert_data()
self.term_expr = None
@trace()
@ -704,40 +1116,6 @@ class QueryChecker(Checker):
sleep(constants.WAIT_PER_OP / 10)
class LoadChecker(Checker):
"""check load operations in a dependent thread"""
def __init__(self, collection_name=None, replica_number=1, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.replica_number = replica_number
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
@trace()
def load(self):
res, result = self.c_wrap.load(replica_number=self.replica_number, timeout=timeout)
return res, result
@exception_handler()
def run_task(self):
res, result = self.load()
if result:
self.c_wrap.release()
return res, result
def keep_running(self):
while self._keep_running:
self.run_task()
sleep(constants.WAIT_PER_OP / 10)
class DeleteChecker(Checker):
"""check delete operations in a dependent thread"""
@ -753,6 +1131,7 @@ class DeleteChecker(Checker):
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.load() # load before query
self.insert_data()
term_expr = f'{self.int64_field_name} > 0'
res, _ = self.c_wrap.query(term_expr, output_fields=[
self.int64_field_name])
@ -760,7 +1139,7 @@ class DeleteChecker(Checker):
self.expr = None
@trace()
def delete(self):
def delete_entities(self):
res, result = self.c_wrap.delete(expr=self.expr, timeout=timeout)
return res, result
@ -768,7 +1147,7 @@ class DeleteChecker(Checker):
def run_task(self):
delete_ids = self.ids.pop()
self.expr = f'{self.int64_field_name} in {[delete_ids]}'
res, result = self.delete()
res, result = self.delete_entities()
return res, result
def keep_running(self):
@ -812,54 +1191,8 @@ class CompactChecker(Checker):
sleep(constants.WAIT_PER_OP / 10)
class DropChecker(Checker):
"""check drop operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DropChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.collection_pool = []
self.gen_collection_pool(schema=self.schema)
def gen_collection_pool(self, pool_size=50, schema=None):
for i in range(pool_size):
collection_name = cf.gen_unique_str("DropChecker_")
res, result = self.c_wrap.init_collection(name=collection_name, schema=schema)
if result:
self.collection_pool.append(collection_name)
@trace()
def drop(self):
res, result = self.c_wrap.drop()
if result:
self.collection_pool.remove(self.c_wrap.name)
return res, result
@exception_handler()
def run_task(self):
res, result = self.drop()
return res, result
def keep_running(self):
while self._keep_running:
res, result = self.run_task()
if result:
try:
if len(self.collection_pool) <= 10:
self.gen_collection_pool(schema=self.schema)
except Exception as e:
log.error(f"Failed to generate collection pool: {e}")
try:
c_name = self.collection_pool[0]
self.c_wrap.init_collection(name=c_name)
except Exception as e:
log.error(f"Failed to init new collection: {e}")
sleep(constants.WAIT_PER_OP)
class LoadBalanceChecker(Checker):
"""check loadbalance operations in a dependent thread"""
"""check load balance operations in a dependent thread"""
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
@ -912,7 +1245,7 @@ class LoadBalanceChecker(Checker):
class BulkInsertChecker(Checker):
"""check bulk load operations in a dependent thread"""
"""check bulk insert operations in a dependent thread"""
def __init__(self, collection_name=None, files=[], use_one_collection=False, dim=ct.default_dim,
schema=None, insert_data=False):

View File

@ -6,8 +6,8 @@ import json
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker, InsertChecker, FlushChecker,
SearchChecker, QueryChecker, IndexChecker, DeleteChecker, Op)
from chaos.checker import (CollectionCreateChecker, InsertChecker, FlushChecker,
SearchChecker, QueryChecker, IndexCreateChecker, DeleteChecker, Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, get_pod_list
@ -102,10 +102,10 @@ class TestChaos(TestChaosBase):
@pytest.fixture(scope="function", autouse=True)
def init_health_checkers(self):
checkers = {
Op.create: CreateChecker(),
Op.create: CollectionCreateChecker(),
Op.insert: InsertChecker(),
Op.flush: FlushChecker(),
Op.index: IndexChecker(),
Op.index: IndexCreateChecker(),
Op.search: SearchChecker(),
Op.query: QueryChecker(),
Op.delete: DeleteChecker()

View File

@ -9,7 +9,7 @@ import datetime
from pymilvus import connections
from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from chaos.checker import Op, CreateChecker, InsertFlushChecker, IndexChecker, SearchChecker, QueryChecker
from chaos.checker import Op, CollectionCreateChecker, InsertFlushChecker, IndexCreateChecker, SearchChecker, QueryChecker
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common import common_func as cf
from common import common_type as ct
@ -74,7 +74,7 @@ class TestChaosData:
# wait memory stress
sleep(constants.WAIT_PER_OP * 2)
# try to do release, load, query and serach in a duration time loop
# try to do release, load, query and search in a duration time loop
try:
start = time.time()
while time.time() - start < eval(duration):
@ -215,10 +215,10 @@ class TestChaosData:
expected: Verify milvus operation succ rate
"""
mic_checkers = {
Op.create: CreateChecker(),
Op.create: CollectionCreateChecker(),
Op.insert: InsertFlushChecker(),
Op.flush: InsertFlushChecker(flush=True),
Op.index: IndexChecker(),
Op.index: IndexCreateChecker(),
Op.search: SearchChecker(),
Op.query: QueryChecker()
}
@ -285,7 +285,7 @@ class TestMemoryStressReplica:
@pytest.mark.skip(reason="https://github.com/milvus-io/milvus/issues/16887")
@pytest.mark.tags(CaseLabel.L3)
def test_memory_stress_replicas_befor_load(self, prepare_collection):
def test_memory_stress_replicas_before_load(self, prepare_collection):
"""
target: test querynode group load with insufficient memory
method: 1.Limit querynode memory ? 2Gi
@ -353,7 +353,7 @@ class TestMemoryStressReplica:
def test_memory_stress_replicas_group_insufficient(self, prepare_collection, mode):
"""
target: test apply stress memory on different number querynodes and the group failed to load,
bacause of the memory is insufficient
because of the memory is insufficient
method: 1.Limit querynodes memory 5Gi
2.Create collection and insert 1000,000 entities
3.Apply memory stress on querynodes and it's memory is not enough to load replicas
@ -529,7 +529,7 @@ class TestMemoryStressReplicaLoadBalance:
chaos_res.delete(metadata_name=chaos_config.get('metadata', None).get('name', None))
# Verfiy auto load loadbalance
# Verify auto load loadbalance
seg_info_after, _ = utility_w.get_query_segment_info(collection_w.name)
seg_distribution_after = cf.get_segment_distribution(seg_info_after)
segments_num_after = len(seg_distribution_after[chaos_querynode_id]["sealed"])
@ -549,7 +549,7 @@ class TestMemoryStressReplicaLoadBalance:
method: 1.Limit all querynodes memory 6Gi
2.Create and insert 1000,000 entities
3.Load collection with two replicas
4.Apply memory stress on one grooup 80%
4.Apply memory stress on one group 80%
expected: Verify that load balancing across groups is not occurring
"""
collection_w = prepare_collection
@ -586,7 +586,7 @@ class TestMemoryStressReplicaLoadBalance:
chaos_res.delete(metadata_name=chaos_config.get('metadata', None).get('name', None))
# Verfiy auto load loadbalance
# Verify auto load loadbalance
seg_info_after, _ = utility_w.get_query_segment_info(collection_w.name)
seg_distribution_before = cf.get_segment_distribution(seg_info_before)
seg_distribution_after = cf.get_segment_distribution(seg_info_after)

View File

@ -4,15 +4,15 @@ import json
from time import sleep
from minio import Minio
from pymilvus import connections
from chaos.checker import (CreateChecker,
from chaos.checker import (CollectionCreateChecker,
InsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
IndexCreateChecker,
DeleteChecker,
CompactChecker,
DropChecker,
CollectionDropChecker,
LoadBalanceChecker,
BulkInsertChecker,
Op)
@ -56,15 +56,15 @@ class TestChaos(TestChaosBase):
def init_health_checkers(self):
c_name = cf.gen_unique_str("Checker_")
checkers = {
# Op.create: CreateChecker(collection_name=c_name),
# Op.create: CollectionCreateChecker(collection_name=c_name),
# Op.insert: InsertChecker(collection_name=c_name),
# Op.flush: FlushChecker(collection_name=c_name),
# Op.query: QueryChecker(collection_name=c_name),
# Op.search: SearchChecker(collection_name=c_name),
# Op.delete: DeleteChecker(collection_name=c_name),
# Op.compact: CompactChecker(collection_name=c_name),
# Op.index: IndexChecker(),
# Op.drop: DropChecker(),
# Op.index: IndexCreateChecker(),
# Op.drop: CollectionDropChecker(),
# Op.bulk_insert: BulkInsertChecker(),
Op.load_balance: LoadBalanceChecker()
}

View File

@ -0,0 +1,133 @@
import time
import pytest
from time import sleep
from pymilvus import connections
from chaos.checker import (
DatabaseCreateChecker,
DatabaseDropChecker,
CollectionCreateChecker,
CollectionDropChecker,
PartitionCreateChecker,
PartitionDropChecker,
CollectionLoadChecker,
CollectionReleaseChecker,
PartitionLoadChecker,
PartitionReleaseChecker,
IndexCreateChecker,
IndexDropChecker,
InsertChecker,
UpsertChecker,
DeleteChecker,
FlushChecker,
SearchChecker,
QueryChecker,
Op,
EventRecords,
ResultAnalyzer
)
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, get_milvus_instance_name
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common.milvus_sys import MilvusSys
from chaos.chaos_commons import assert_statistic
from chaos import constants
from delayed_assert import assert_expectations
class TestBase:
expect_create = constants.SUCC
expect_insert = constants.SUCC
expect_flush = constants.SUCC
expect_index = constants.SUCC
expect_search = constants.SUCC
expect_query = constants.SUCC
host = '127.0.0.1'
port = 19530
_chaos_config = None
health_checkers = {}
class TestOperations(TestBase):
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port, user, password, milvus_ns):
if user and password:
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
else:
connections.connect('default', host=host, port=port)
if connections.has_connection("default") is False:
raise Exception("no connections")
log.info("connect to milvus successfully")
self.host = host
self.port = port
self.user = user
self.password = password
self.milvus_sys = MilvusSys(alias='default')
self.milvus_ns = milvus_ns
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
def init_health_checkers(self, collection_name=None):
c_name = collection_name
checkers = {
Op.create_db: DatabaseCreateChecker(),
Op.create_collection: CollectionCreateChecker(collection_name=c_name),
Op.create_partition: PartitionCreateChecker(collection_name=c_name),
Op.drop_db: DatabaseDropChecker(),
Op.drop_collection: CollectionDropChecker(collection_name=c_name),
Op.drop_partition: PartitionDropChecker(collection_name=c_name),
Op.load_collection: CollectionLoadChecker(collection_name=c_name),
Op.load_partition: PartitionLoadChecker(collection_name=c_name),
Op.release_collection: CollectionReleaseChecker(collection_name=c_name),
Op.release_partition: PartitionReleaseChecker(collection_name=c_name),
Op.insert: InsertChecker(collection_name=c_name),
Op.upsert: UpsertChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.create_index: IndexCreateChecker(collection_name=c_name),
Op.drop_index: IndexDropChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
Op.drop: CollectionDropChecker(collection_name=c_name)
}
self.health_checkers = checkers
@pytest.mark.tags(CaseLabel.L3)
def test_operations(self, request_duration, is_check):
# start the monitor threads to check the milvus ops
log.info("*********************Test Start**********************")
log.info(connections.get_connection_addr('default'))
event_records = EventRecords()
c_name = None
event_records.insert("init_health_checkers", "start")
self.init_health_checkers(collection_name=c_name)
event_records.insert("init_health_checkers", "finished")
tasks = cc.start_monitor_threads(self.health_checkers)
log.info("*********************Load Start**********************")
# wait request_duration
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
if request_duration[-1] == "+":
request_duration = request_duration[:-1]
request_duration = eval(request_duration)
for i in range(10):
sleep(request_duration // 10)
# add an event so that the chaos can start to apply
if i == 3:
event_records.insert("init_chaos", "ready")
for k, v in self.health_checkers.items():
v.check_result()
if is_check:
assert_statistic(self.health_checkers, succ_rate_threshold=0.98)
assert_expectations()
# wait all pod ready
wait_pods_ready(self.milvus_ns, f"app.kubernetes.io/instance={self.release_name}")
time.sleep(60)
cc.check_thread_status(tasks)
for k, v in self.health_checkers.items():
v.pause()
ra = ResultAnalyzer()
ra.get_stage_success_rate()
ra.show_result_table()
log.info("*********************Chaos Test Completed**********************")

View File

@ -4,6 +4,7 @@ import json
from time import sleep
from pymilvus import connections
from chaos.checker import (InsertChecker,
UpsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
@ -70,6 +71,7 @@ class TestOperations(TestBase):
c_name = collection_name
checkers = {
Op.insert: InsertChecker(collection_name=c_name),
Op.upsert: UpsertChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),

View File

@ -3,14 +3,15 @@ import time
import pytest
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker,
from chaos.checker import (CollectionCreateChecker,
InsertChecker,
UpsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
IndexCreateChecker,
DeleteChecker,
DropChecker,
CollectionDropChecker,
Op,
EventRecords,
ResultAnalyzer
@ -61,14 +62,15 @@ class TestOperations(TestBase):
def init_health_checkers(self, collection_name=None):
c_name = collection_name
checkers = {
Op.create: CreateChecker(collection_name=c_name),
Op.create: CollectionCreateChecker(collection_name=c_name),
Op.insert: InsertChecker(collection_name=c_name),
Op.upsert: UpsertChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.index: IndexChecker(collection_name=c_name),
Op.index: IndexCreateChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
Op.drop: DropChecker(collection_name=c_name)
Op.drop: CollectionDropChecker(collection_name=c_name)
}
self.health_checkers = checkers

View File

@ -6,14 +6,15 @@ from time import sleep
from yaml import full_load
from pymilvus import connections, utility
from chaos.checker import (CreateChecker,
from chaos.checker import (CollectionCreateChecker,
InsertChecker,
UpsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
IndexCreateChecker,
DeleteChecker,
DropChecker,
CollectionDropChecker,
Op)
from utils.util_k8s import wait_pods_ready
from utils.util_log import test_log as log
@ -61,14 +62,15 @@ class TestOperations(TestBase):
schema = cf.gen_default_collection_schema(auto_id=False)
checkers = {
Op.create: CreateChecker(collection_name=None, schema=schema),
Op.create: CollectionCreateChecker(collection_name=None, schema=schema),
Op.insert: InsertChecker(collection_name=c_name, schema=schema),
Op.upsert: UpsertChecker(collection_name=c_name, schema=schema),
Op.flush: FlushChecker(collection_name=c_name, schema=schema),
Op.index: IndexChecker(collection_name=None, schema=schema),
Op.index: IndexCreateChecker(collection_name=None, schema=schema),
Op.search: SearchChecker(collection_name=c_name, schema=schema),
Op.query: QueryChecker(collection_name=c_name, schema=schema),
Op.delete: DeleteChecker(collection_name=c_name, schema=schema),
Op.drop: DropChecker(collection_name=None, schema=schema)
Op.drop: CollectionDropChecker(collection_name=None, schema=schema)
}
self.health_checkers = checkers

View File

@ -2,12 +2,12 @@ import pytest
import threading
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker,
from chaos.checker import (CollectionCreateChecker,
InsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
IndexCreateChecker,
DeleteChecker,
Op)
from utils.util_log import test_log as log
@ -60,10 +60,10 @@ class TestOperations(TestBase):
def init_health_checkers(self, collection_name=None):
c_name = collection_name
checkers = {
Op.create: CreateChecker(collection_name=c_name),
Op.create: CollectionCreateChecker(collection_name=c_name),
Op.insert: InsertChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.index: IndexChecker(collection_name=c_name),
Op.index: IndexCreateChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),

View File

@ -3,10 +3,11 @@ from time import sleep
from collections import defaultdict
from pymilvus import connections
from chaos.checker import (InsertChecker,
UpsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
IndexCreateChecker,
DeleteChecker,
Op)
from utils.util_log import test_log as log
@ -73,8 +74,9 @@ class TestOperations(TestBase):
c_name = collection_name
checkers = {
Op.insert: InsertChecker(collection_name=c_name),
Op.upsert: UpsertChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.index: IndexChecker(collection_name=c_name),
Op.index: IndexCreateChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),

View File

@ -1,7 +1,7 @@
from time import sleep
from pymilvus import connections, list_collections, utility
from chaos.checker import (CreateChecker, InsertFlushChecker,
SearchChecker, QueryChecker, IndexChecker, Op)
from chaos.checker import (CollectionCreateChecker, InsertFlushChecker,
SearchChecker, QueryChecker, IndexCreateChecker, Op)
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
@ -74,10 +74,10 @@ class TestAutoLoadBalance(object):
conn = connections.connect("default", host=host, port=port)
assert conn is not None
self.health_checkers = {
Op.create: CreateChecker(),
Op.create: CollectionCreateChecker(),
Op.insert: InsertFlushChecker(),
Op.flush: InsertFlushChecker(flush=True),
Op.index: IndexChecker(),
Op.index: IndexCreateChecker(),
Op.search: SearchChecker(),
Op.query: QueryChecker()
}