test: update import test case to support different dim (#33709)

add test case for https://github.com/milvus-io/milvus/issues/33681

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2024-06-13 17:11:55 +08:00 committed by GitHub
parent 144ee269f2
commit ca1f7ab019
3 changed files with 126 additions and 61 deletions
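At its core, the change threads the collection schema into the data-generation helpers via **kwargs; each helper then prefers the per-field dim recorded in the schema over its own dim argument, so vector fields with different dims get correctly sized data. A minimal sketch of that lookup, assuming the {"fields": [{"name": ..., "params": {"dim": ...}}]} layout that pymilvus' CollectionSchema.to_dict() produces (as the diff below relies on):

def resolve_dim(data_field, schema_dict, default_dim=128):
    # Prefer the dim declared in the schema for this field; otherwise keep the default.
    for field in schema_dict.get("fields", []):
        if field["name"] == data_field and "params" in field:
            return field["params"].get("dim", default_dim)
    return default_dim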


@@ -491,10 +491,16 @@ def gen_sparse_vectors(rows, sparse_format="dok"):
return vectors
def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None, sparse_format="dok"):
def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None, sparse_format="dok", **kwargs):
if array_length is None:
array_length = random.randint(0, 10)
schema = kwargs.get("schema", None)
schema = schema.to_dict() if schema is not None else None
if schema is not None:
fields = schema.get("fields", [])
for field in fields:
if data_field == field["name"] and "params" in field:
dim = field["params"].get("dim", dim)
data = []
if rows > 0:
if "vec" in data_field:
@@ -618,10 +624,18 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False, **kwargs):
schema = kwargs.get("schema", None)
schema = schema.to_dict() if schema is not None else None
data = []
for r in range(rows):
d = {}
for data_field in data_fields:
if schema is not None:
fields = schema.get("fields", [])
for field in fields:
if data_field == field["name"] and "params" in field:
dim = field["params"].get("dim", dim)
if "vec" in data_field:
if "float" in data_field:
float_vector = True
@@ -718,19 +732,24 @@ def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_
def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False, include_meta=True, **kwargs):
# gen numpy files
schema = kwargs.get("schema", None)
schema = schema.to_dict() if schema is not None else None
u_id = f"numpy-{uuid.uuid4()}"
data_source_new = f"{data_source}/{u_id}"
schema_file = f"{data_source_new}/schema.json"
Path(schema_file).parent.mkdir(parents=True, exist_ok=True)
if schema is not None:
data = schema.to_dict()
with open(schema_file, "w") as f:
json.dump(data, f)
json.dump(schema, f)
files = []
start_uid = 0
if file_nums == 1:
# gen the numpy file without subfolders if only one set of files
for data_field in data_fields:
if schema is not None:
fields = schema.get("fields", [])
for field in fields:
if data_field == field["name"] and "params" in field:
dim = field["params"].get("dim", dim)
if "vec" in data_field:
vector_type = "float32"
if "float" in data_field:
@@ -745,6 +764,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
if "fp16" in data_field:
float_vector = True
vector_type = "fp16"
file_name = gen_vectors_in_numpy_file(dir=data_source_new, data_field=data_field, float_vector=float_vector,
vector_type=vector_type, rows=rows, dim=dim, force=force)
elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17
@@ -830,7 +850,7 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
all_field_data = {}
for data_field in data_fields:
data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0,
float_vector=float_vector, dim=dim, array_length=array_length, sparse_format=sparse_format)
float_vector=float_vector, dim=dim, array_length=array_length, sparse_format=sparse_format, **kwargs)
all_field_data[data_field] = data
if enable_dynamic_field and include_meta:
all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0)

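With the schema passed through, callers can generate files whose vector columns match the collection definition even when fields use different dims. A hedged usage sketch (helper names are from this file; the dim=768 declared in the schema is illustrative):

schema = cf.gen_collection_schema(fields=fields, auto_id=True)  # a vector field declared with dim=768
data = gen_data_by_data_field(data_field=df.float_vec_field, rows=100,
                              dim=128,        # function default...
                              schema=schema)  # ...overridden by the field's params["dim"]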

@@ -1,7 +1,7 @@
[pytest]
addopts = --host localhost --html=/tmp/ci_logs/report.html --self-contained-html -v --log-cli-level=INFO --capture=no
addopts = --host localhost --html=/tmp/ci_logs/report.html --self-contained-html -v
# python3 -W ignore -m pytest
log_format = [%(asctime)s - %(levelname)s - %(name)s]: %(message)s (%(filename)s:%(lineno)s)


@@ -1,4 +1,5 @@
import logging
import random
import time
import pytest
from pymilvus import DataType
@@ -187,6 +188,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
auto_id=auto_id,
str_pk=string_pk,
data_fields=data_fields,
schema=schema,
)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
@@ -245,7 +247,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [3000])
@pytest.mark.parametrize("entities", [2000])
def test_partition_float_vector_int_scalar(
self, is_row_based, auto_id, dim, entities
):
@@ -472,6 +474,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
dim=dim,
data_fields=[df.pk_field, df.float_field, df.float_vec_field],
force=True,
schema=schema
)
# import data
t0 = time.time()
@@ -630,14 +633,13 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_string_field(name=df.string_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim)
]
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint,
@@ -647,11 +649,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
schema=schema
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
# create index and load before bulk insert
scalar_field_list = [df.int_field, df.float_field, df.double_field, df.string_field]
scalar_fields = [f.name for f in fields if f.name in scalar_field_list]
@@ -746,7 +745,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("entities", [2000])
@pytest.mark.parametrize("enable_dynamic_field", [True])
@pytest.mark.parametrize("enable_partition_key", [True, False])
def test_bulk_insert_all_field_with_new_json_format(self, auto_id, dim, entities, enable_dynamic_field, enable_partition_key):
@@ -758,6 +757,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
2. import data
3. verify
"""
float_vec_field_dim = dim
binary_vec_field_dim = ((dim+random.randint(-16, 32)) // 8) * 8
bf16_vec_field_dim = dim+random.randint(-16, 32)
fp16_vec_field_dim = dim+random.randint(-16, 32)
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
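Worth noting on the randomized dims above: Milvus packs BINARY_VECTOR data 8 dimensions per byte, so a binary field's dim must be a multiple of 8, and the expression rounds the randomized value down accordingly. A quick check of the arithmetic:

dim, offset = 128, -15                  # offset drawn from random.randint(-16, 32)
binary_dim = ((dim + offset) // 8) * 8  # 113 -> 112
assert binary_dim % 8 == 0              # valid binary-vector dim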
@@ -768,10 +771,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
cf.gen_float_vec_field(name=df.float_vec_field, dim=float_vec_field_dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=binary_vec_field_dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=bf16_vec_field_dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=fp16_vec_field_dim)
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
self._connect()
@@ -815,7 +818,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
for f in [df.bf16_vec_field, df.fp16_vec_field]:
self.collection_wrap.create_index(
field_name=f, index_params={"index_type": "FLAT", "metric_type": "COSINE"}
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
@@ -825,12 +828,24 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
for field_name in float_vec_fields:
for f in [df.float_vec_field, df.bf16_vec_field, df.fp16_vec_field]:
vector_data_type = "FLOAT_VECTOR"
if f == df.float_vec_field:
dim = float_vec_field_dim
vector_data_type = "FLOAT_VECTOR"
elif f == df.bf16_vec_field:
dim = bf16_vec_field_dim
vector_data_type = "BFLOAT16_VECTOR"
else:
dim = fp16_vec_field_dim
vector_data_type = "FLOAT16_VECTOR"
search_data = cf.gen_vectors(1, dim, vector_data_type=vector_data_type)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
field_name,
f,
param=search_params,
limit=1,
output_fields=["*"],
@@ -846,7 +861,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert "name" in fields_from_search
assert "address" in fields_from_search
_, search_data = cf.gen_binary_vectors(1, dim)
_, search_data = cf.gen_binary_vectors(1, binary_vec_field_dim)
search_params = ct.default_search_binary_params
for field_name in binary_vec_fields:
res, _ = self.collection_wrap.search(
@@ -878,7 +893,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("entities", [2000])
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("enable_partition_key", [True, False])
@pytest.mark.parametrize("include_meta", [True, False])
@@ -894,18 +909,20 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
"""
if enable_dynamic_field is False and include_meta is True:
pytest.skip("include_meta only works with enable_dynamic_field")
float_vec_field_dim = dim
binary_vec_field_dim = ((dim+random.randint(-16, 32)) // 8) * 8
bf16_vec_field_dim = dim+random.randint(-16, 32)
fp16_vec_field_dim = dim+random.randint(-16, 32)
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field, is_partition_key=enable_partition_key),
cf.gen_json_field(name=df.json_field),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
# cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
# cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
cf.gen_float_vec_field(name=df.float_vec_field, dim=float_vec_field_dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=binary_vec_field_dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=bf16_vec_field_dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=fp16_vec_field_dim)
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
self._connect()
@@ -920,7 +937,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
include_meta=include_meta,
schema=schema
)
self.collection_wrap.init_collection(c_name, schema=schema)
@@ -950,7 +966,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
for f in [df.bf16_vec_field, df.fp16_vec_field]:
self.collection_wrap.create_index(
field_name=f, index_params={"index_type": "FLAT", "metric_type": "COSINE"}
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
@@ -960,12 +976,24 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
for field_name in float_vec_fields:
for f in [df.float_vec_field, df.bf16_vec_field, df.fp16_vec_field]:
vector_data_type = "FLOAT_VECTOR"
if f == df.float_vec_field:
dim = float_vec_field_dim
vector_data_type = "FLOAT_VECTOR"
elif f == df.bf16_vec_field:
dim = bf16_vec_field_dim
vector_data_type = "BFLOAT16_VECTOR"
else:
dim = fp16_vec_field_dim
vector_data_type = "FLOAT16_VECTOR"
search_data = cf.gen_vectors(1, dim, vector_data_type=vector_data_type)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
field_name,
f,
param=search_params,
limit=1,
output_fields=["*"],
@@ -977,11 +1005,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
_, search_data = cf.gen_binary_vectors(1, dim)
_, search_data = cf.gen_binary_vectors(1, binary_vec_field_dim)
search_params = ct.default_search_binary_params
for field_name in binary_vec_fields:
res, _ = self.collection_wrap.search(
@ -998,7 +1026,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
# query data
@@ -1013,7 +1041,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("entities", [2000])
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("enable_partition_key", [True, False])
@pytest.mark.parametrize("include_meta", [True, False])
@@ -1028,6 +1056,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
"""
if enable_dynamic_field is False and include_meta is True:
pytest.skip("include_meta only works with enable_dynamic_field")
float_vec_field_dim = dim
binary_vec_field_dim = ((dim+random.randint(-16, 32)) // 8) * 8
bf16_vec_field_dim = dim+random.randint(-16, 32)
fp16_vec_field_dim = dim+random.randint(-16, 32)
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
@@ -1038,15 +1070,16 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim)
cf.gen_float_vec_field(name=df.float_vec_field, dim=float_vec_field_dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=binary_vec_field_dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=bf16_vec_field_dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=fp16_vec_field_dim)
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
files = prepare_bulk_insert_parquet_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
@@ -1055,10 +1088,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
include_meta=include_meta,
schema=schema,
schema=schema
)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
@@ -1084,7 +1117,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
for f in [df.bf16_vec_field, df.fp16_vec_field]:
self.collection_wrap.create_index(
field_name=f, index_params={"index_type": "FLAT", "metric_type": "COSINE"}
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
@@ -1094,12 +1127,24 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
for field_name in float_vec_fields:
for f in [df.float_vec_field, df.bf16_vec_field, df.fp16_vec_field]:
vector_data_type = "FLOAT_VECTOR"
if f == df.float_vec_field:
dim = float_vec_field_dim
vector_data_type = "FLOAT_VECTOR"
elif f == df.bf16_vec_field:
dim = bf16_vec_field_dim
vector_data_type = "BFLOAT16_VECTOR"
else:
dim = fp16_vec_field_dim
vector_data_type = "FLOAT16_VECTOR"
search_data = cf.gen_vectors(1, dim, vector_data_type=vector_data_type)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
field_name,
f,
param=search_params,
limit=1,
output_fields=["*"],
@@ -1111,11 +1156,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
_, search_data = cf.gen_binary_vectors(1, dim)
_, search_data = cf.gen_binary_vectors(1, binary_vec_field_dim)
search_params = ct.default_search_binary_params
for field_name in binary_vec_fields:
res, _ = self.collection_wrap.search(
@@ -1132,7 +1177,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
# query data
@@ -1147,7 +1192,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("entities", [2000])
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("include_meta", [True, False])
@pytest.mark.parametrize("sparse_format", ["doc", "coo"])
@@ -1270,7 +1315,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("entities", [2000])
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("include_meta", [True, False])
@pytest.mark.parametrize("sparse_format", ["doc", "coo"])