[test]Remove is_row_based for bulk insert chaos test (#20174)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/20168/head
zhuwenxing 2022-10-28 20:55:33 +08:00 committed by GitHub
parent f32f23ca2b
commit e6b937c7a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 24 deletions

View File

@ -585,23 +585,19 @@ class BulkInsertChecker(Checker):
self.utility_wrap = ApiUtilityWrapper()
self.schema = cf.gen_default_collection_schema()
self.files = files
self.is_row_based = True
self.recheck_failed_task = False
self.failed_tasks = []
self.c_name = None
def update(self, files=None, schema=None, is_row_based=None):
def update(self, files=None, schema=None):
if files is not None:
self.files = files
if schema is not None:
self.schema = schema
if is_row_based is not None:
self.is_row_based = is_row_based
@trace()
def bulk_insert(self):
task_ids, result = self.utility_wrap.bulk_insert(collection_name=self.c_name,
is_row_based=self.is_row_based,
files=self.files)
completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=task_ids, timeout=60)
return task_ids, completed

View File

@ -9,18 +9,16 @@ from time import sleep
from pathlib import Path
from minio import Minio
from pymilvus import connections
from chaos.checker import (InsertFlushChecker, SearchChecker, QueryChecker, BulkInsertChecker, Op)
from chaos.checker import (BulkInsertChecker, Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, get_pod_list, get_pod_ip_name_pairs, get_milvus_instance_name
from utils.util_common import findkeys, update_key_value
from utils.util_k8s import wait_pods_ready, get_milvus_deploy_tool, get_pod_ip_name_pairs, get_milvus_instance_name
from utils.util_common import update_key_value
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common import common_func as cf
from chaos import constants
# from bulk_insert.bulk_insert_data import gen_file_name
from bulk_insert.minio_comm import copy_files_to_minio
from delayed_assert import expect, assert_expectations
@ -69,7 +67,7 @@ class TestChaosBase:
class TestChaos(TestChaosBase):
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port):
def connection(self, host, port, milvus_ns):
connections.add_connection(default={"host": host, "port": port})
connections.connect(alias='default')
@ -79,6 +77,10 @@ class TestChaos(TestChaosBase):
self.host = host
self.port = port
self.instance_name = instance_name
self.milvus_sys = MilvusSys(alias='default')
self.milvus_ns = milvus_ns
self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)
@pytest.fixture(scope="function", autouse=True)
def init_health_checkers(self):
@ -86,18 +88,22 @@ class TestChaos(TestChaosBase):
checkers = {
# Op.insert: InsertFlushChecker(collection_name=c_name),
# Op.search: SearchChecker(collection_name=c_name, replica_number=2),
Op.bulk_insert: BulkLoadChecker()
Op.bulk_insert: BulkInsertChecker()
# Op.query: QueryChecker(collection_name=c_name, replica_number=2)
}
self.health_checkers = checkers
@pytest.fixture(scope="function", autouse=True)
def prepare_bulk_insert(self, nb=1000, is_row_based=True):
def prepare_bulk_insert(self, nb=3000):
if Op.bulk_insert not in self.health_checkers:
log.info("bulk_insert checker is not in health checkers, skip prepare bulk load")
return
log.info("bulk_insert checker is in health checkers, prepare data firstly")
release_name = self.instance_name
deploy_tool = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)
if deploy_tool == "helm":
release_name = self.instance_name
else:
release_name = self.instance_name + "-minio"
minio_ip_pod_pair = get_pod_ip_name_pairs("chaos-testing", f"release={release_name}, app=minio")
ms = MilvusSys()
minio_ip = list(minio_ip_pod_pair.keys())[0]
@ -107,15 +113,12 @@ class TestChaos(TestChaosBase):
schema = cf.gen_default_collection_schema()
data = cf.gen_default_list_data_for_bulk_insert(nb=nb)
fields_name = [field.name for field in schema.fields]
if not is_row_based:
data_dict = dict(zip(fields_name, data))
if is_row_based:
entities = []
for i in range(nb):
entity_value = [field_values[i] for field_values in data]
entity = dict(zip(fields_name, entity_value))
entities.append(entity)
data_dict = {"rows": entities}
entities = []
for i in range(nb):
entity_value = [field_values[i] for field_values in data]
entity = dict(zip(fields_name, entity_value))
entities.append(entity)
data_dict = {"rows": entities}
file_name = "/tmp/ci_logs/bulk_insert_data_source.json"
files = [file_name]
#TODO: npy file type is not supported so far
@ -125,7 +128,7 @@ class TestChaos(TestChaosBase):
log.info("upload file to minio")
client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
client.fput_object(bucket_name, file_name, file_name)
self.health_checkers[Op.bulk_insert].update(schema=schema, files=files, is_row_based=is_row_based)
self.health_checkers[Op.bulk_insert].update(schema=schema, files=files)
log.info("prepare data for bulk load done")
def teardown(self):