diff --git a/tests/python_client/bulk_insert/test_bulk_insert_bench.py b/tests/python_client/bulk_insert/test_bulk_insert_bench.py index c354557703..142a0e9f26 100644 --- a/tests/python_client/bulk_insert/test_bulk_insert_bench.py +++ b/tests/python_client/bulk_insert/test_bulk_insert_bench.py @@ -18,6 +18,165 @@ from common.bulk_insert_data import ( prepare_bulk_insert_csv_files, DataField as df, ) +import json +import requests +import time +import uuid +from utils.util_log import test_log as logger +from minio import Minio +from minio.error import S3Error + + +def logger_request_response(response, url, tt, headers, data, str_data, str_response, method): + if len(data) > 2000: + data = data[:1000] + "..." + data[-1000:] + try: + if response.status_code == 200: + if ('code' in response.json() and response.json()["code"] == 200) or ( + 'Code' in response.json() and response.json()["Code"] == 0): + logger.debug( + f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {str_data}, \nresponse: {str_response}") + else: + logger.debug( + f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}") + else: + logger.debug( + f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}") + except Exception as e: + logger.debug( + f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}, \nerror: {e}") + + +class Requests: + def __init__(self, url=None, api_key=None): + self.url = url + self.api_key = api_key + self.headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {self.api_key}', + 'RequestId': str(uuid.uuid1()) + } + + def update_headers(self): + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {self.api_key}', + 'RequestId': str(uuid.uuid1()) + } + return headers + + def post(self, url, headers=None, data=None, params=None): + headers = headers if headers is not None else self.update_headers() + data = json.dumps(data) + str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data + t0 = time.time() + response = requests.post(url, headers=headers, data=data, params=params) + tt = time.time() - t0 + str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text + logger_request_response(response, url, tt, headers, data, str_data, str_response, "post") + return response + + def get(self, url, headers=None, params=None, data=None): + headers = headers if headers is not None else self.update_headers() + data = json.dumps(data) + str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data + t0 = time.time() + if data is None or data == "null": + response = requests.get(url, headers=headers, params=params) + else: + response = requests.get(url, headers=headers, params=params, data=data) + tt = time.time() - t0 + str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text + logger_request_response(response, url, tt, headers, data, str_data, str_response, "get") + return response + + def put(self, url, headers=None, data=None): + headers = headers if headers is not None else self.update_headers() + data = json.dumps(data) + str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data + t0 = time.time() + response = requests.put(url, headers=headers, data=data) + tt = time.time() - t0 + str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text + logger_request_response(response, url, tt, headers, data, str_data, str_response, "put") + return response + + def delete(self, url, headers=None, data=None): + headers = headers if headers is not None else self.update_headers() + data = json.dumps(data) + str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data + t0 = time.time() + response = requests.delete(url, headers=headers, data=data) + tt = time.time() - t0 + str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text + logger_request_response(response, url, tt, headers, data, str_data, str_response, "delete") + return response + + +class ImportJobClient(Requests): + + def __init__(self, endpoint, token): + super().__init__(url=endpoint, api_key=token) + self.endpoint = endpoint + self.api_key = token + self.db_name = None + self.headers = self.update_headers() + + def update_headers(self): + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {self.api_key}', + 'RequestId': str(uuid.uuid1()) + } + return headers + + def list_import_jobs(self, payload, db_name="default"): + payload["dbName"] = db_name + data = payload + url = f'{self.endpoint}/v2/vectordb/jobs/import/list' + response = self.post(url, headers=self.update_headers(), data=data) + res = response.json() + return res + + def create_import_jobs(self, payload): + url = f'{self.endpoint}/v2/vectordb/jobs/import/create' + response = self.post(url, headers=self.update_headers(), data=payload) + res = response.json() + return res + + def get_import_job_progress(self, task_id): + payload = { + "jobId": task_id + } + url = f'{self.endpoint}/v2/vectordb/jobs/import/get_progress' + response = self.post(url, headers=self.update_headers(), data=payload) + res = response.json() + return res + + def wait_import_job_completed(self, task_id_list, timeout=1800): + success = False + success_states = {} + t0 = time.time() + while time.time() - t0 < timeout: + for task_id in task_id_list: + res = self.get_import_job_progress(task_id) + if res['data']['state'] == "Completed": + success_states[task_id] = True + else: + success_states[task_id] = False + time.sleep(5) + # all task success then break + if all(success_states.values()): + success = True + break + states = [] + for task_id in task_id_list: + res = self.get_import_job_progress(task_id) + states.append({ + "task_id": task_id, + "state": res['data'] + }) + return success, states default_vec_only_fields = [df.vec_field] @@ -47,7 +206,7 @@ def entity_suffix(entities): class TestcaseBaseBulkInsert(TestcaseBase): - + import_job_client = None @pytest.fixture(scope="function", autouse=True) def init_minio_client(self, minio_host): Path("/tmp/bulk_insert_data").mkdir(parents=True, exist_ok=True) @@ -60,6 +219,10 @@ class TestcaseBaseBulkInsert(TestcaseBase): "minio_bucket_name" ] + @pytest.fixture(scope="function", autouse=True) + def init_import_client(self, host, port, user, password): + self.import_job_client = ImportJobClient(f"http://{host}:{port}", f"{user}:{password}") + class TestBulkInsertPerf(TestcaseBaseBulkInsert): @@ -109,16 +272,21 @@ class TestBulkInsertPerf(TestcaseBaseBulkInsert): c_name = cf.gen_unique_str("bulk_insert") schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field) self.collection_wrap.init_collection(c_name, schema=schema) + payload = { + "collectionName": c_name, + "files": [files], + } # import data + payload = { + "collectionName": c_name, + "files": [files], + } t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, files=files - ) - logging.info(f"bulk insert task ids:{task_id}") - success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=1800 - ) + rsp = self.import_job_client.create_import_jobs(payload) + job_id_list = [rsp["data"]["jobId"]] + logging.info(f"bulk insert job ids:{job_id_list}") + success, states = self.import_job_client.wait_import_job_completed(job_id_list, timeout=1800) tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt} with states:{states}") assert success @@ -170,14 +338,15 @@ class TestBulkInsertPerf(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data + payload = { + "collectionName": c_name, + "files": [files], + } t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, files=files - ) - logging.info(f"bulk insert task ids:{task_id}") - success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=1800 - ) + rsp = self.import_job_client.create_import_jobs(payload) + job_id_list = [rsp["data"]["jobId"]] + logging.info(f"bulk insert job ids:{job_id_list}") + success, states = self.import_job_client.wait_import_job_completed(job_id_list, timeout=1800) tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt} with states:{states}") assert success @@ -224,14 +393,15 @@ class TestBulkInsertPerf(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data + payload = { + "collectionName": c_name, + "files": [files], + } t0 = time.time() - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, files=files - ) - logging.info(f"bulk insert task ids:{task_id}") - success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=[task_id], timeout=1800 - ) + rsp = self.import_job_client.create_import_jobs(payload) + job_id_list = [rsp["data"]["jobId"]] + logging.info(f"bulk insert job ids:{job_id_list}") + success, states = self.import_job_client.wait_import_job_completed(job_id_list, timeout=1800) tt = time.time() - t0 log.info(f"bulk insert state:{success} in {tt} with states:{states}") assert success