mirror of https://github.com/milvus-io/milvus.git
test: update bulk insert bench (#31457)
use restful api to do bulk insert Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>pull/31003/head
parent
ee56ae7299
commit
d0dd962a2f
|
@ -18,6 +18,165 @@ from common.bulk_insert_data import (
|
|||
prepare_bulk_insert_csv_files,
|
||||
DataField as df,
|
||||
)
|
||||
import json
|
||||
import requests
|
||||
import time
|
||||
import uuid
|
||||
from utils.util_log import test_log as logger
|
||||
from minio import Minio
|
||||
from minio.error import S3Error
|
||||
|
||||
|
||||
def logger_request_response(response, url, tt, headers, data, str_data, str_response, method):
|
||||
if len(data) > 2000:
|
||||
data = data[:1000] + "..." + data[-1000:]
|
||||
try:
|
||||
if response.status_code == 200:
|
||||
if ('code' in response.json() and response.json()["code"] == 200) or (
|
||||
'Code' in response.json() and response.json()["Code"] == 0):
|
||||
logger.debug(
|
||||
f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {str_data}, \nresponse: {str_response}")
|
||||
else:
|
||||
logger.debug(
|
||||
f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}")
|
||||
else:
|
||||
logger.debug(
|
||||
f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}")
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}, \nerror: {e}")
|
||||
|
||||
|
||||
class Requests:
|
||||
def __init__(self, url=None, api_key=None):
|
||||
self.url = url
|
||||
self.api_key = api_key
|
||||
self.headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {self.api_key}',
|
||||
'RequestId': str(uuid.uuid1())
|
||||
}
|
||||
|
||||
def update_headers(self):
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {self.api_key}',
|
||||
'RequestId': str(uuid.uuid1())
|
||||
}
|
||||
return headers
|
||||
|
||||
def post(self, url, headers=None, data=None, params=None):
|
||||
headers = headers if headers is not None else self.update_headers()
|
||||
data = json.dumps(data)
|
||||
str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
|
||||
t0 = time.time()
|
||||
response = requests.post(url, headers=headers, data=data, params=params)
|
||||
tt = time.time() - t0
|
||||
str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
|
||||
logger_request_response(response, url, tt, headers, data, str_data, str_response, "post")
|
||||
return response
|
||||
|
||||
def get(self, url, headers=None, params=None, data=None):
|
||||
headers = headers if headers is not None else self.update_headers()
|
||||
data = json.dumps(data)
|
||||
str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
|
||||
t0 = time.time()
|
||||
if data is None or data == "null":
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
else:
|
||||
response = requests.get(url, headers=headers, params=params, data=data)
|
||||
tt = time.time() - t0
|
||||
str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
|
||||
logger_request_response(response, url, tt, headers, data, str_data, str_response, "get")
|
||||
return response
|
||||
|
||||
def put(self, url, headers=None, data=None):
|
||||
headers = headers if headers is not None else self.update_headers()
|
||||
data = json.dumps(data)
|
||||
str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
|
||||
t0 = time.time()
|
||||
response = requests.put(url, headers=headers, data=data)
|
||||
tt = time.time() - t0
|
||||
str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
|
||||
logger_request_response(response, url, tt, headers, data, str_data, str_response, "put")
|
||||
return response
|
||||
|
||||
def delete(self, url, headers=None, data=None):
|
||||
headers = headers if headers is not None else self.update_headers()
|
||||
data = json.dumps(data)
|
||||
str_data = data[:200] + '...' + data[-200:] if len(data) > 400 else data
|
||||
t0 = time.time()
|
||||
response = requests.delete(url, headers=headers, data=data)
|
||||
tt = time.time() - t0
|
||||
str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
|
||||
logger_request_response(response, url, tt, headers, data, str_data, str_response, "delete")
|
||||
return response
|
||||
|
||||
|
||||
class ImportJobClient(Requests):
|
||||
|
||||
def __init__(self, endpoint, token):
|
||||
super().__init__(url=endpoint, api_key=token)
|
||||
self.endpoint = endpoint
|
||||
self.api_key = token
|
||||
self.db_name = None
|
||||
self.headers = self.update_headers()
|
||||
|
||||
def update_headers(self):
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {self.api_key}',
|
||||
'RequestId': str(uuid.uuid1())
|
||||
}
|
||||
return headers
|
||||
|
||||
def list_import_jobs(self, payload, db_name="default"):
|
||||
payload["dbName"] = db_name
|
||||
data = payload
|
||||
url = f'{self.endpoint}/v2/vectordb/jobs/import/list'
|
||||
response = self.post(url, headers=self.update_headers(), data=data)
|
||||
res = response.json()
|
||||
return res
|
||||
|
||||
def create_import_jobs(self, payload):
|
||||
url = f'{self.endpoint}/v2/vectordb/jobs/import/create'
|
||||
response = self.post(url, headers=self.update_headers(), data=payload)
|
||||
res = response.json()
|
||||
return res
|
||||
|
||||
def get_import_job_progress(self, task_id):
|
||||
payload = {
|
||||
"jobId": task_id
|
||||
}
|
||||
url = f'{self.endpoint}/v2/vectordb/jobs/import/get_progress'
|
||||
response = self.post(url, headers=self.update_headers(), data=payload)
|
||||
res = response.json()
|
||||
return res
|
||||
|
||||
def wait_import_job_completed(self, task_id_list, timeout=1800):
|
||||
success = False
|
||||
success_states = {}
|
||||
t0 = time.time()
|
||||
while time.time() - t0 < timeout:
|
||||
for task_id in task_id_list:
|
||||
res = self.get_import_job_progress(task_id)
|
||||
if res['data']['state'] == "Completed":
|
||||
success_states[task_id] = True
|
||||
else:
|
||||
success_states[task_id] = False
|
||||
time.sleep(5)
|
||||
# all task success then break
|
||||
if all(success_states.values()):
|
||||
success = True
|
||||
break
|
||||
states = []
|
||||
for task_id in task_id_list:
|
||||
res = self.get_import_job_progress(task_id)
|
||||
states.append({
|
||||
"task_id": task_id,
|
||||
"state": res['data']
|
||||
})
|
||||
return success, states
|
||||
|
||||
|
||||
default_vec_only_fields = [df.vec_field]
|
||||
|
@ -47,7 +206,7 @@ def entity_suffix(entities):
|
|||
|
||||
|
||||
class TestcaseBaseBulkInsert(TestcaseBase):
|
||||
|
||||
import_job_client = None
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def init_minio_client(self, minio_host):
|
||||
Path("/tmp/bulk_insert_data").mkdir(parents=True, exist_ok=True)
|
||||
|
@ -60,6 +219,10 @@ class TestcaseBaseBulkInsert(TestcaseBase):
|
|||
"minio_bucket_name"
|
||||
]
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def init_import_client(self, host, port, user, password):
|
||||
self.import_job_client = ImportJobClient(f"http://{host}:{port}", f"{user}:{password}")
|
||||
|
||||
|
||||
class TestBulkInsertPerf(TestcaseBaseBulkInsert):
|
||||
|
||||
|
@ -109,16 +272,21 @@ class TestBulkInsertPerf(TestcaseBaseBulkInsert):
|
|||
c_name = cf.gen_unique_str("bulk_insert")
|
||||
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
|
||||
self.collection_wrap.init_collection(c_name, schema=schema)
|
||||
payload = {
|
||||
"collectionName": c_name,
|
||||
"files": [files],
|
||||
}
|
||||
|
||||
# import data
|
||||
payload = {
|
||||
"collectionName": c_name,
|
||||
"files": [files],
|
||||
}
|
||||
t0 = time.time()
|
||||
task_id, _ = self.utility_wrap.do_bulk_insert(
|
||||
collection_name=c_name, files=files
|
||||
)
|
||||
logging.info(f"bulk insert task ids:{task_id}")
|
||||
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
|
||||
task_ids=[task_id], timeout=1800
|
||||
)
|
||||
rsp = self.import_job_client.create_import_jobs(payload)
|
||||
job_id_list = [rsp["data"]["jobId"]]
|
||||
logging.info(f"bulk insert job ids:{job_id_list}")
|
||||
success, states = self.import_job_client.wait_import_job_completed(job_id_list, timeout=1800)
|
||||
tt = time.time() - t0
|
||||
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
|
||||
assert success
|
||||
|
@ -170,14 +338,15 @@ class TestBulkInsertPerf(TestcaseBaseBulkInsert):
|
|||
self.collection_wrap.init_collection(c_name, schema=schema)
|
||||
|
||||
# import data
|
||||
payload = {
|
||||
"collectionName": c_name,
|
||||
"files": [files],
|
||||
}
|
||||
t0 = time.time()
|
||||
task_id, _ = self.utility_wrap.do_bulk_insert(
|
||||
collection_name=c_name, files=files
|
||||
)
|
||||
logging.info(f"bulk insert task ids:{task_id}")
|
||||
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
|
||||
task_ids=[task_id], timeout=1800
|
||||
)
|
||||
rsp = self.import_job_client.create_import_jobs(payload)
|
||||
job_id_list = [rsp["data"]["jobId"]]
|
||||
logging.info(f"bulk insert job ids:{job_id_list}")
|
||||
success, states = self.import_job_client.wait_import_job_completed(job_id_list, timeout=1800)
|
||||
tt = time.time() - t0
|
||||
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
|
||||
assert success
|
||||
|
@ -224,14 +393,15 @@ class TestBulkInsertPerf(TestcaseBaseBulkInsert):
|
|||
self.collection_wrap.init_collection(c_name, schema=schema)
|
||||
|
||||
# import data
|
||||
payload = {
|
||||
"collectionName": c_name,
|
||||
"files": [files],
|
||||
}
|
||||
t0 = time.time()
|
||||
task_id, _ = self.utility_wrap.do_bulk_insert(
|
||||
collection_name=c_name, files=files
|
||||
)
|
||||
logging.info(f"bulk insert task ids:{task_id}")
|
||||
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
|
||||
task_ids=[task_id], timeout=1800
|
||||
)
|
||||
rsp = self.import_job_client.create_import_jobs(payload)
|
||||
job_id_list = [rsp["data"]["jobId"]]
|
||||
logging.info(f"bulk insert job ids:{job_id_list}")
|
||||
success, states = self.import_job_client.wait_import_job_completed(job_id_list, timeout=1800)
|
||||
tt = time.time() - t0
|
||||
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
|
||||
assert success
|
||||
|
|
Loading…
Reference in New Issue