test: add multi-vector support for bulk insert test (#30223)

Add multi-vector fields (three float vector fields and one binary vector field) to the bulk insert tests.

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/30270/head
zhuwenxing 2024-01-25 15:03:01 +08:00 committed by GitHub
parent 51fe4743f1
commit 78562d0246
2 changed files with 193 additions and 79 deletions


@@ -21,6 +21,10 @@ FLOAT = "float"
class DataField:
pk_field = "uid"
vec_field = "vectors"
float_vec_field = "float_vectors"
image_float_vec_field = "image_float_vec_field"
text_float_vec_field = "text_float_vec_field"
binary_vec_field = "binary_vec_field"
int_field = "int_scalar"
string_field = "string_scalar"
bool_field = "bool_scalar"
@@ -317,12 +321,12 @@ def gen_vectors_in_numpy_file(dir, data_field, float_vector, rows, dim, force=Fa
if rows > 0:
if float_vector:
vectors = gen_float_vectors(rows, dim)
arr = np.array(vectors)
else:
vectors = gen_binary_vectors(rows, (dim // 8))
arr = np.array(vectors)
# print(f"file_name: {file_name} data type: {arr.dtype}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
arr = np.array(vectors, dtype=np.dtype("uint8"))
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
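The `uint8` dtype pinned in the fix above is deliberate: Milvus stores binary vectors as packed bytes, so a `dim`-bit vector occupies `dim // 8` bytes, and letting `np.array` infer an integer dtype would not produce the packed layout bulk insert expects. A minimal standalone sketch of that layout (file name is illustrative):

import numpy as np

rows, dim = 4, 128                                  # dim must be a multiple of 8
# one binary vector = dim bits packed into dim // 8 bytes
arr = np.random.randint(0, 256, size=(rows, dim // 8), dtype=np.uint8)
np.save("binary_vec_field.npy", arr)

loaded = np.load("binary_vec_field.npy")
assert loaded.dtype == np.uint8
assert loaded.shape == (rows, dim // 8)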
@@ -421,8 +425,15 @@ def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128
data = []
if rows > 0:
if data_field == DataField.vec_field:
data = gen_vectors(float_vector=float_vector, rows=rows, dim=dim)
if "vec" in data_field:
if "float" in data_field:
data = gen_vectors(float_vector=True, rows=rows, dim=dim)
data = pd.Series([np.array(x, dtype=np.dtype("float32")) for x in data])
elif "binary" in data_field:
data = gen_vectors(float_vector=False, rows=rows, dim=dim)
data = pd.Series([np.array(x, dtype=np.dtype("uint8")) for x in data])
else:
data = gen_vectors(float_vector=float_vector, rows=rows, dim=dim)
elif data_field == DataField.float_field:
data = [np.float32(random.random()) for _ in range(rows)]
elif data_field == DataField.double_field:
@@ -530,8 +541,11 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
for r in range(rows):
d = {}
for data_field in data_fields:
if data_field == DataField.vec_field:
# vector columns
if "vec" in data_field:
if "float" in data_field:
float_vector = True
if "binary" in data_field:
float_vector = False
d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0]
elif data_field == DataField.float_field:
d[data_field] = random.random()
@@ -608,7 +622,11 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
if file_nums == 1:
# gen the numpy file without subfolders if only one set of files
for data_field in data_fields:
if data_field == DataField.vec_field:
if "vec" in data_field:
if "float" in data_field:
float_vector = True
if "binary" in data_field:
float_vector = False
file_name = gen_vectors_in_numpy_file(dir=data_source, data_field=data_field, float_vector=float_vector,
rows=rows, dim=dim, force=force)
elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17
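All three generators above now share a naming convention instead of matching the single `DataField.vec_field` constant: a field whose name contains "vec" is treated as a vector column, and "float" or "binary" in the name selects the element type. A small sketch of that convention (the helper name is hypothetical, and treating unqualified vector names as float is an assumption):

def vector_kind(field_name):
    # mirrors the substring dispatch used by the generators above
    if "vec" not in field_name:
        return None                                  # scalar field
    return "binary" if "binary" in field_name else "float"

assert vector_kind("image_float_vec_field") == "float"
assert vector_kind("binary_vec_field") == "binary"
assert vector_kind("int_scalar") is None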


@@ -337,7 +337,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [2000])
def test_binary_vector_only(self, is_row_based, auto_id, dim, entities):
def test_binary_vector_json(self, is_row_based, auto_id, dim, entities):
"""
collection schema: [pk, binary_vector]
Steps:
@@ -692,8 +692,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [2]) # 128
@pytest.mark.parametrize("entities", [2]) # 1000
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True])
def test_bulk_insert_all_field_with_new_json_format(self, auto_id, dim, entities, enable_dynamic_field):
"""
@@ -714,7 +714,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim)
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
@@ -748,32 +751,63 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
binary_vec_fields = [f.name for f in fields if "vec" in f.name and "binary" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_binary_index
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
for field_name in float_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
_, search_data = cf.gen_binary_vectors(1, dim)
search_params = ct.default_search_binary_params
for field_name in binary_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@@ -796,7 +830,10 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_float_field(name=df.float_field),
cf.gen_double_field(name=df.double_field),
cf.gen_json_field(name=df.json_field),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim)
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_numpy_files(
@@ -805,8 +842,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
rows=entities,
dim=dim,
data_fields=data_fields,
force=True,
enable_dynamic_field=enable_dynamic_field,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
@@ -830,32 +867,61 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
binary_vec_fields = [f.name for f in fields if "vec" in f.name and "binary" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_binary_index
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
for field_name in float_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
_, search_data = cf.gen_binary_vectors(1, dim)
search_params = ct.default_search_binary_params
for field_name in binary_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@@ -883,17 +949,18 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim)
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_parquet_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
file_nums=file_nums,
array_length=array_len,
enable_dynamic_field=enable_dynamic_field,
force=True,
)
@@ -919,32 +986,61 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
binary_vec_fields = [f.name for f in fields if "vec" in f.name and "binary" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_binary_index
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
for field_name in float_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
_, search_data = cf.gen_binary_vectors(1, dim)
search_params = ct.default_search_binary_params
for field_name in binary_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])