mirror of https://github.com/milvus-io/milvus.git
test: add import checker to chaos test (#32908)
add import checker to chaos test

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

branch: pull/32931/head
parent: bb7765cbd6
commit: 9a269f1489

@@ -13,6 +13,7 @@ from prettytable import PrettyTable
 import functools
 from time import sleep
 from pymilvus import AnnSearchRequest, RRFRanker
+from pymilvus import RemoteBulkWriter, BulkFileType
 from base.database_wrapper import ApiDatabaseWrapper
 from base.collection_wrapper import ApiCollectionWrapper
 from base.partition_wrapper import ApiPartitionWrapper

@@ -1539,7 +1540,7 @@ class BulkInsertChecker(Checker):
     """check bulk insert operations in a dependent thread"""

     def __init__(self, collection_name=None, files=[], use_one_collection=False, dim=ct.default_dim,
-                 schema=None, insert_data=False):
+                 schema=None, insert_data=False, minio_endpoint=None, bucket_name=None):
         if collection_name is None:
             collection_name = cf.gen_unique_str("BulkInsertChecker_")
         super().__init__(collection_name=collection_name, dim=dim, schema=schema, insert_data=insert_data)

@@ -1548,8 +1549,32 @@ class BulkInsertChecker(Checker):
         self.files = files
         self.recheck_failed_task = False
         self.failed_tasks = []
+        self.failed_tasks_id = []
         self.use_one_collection = use_one_collection  # if True, all tasks will use one collection to bulk insert
         self.c_name = collection_name
+        self.minio_endpoint = minio_endpoint
+        self.bucket_name = bucket_name
+
+    def prepare(self, data_size=100000):
+        with RemoteBulkWriter(
+            schema=self.schema,
+            file_type=BulkFileType.NUMPY,
+            remote_path="bulk_data",
+            connect_param=RemoteBulkWriter.ConnectParam(
+                endpoint=self.minio_endpoint,
+                access_key="minioadmin",
+                secret_key="minioadmin",
+                bucket_name=self.bucket_name
+            )
+        ) as remote_writer:
+
+            for i in range(data_size):
+                row = cf.get_row_data_by_schema(nb=1, schema=self.schema)[0]
+                remote_writer.append_row(row)
+            remote_writer.commit()
+            batch_files = remote_writer.batch_files
+            log.info(f"batch files: {batch_files}")
+            self.files = batch_files[0]
+
     def update(self, files=None, schema=None):
         if files is not None:

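Note: prepare() stages NUMPY files in MinIO via RemoteBulkWriter; those file paths are what a bulk insert task consumes. A minimal standalone sketch of the downstream pymilvus call, separate from the checker itself (host, port, collection name, and file paths below are placeholders, not values from this diff):

# Standalone sketch, not part of the diff: submit staged files as a bulk
# insert task via the public pymilvus utility API.
from pymilvus import connections, utility

connections.connect("default", host="127.0.0.1", port="19530")
task_id = utility.do_bulk_insert(collection_name="my_collection",
                                 files=["bulk_data/1/id.npy", "bulk_data/1/vector.npy"])
print(f"submitted bulk insert task: {task_id}")
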
@@ -1557,6 +1582,13 @@ class BulkInsertChecker(Checker):
         if schema is not None:
             self.schema = schema

+    def get_bulk_insert_task_state(self):
+        state_map = {}
+        for task_id in self.failed_tasks_id:
+            state, _ = self.utility_wrap.get_bulk_insert_state(task_id=task_id)
+            state_map[task_id] = state
+        return state_map
+
     @trace()
     def bulk_insert(self):
         log.info(f"bulk insert collection name: {self.c_name}")

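For context, get_bulk_insert_state returns a BulkInsertState object. Continuing the sketch above (reusing its connection and task_id), one way to poll a task until it reaches a terminal state, which is the information get_bulk_insert_task_state collects for failed tasks:

# Continuation of the earlier sketch: poll a bulk insert task to completion.
import time
from pymilvus import utility, BulkInsertState

terminal = {BulkInsertState.ImportCompleted,
            BulkInsertState.ImportFailed,
            BulkInsertState.ImportFailedAndCleaned}
while True:
    state = utility.get_bulk_insert_state(task_id=task_id)
    if state.state in terminal:
        break
    time.sleep(5)
print(f"task {task_id} finished with state {state.state_name}")
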
@@ -1584,9 +1616,11 @@ class BulkInsertChecker(Checker):
         log.info(f"after bulk insert, collection {self.c_name} has num entities {num_entities}")
         if not completed:
             self.failed_tasks.append(self.c_name)
+            self.failed_tasks_id.append(task_ids)
         return task_ids, completed

     def keep_running(self):
+        self.prepare()
         while self._keep_running:
             self.run_task()
             sleep(constants.WAIT_PER_OP / 10)

@@ -12,6 +12,7 @@ def pytest_addoption(parser):
     parser.addoption("--chaos_interval", action="store", default="2m", help="chaos_interval")
     parser.addoption("--is_check", action="store", type=bool, default=False, help="is_check")
     parser.addoption("--wait_signal", action="store", type=bool, default=True, help="wait_signal")
+    parser.addoption("--enable_import", action="store", type=bool, default=False, help="enable_import")
     parser.addoption("--collection_num", action="store", default="1", help="collection_num")


@@ -68,3 +69,8 @@ def is_check(request):
 @pytest.fixture
 def wait_signal(request):
     return request.config.getoption("--wait_signal")
+
+
+@pytest.fixture
+def enable_import(request):
+    return request.config.getoption("--enable_import")

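One caveat when passing the new flag on the command line: with action="store" and type=bool, the raw string value is fed to bool(), and any non-empty string is truthy, so --enable_import False still evaluates to True (the same holds for the pre-existing --is_check and --wait_signal options). This is standard Python behavior, illustrated below; the practical rule is to omit the flag entirely to keep the import checker disabled.

# bool() on strings, which is what type=bool applies to the raw option value:
assert bool("True") is True
assert bool("False") is True   # non-empty string, still truthy
assert bool("") is False
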
@@ -5,6 +5,7 @@ from time import sleep
 from pymilvus import connections
 from chaos.checker import (CollectionCreateChecker,
                            InsertChecker,
+                           BulkInsertChecker,
                            UpsertChecker,
                            FlushChecker,
                            SearchChecker,

@@ -43,7 +44,7 @@ class TestBase:
 class TestOperations(TestBase):

     @pytest.fixture(scope="function", autouse=True)
-    def connection(self, host, port, user, password, milvus_ns):
+    def connection(self, host, port, user, password, milvus_ns, minio_host, enable_import):
         if user and password:
             # log.info(f"connect to {host}:{port} with user {user} and password {password}")
             connections.connect('default', host=host, port=port, user=user, password=password, secure=True)

@@ -59,6 +60,10 @@ class TestOperations(TestBase):
         self.milvus_sys = MilvusSys(alias='default')
         self.milvus_ns = milvus_ns
         self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
+        self.enable_import = enable_import
+        self.minio_endpoint = f"{minio_host}:9000"
+        self.ms = MilvusSys()
+        self.bucket_name = self.ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]

     def init_health_checkers(self, collection_name=None):
         c_name = collection_name

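Here the bucket name is discovered from an index node's system configuration, and the endpoint assumes MinIO's default port 9000. A quick standalone sanity check of both values is possible with the minio client (the endpoint and bucket name below are placeholders; the minioadmin credentials mirror the ones hard-coded in the diff):

# Sanity-check sketch, assuming the `minio` package is installed.
from minio import Minio

client = Minio("milvus-minio:9000",        # hypothetical f"{minio_host}:9000"
               access_key="minioadmin",
               secret_key="minioadmin",
               secure=False)
assert client.bucket_exists("milvus-bucket")  # hypothetical bucket name
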
@@ -74,6 +79,10 @@ class TestOperations(TestBase):
             Op.delete: DeleteChecker(collection_name=c_name),
             Op.drop: CollectionDropChecker(collection_name=c_name)
         }
+        if bool(self.enable_import):
+            checkers[Op.bulk_insert] = BulkInsertChecker(collection_name=c_name,
+                                                         bucket_name=self.bucket_name,
+                                                         minio_endpoint=self.minio_endpoint)
        self.health_checkers = checkers

     @pytest.mark.tags(CaseLabel.L3)

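Once registered under Op.bulk_insert, the new checker participates in the same health reporting as the other operations. A sketch of reading the results after a chaos window; total() and succ_rate() are assumed methods of the Checker base class, which this diff does not show:

# Hedged sketch: summarize per-operation health inside the test class,
# where log and self.health_checkers are in scope.
for op, checker in self.health_checkers.items():
    log.info(f"{op}: total={checker.total()}, succ_rate={checker.succ_rate():.2%}")
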