test: refine restful testcases trace (#34065)

pr: https://github.com/milvus-io/milvus/pull/34066

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/34093/head
zhuwenxing 2024-06-24 10:40:03 +08:00 committed by GitHub
parent f4debe5e5e
commit 630a726f35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 410 additions and 39 deletions

View File

@ -8,17 +8,75 @@ from minio.error import S3Error
from minio.commonconfig import CopySource
from tenacity import retry, retry_if_exception_type, stop_after_attempt
from requests.exceptions import ConnectionError
import urllib.parse
ENABLE_LOG_SAVE = False
def logger_request_response(response, url, tt, headers, data, str_data, str_response, method):
if len(data) > 2000:
data = data[:1000] + "..." + data[-1000:]
def simplify_list(lst):
    """Abbreviate a long list for logging.

    Lists of 20 items or fewer are returned unchanged; longer lists are
    collapsed to ``[first, '...', last]`` so log lines stay readable.
    """
    if len(lst) <= 20:
        return lst
    return [lst[0], '...', lst[-1]]
def simplify_dict(d):
    """Recursively abbreviate a (possibly nested) dict for logging.

    ``None`` is treated as an empty dict.  Dicts with more than 20 keys are
    collapsed to ``{first_key: v, '...': '...', last_key: v}``; list values
    are abbreviated via :func:`simplify_list`, with dict/list items inside
    them simplified one level deep as well.
    """
    if d is None:
        d = {}
    if len(d) > 20:
        ks = list(d)
        # Keep only the first and last entry of an oversized dict.
        d = {ks[0]: d[ks[0]], '...': '...', ks[-1]: d[ks[-1]]}
    out = {}
    for key, value in d.items():
        if isinstance(value, list):
            shortened = []
            for item in value:
                if isinstance(item, dict):
                    shortened.append(simplify_dict(item))
                elif isinstance(item, list):
                    shortened.append(simplify_list(item))
                else:
                    shortened.append(item)
            out[key] = simplify_list(shortened)
        elif isinstance(value, dict):
            out[key] = simplify_dict(value)
        else:
            out[key] = value
    return out
def build_curl_command(method, url, headers, data=None, params=None):
    """Render an HTTP request as a copy-pasteable curl command for the logs.

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        url: request URL; query parameters from ``params`` are appended.
        headers: dict of header name -> value.
        data: optional JSON-serializable request body.
        params: optional dict of query parameters.

    Returns:
        A multi-line string (continuation lines joined with ``" \\\n"``)
        that can be pasted into a shell to replay the request.
    """
    import shlex  # local import: only needed for this debug helper

    if isinstance(params, dict):
        query_string = urllib.parse.urlencode(params)
        url = f"{url}?{query_string}"
    # shlex.quote guards against quotes/spaces/newlines in the URL, header
    # values and JSON body that would otherwise break the generated command
    # (the previous hard-coded '...' wrapping broke on any single quote).
    curl_cmd = [f"curl -X {method} {shlex.quote(url)}"]
    for key, value in headers.items():
        curl_cmd.append(f" -H {shlex.quote(f'{key}: {value}')}")
    if data:
        data = json.dumps(data, indent=4)
        curl_cmd.append(f" -d {shlex.quote(data)}")
    return " \\\n".join(curl_cmd)
def logger_request_response(response, url, tt, headers, data, str_data, str_response, method, params=None):
# save data to jsonl file
data_dict = json.loads(data) if data else {}
data_dict_simple = simplify_dict(data_dict)
if ENABLE_LOG_SAVE:
with open('request_response.jsonl', 'a') as f:
f.write(json.dumps({
"method": method,
"url": url,
"headers": headers,
"params": params,
"data": data_dict_simple,
"response": response.json()
}) + "\n")
data = json.dumps(data_dict_simple, indent=4)
try:
if response.status_code == 200:
if ('code' in response.json() and response.json()["code"] == 0) or (
'Code' in response.json() and response.json()["Code"] == 0):
logger.debug(
f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {str_data}, \nresponse: {str_response}")
f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {str_response}")
else:
logger.debug(
f"\nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}")
@ -30,21 +88,31 @@ def logger_request_response(response, url, tt, headers, data, str_data, str_resp
f"method: \nmethod: {method}, \nurl: {url}, \ncost time: {tt}, \nheader: {headers}, \npayload: {data}, \nresponse: {response.text}, \nerror: {e}")
class Requests:
class Requests():
uuid = str(uuid.uuid1())
api_key = None
def __init__(self, url=None, api_key=None):
self.url = url
self.api_key = api_key
if self.uuid is None:
self.uuid = str(uuid.uuid1())
self.headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'RequestId': self.uuid
}
def update_headers(self):
@classmethod
def update_uuid(cls, _uuid):
cls.uuid = _uuid
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers
@ -59,7 +127,7 @@ class Requests:
response = requests.post(url, headers=headers, data=data, params=params)
tt = time.time() - t0
str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
logger_request_response(response, url, tt, headers, data, str_data, str_response, "post")
logger_request_response(response, url, tt, headers, data, str_data, str_response, "post", params=params)
return response
@retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
@ -74,7 +142,7 @@ class Requests:
response = requests.get(url, headers=headers, params=params, data=data)
tt = time.time() - t0
str_response = response.text[:200] + '...' + response.text[-200:] if len(response.text) > 400 else response.text
logger_request_response(response, url, tt, headers, data, str_data, str_response, "get")
logger_request_response(response, url, tt, headers, data, str_data, str_response, "get", params=params)
return response
@retry(retry=retry_if_exception_type(ConnectionError), stop=stop_after_attempt(3))
@ -111,12 +179,13 @@ class VectorClient(Requests):
self.db_name = None
self.headers = self.update_headers()
def update_headers(self):
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'Authorization': f'Bearer {cls.api_key}',
'Accept-Type-Allow-Int64': "true",
'RequestId': str(uuid.uuid1())
'RequestId': cls.uuid
}
return headers
@ -195,8 +264,6 @@ class VectorClient(Requests):
return response.json()
def vector_query(self, payload, db_name="default", timeout=5):
time.sleep(1)
url = f'{self.endpoint}/v2/vectordb/entities/query'
@ -269,13 +336,14 @@ class CollectionClient(Requests):
self.db_name = None
self.headers = self.update_headers()
def update_headers(self, headers=None):
@classmethod
def update_headers(cls, headers=None):
if headers is not None:
return headers
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers
@ -415,11 +483,12 @@ class PartitionClient(Requests):
self.db_name = None
self.headers = self.update_headers()
def update_headers(self):
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers
@ -530,11 +599,12 @@ class UserClient(Requests):
self.db_name = None
self.headers = self.update_headers()
def update_headers(self):
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers
@ -594,11 +664,12 @@ class RoleClient(Requests):
self.headers = self.update_headers()
self.role_names = []
def update_headers(self):
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers
@ -653,11 +724,12 @@ class IndexClient(Requests):
self.db_name = None
self.headers = self.update_headers()
def update_headers(self):
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers
@ -714,11 +786,12 @@ class AliasClient(Requests):
self.db_name = None
self.headers = self.update_headers()
def update_headers(self):
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers
@ -765,11 +838,12 @@ class ImportJobClient(Requests):
self.db_name = None
self.headers = self.update_headers()
def update_headers(self):
@classmethod
def update_headers(cls):
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}',
'RequestId': str(uuid.uuid1())
'Authorization': f'Bearer {cls.api_key}',
'RequestId': cls.uuid
}
return headers

View File

@ -2,10 +2,11 @@ import json
import sys
import pytest
import time
import uuid
from pymilvus import connections, db
from utils.util_log import test_log as logger
from api.milvus import (VectorClient, CollectionClient, PartitionClient, IndexClient, AliasClient,
UserClient, RoleClient, ImportJobClient, StorageClient)
UserClient, RoleClient, ImportJobClient, StorageClient, Requests)
from utils.utils import get_data_by_payload
@ -35,7 +36,7 @@ class Base:
class TestBase(Base):
req = None
def teardown_method(self):
self.collection_client.api_key = self.api_key
all_collections = self.collection_client.collection_list()['data']
@ -49,19 +50,34 @@ class TestBase(Base):
except Exception as e:
logger.error(e)
# def setup_method(self):
# self.req = Requests()
# self.req.uuid = str(uuid.uuid1())
@pytest.fixture(scope="function", autouse=True)
def init_client(self, endpoint, token, minio_host, bucket_name, root_path):
_uuid = str(uuid.uuid1())
self.req = Requests()
self.req.update_uuid(_uuid)
self.endpoint = f"{endpoint}"
self.api_key = f"{token}"
self.invalid_api_key = "invalid_token"
self.vector_client = VectorClient(self.endpoint, self.api_key)
self.vector_client.update_uuid(_uuid)
self.collection_client = CollectionClient(self.endpoint, self.api_key)
self.collection_client.update_uuid(_uuid)
self.partition_client = PartitionClient(self.endpoint, self.api_key)
self.partition_client.update_uuid(_uuid)
self.index_client = IndexClient(self.endpoint, self.api_key)
self.index_client.update_uuid(_uuid)
self.alias_client = AliasClient(self.endpoint, self.api_key)
self.alias_client.update_uuid(_uuid)
self.user_client = UserClient(self.endpoint, self.api_key)
self.user_client.update_uuid(_uuid)
self.role_client = RoleClient(self.endpoint, self.api_key)
self.role_client.update_uuid(_uuid)
self.import_job_client = ImportJobClient(self.endpoint, self.api_key)
self.import_job_client.update_uuid(_uuid)
self.storage_client = StorageClient(f"{minio_host}:9000", "minioadmin", "minioadmin", bucket_name, root_path)
if token is None:
self.vector_client.api_key = None

View File

@ -238,6 +238,287 @@ class TestInsertVector(TestBase):
assert rsp['code'] == 0
assert len(rsp['data']) == 50
@pytest.mark.parametrize("insert_round", [1])
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("is_partition_key", [True])
@pytest.mark.parametrize("enable_dynamic_schema", [True])
@pytest.mark.parametrize("nb", [3000])
@pytest.mark.parametrize("dim", [128])
def test_insert_entities_with_all_vector_datatype_0(self, nb, dim, insert_round, auto_id,
is_partition_key, enable_dynamic_schema):
"""
Insert a vector with a simple payload
"""
# create a collection
name = gen_collection_name()
payload = {
"collectionName": name,
"schema": {
"autoId": auto_id,
"enableDynamicField": enable_dynamic_schema,
"fields": [
{"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
{"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key,
"elementTypeParams": {}},
{"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}},
{"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
{"fieldName": "book_vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}},
{"fieldName": "float_vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}},
{"fieldName": "float16_vector", "dataType": "Float16Vector",
"elementTypeParams": {"dim": f"{dim}"}},
{"fieldName": "bfloat16_vector", "dataType": "BFloat16Vector",
"elementTypeParams": {"dim": f"{dim}"}},
]
},
"indexParams": [
{"fieldName": "book_vector", "indexName": "book_vector", "metricType": "L2",
"params": {"index_type": "FLAT"}},
{"fieldName": "float_vector", "indexName": "float_vector", "metricType": "L2",
"params": {"index_type": "IVF_FLAT", "nlist": 128}},
{"fieldName": "float16_vector", "indexName": "float16_vector", "metricType": "L2",
"params": {"index_type": "IVF_SQ8", "nlist": "128"}},
{"fieldName": "bfloat16_vector", "indexName": "bfloat16_vector", "metricType": "L2",
"params": {"index_type": "IVF_PQ", "nlist": 128, "m": 16, "nbits": 8}},
]
}
rsp = self.collection_client.collection_create(payload)
assert rsp['code'] == 0
rsp = self.collection_client.collection_describe(name)
logger.info(f"rsp: {rsp}")
assert rsp['code'] == 0
# insert data
for i in range(insert_round):
data = []
for i in range(nb):
if auto_id:
tmp = {
"user_id": i,
"word_count": i,
"book_describe": f"book_{i}",
"book_vector": gen_vector(datatype="FloatVector", dim=dim),
"float_vector": gen_vector(datatype="FloatVector", dim=dim),
"float16_vector": gen_vector(datatype="Float16Vector", dim=dim),
"bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim),
}
else:
tmp = {
"book_id": i,
"user_id": i,
"word_count": i,
"book_describe": f"book_{i}",
"book_vector": gen_vector(datatype="FloatVector", dim=dim),
"float_vector": gen_vector(datatype="FloatVector", dim=dim),
"float16_vector": gen_vector(datatype="Float16Vector", dim=dim),
"bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim),
}
if enable_dynamic_schema:
tmp.update({f"dynamic_field_{i}": i})
data.append(tmp)
payload = {
"collectionName": name,
"data": data,
}
rsp = self.vector_client.vector_insert(payload)
assert rsp['code'] == 0
assert rsp['data']['insertCount'] == nb
c = Collection(name)
res = c.query(
expr="user_id > 0",
limit=1,
output_fields=["*"],
)
logger.info(f"res: {res}")
# query data to make sure the data is inserted
rsp = self.vector_client.vector_query({"collectionName": name, "filter": "user_id > 0", "limit": 50})
assert rsp['code'] == 0
assert len(rsp['data']) == 50
@pytest.mark.parametrize("insert_round", [1])
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("is_partition_key", [True])
@pytest.mark.parametrize("enable_dynamic_schema", [True])
@pytest.mark.parametrize("nb", [3000])
@pytest.mark.parametrize("dim", [128])
def test_insert_entities_with_all_vector_datatype_1(self, nb, dim, insert_round, auto_id,
is_partition_key, enable_dynamic_schema):
"""
Insert a vector with a simple payload
"""
# create a collection
name = gen_collection_name()
payload = {
"collectionName": name,
"schema": {
"autoId": auto_id,
"enableDynamicField": enable_dynamic_schema,
"fields": [
{"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
{"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key,
"elementTypeParams": {}},
{"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}},
{"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
{"fieldName": "float_vector", "dataType": "FloatVector", "elementTypeParams": {"dim": f"{dim}"}},
{"fieldName": "float16_vector", "dataType": "Float16Vector",
"elementTypeParams": {"dim": f"{dim}"}},
{"fieldName": "bfloat16_vector", "dataType": "BFloat16Vector",
"elementTypeParams": {"dim": f"{dim}"}},
]
},
"indexParams": [
{"fieldName": "float_vector", "indexName": "float_vector", "metricType": "L2",
"params": {"index_type": "HNSW", "M": 32, "efConstruction": 360}},
{"fieldName": "float16_vector", "indexName": "float16_vector", "metricType": "L2",
"params": {"index_type": "SCANN", "nlist": "128"}},
{"fieldName": "bfloat16_vector", "indexName": "bfloat16_vector", "metricType": "L2",
"params": {"index_type": "DISKANN"}},
]
}
rsp = self.collection_client.collection_create(payload)
assert rsp['code'] == 0
rsp = self.collection_client.collection_describe(name)
logger.info(f"rsp: {rsp}")
assert rsp['code'] == 0
# insert data
for i in range(insert_round):
data = []
for i in range(nb):
if auto_id:
tmp = {
"user_id": i,
"word_count": i,
"book_describe": f"book_{i}",
"float_vector": gen_vector(datatype="FloatVector", dim=dim),
"float16_vector": gen_vector(datatype="Float16Vector", dim=dim),
"bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim),
}
else:
tmp = {
"book_id": i,
"user_id": i,
"word_count": i,
"book_describe": f"book_{i}",
"float_vector": gen_vector(datatype="FloatVector", dim=dim),
"float16_vector": gen_vector(datatype="Float16Vector", dim=dim),
"bfloat16_vector": gen_vector(datatype="BFloat16Vector", dim=dim),
}
if enable_dynamic_schema:
tmp.update({f"dynamic_field_{i}": i})
data.append(tmp)
payload = {
"collectionName": name,
"data": data,
}
rsp = self.vector_client.vector_insert(payload)
assert rsp['code'] == 0
assert rsp['data']['insertCount'] == nb
c = Collection(name)
res = c.query(
expr="user_id > 0",
limit=1,
output_fields=["*"],
)
logger.info(f"res: {res}")
# query data to make sure the data is inserted
rsp = self.vector_client.vector_query({"collectionName": name, "filter": "user_id > 0", "limit": 50})
assert rsp['code'] == 0
assert len(rsp['data']) == 50
@pytest.mark.parametrize("insert_round", [1])
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("is_partition_key", [True])
@pytest.mark.parametrize("enable_dynamic_schema", [True])
@pytest.mark.parametrize("nb", [3000])
@pytest.mark.parametrize("dim", [128])
def test_insert_entities_with_all_vector_datatype_2(self, nb, dim, insert_round, auto_id,
is_partition_key, enable_dynamic_schema):
"""
Insert a vector with a simple payload
"""
# create a collection
name = gen_collection_name()
payload = {
"collectionName": name,
"schema": {
"autoId": auto_id,
"enableDynamicField": enable_dynamic_schema,
"fields": [
{"fieldName": "book_id", "dataType": "Int64", "isPrimary": True, "elementTypeParams": {}},
{"fieldName": "user_id", "dataType": "Int64", "isPartitionKey": is_partition_key,
"elementTypeParams": {}},
{"fieldName": "word_count", "dataType": "Int64", "elementTypeParams": {}},
{"fieldName": "book_describe", "dataType": "VarChar", "elementTypeParams": {"max_length": "256"}},
{"fieldName": "binary_vector_0", "dataType": "BinaryVector", "elementTypeParams": {"dim": f"{dim}"}},
{"fieldName": "binary_vector_1", "dataType": "BinaryVector", "elementTypeParams": {"dim": f"{dim}"}},
{"fieldName": "sparse_float_vector_0", "dataType": "SparseFloatVector"},
{"fieldName": "sparse_float_vector_1", "dataType": "SparseFloatVector"},
]
},
"indexParams": [
{"fieldName": "binary_vector_0", "indexName": "binary_vector_0_index", "metricType": "HAMMING",
"params": {"index_type": "BIN_FLAT"}},
{"fieldName": "binary_vector_1", "indexName": "binary_vector_1_index", "metricType": "HAMMING",
"params": {"index_type": "BIN_IVF_FLAT", "nlist": "512"}},
{"fieldName": "sparse_float_vector_0", "indexName": "sparse_float_vector_0_index", "metricType": "IP",
"params": {"index_type": "SPARSE_INVERTED_INDEX", "drop_ratio_build": "0.2"}},
{"fieldName": "sparse_float_vector_1", "indexName": "sparse_float_vector_1_index", "metricType": "IP",
"params": {"index_type": "SPARSE_WAND", "drop_ratio_build": "0.2"}}
]
}
rsp = self.collection_client.collection_create(payload)
assert rsp['code'] == 0
rsp = self.collection_client.collection_describe(name)
logger.info(f"rsp: {rsp}")
assert rsp['code'] == 0
# insert data
for i in range(insert_round):
data = []
for i in range(nb):
if auto_id:
tmp = {
"user_id": i,
"word_count": i,
"book_describe": f"book_{i}",
"binary_vector_0": gen_vector(datatype="BinaryVector", dim=dim),
"binary_vector_1": gen_vector(datatype="BinaryVector", dim=dim),
"sparse_float_vector_0": gen_vector(datatype="SparseFloatVector", dim=dim, sparse_format="dok"),
"sparse_float_vector_1": gen_vector(datatype="SparseFloatVector", dim=dim, sparse_format="dok"),
}
else:
tmp = {
"book_id": i,
"user_id": i,
"word_count": i,
"book_describe": f"book_{i}",
"binary_vector_0": gen_vector(datatype="BinaryVector", dim=dim),
"binary_vector_1": gen_vector(datatype="BinaryVector", dim=dim),
"sparse_float_vector_0": gen_vector(datatype="SparseFloatVector", dim=dim, sparse_format="dok"),
"sparse_float_vector_1": gen_vector(datatype="SparseFloatVector", dim=dim, sparse_format="dok"),
}
if enable_dynamic_schema:
tmp.update({f"dynamic_field_{i}": i})
data.append(tmp)
payload = {
"collectionName": name,
"data": data,
}
rsp = self.vector_client.vector_insert(payload)
assert rsp['code'] == 0
assert rsp['data']['insertCount'] == nb
c = Collection(name)
res = c.query(
expr="user_id > 0",
limit=1,
output_fields=["*"],
)
logger.info(f"res: {res}")
# query data to make sure the data is inserted
rsp = self.vector_client.vector_query({"collectionName": name, "filter": "user_id > 0", "limit": 50})
assert rsp['code'] == 0
assert len(rsp['data']) == 50
@pytest.mark.parametrize("insert_round", [1])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("is_partition_key", [True, False])