test: add test case for bulkwriter (#33879)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/33997/head^2
zhuwenxing 2024-06-20 09:24:04 +08:00 committed by GitHub
parent 0264588df9
commit f3d902cf16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 390 additions and 40 deletions

View File

@ -24,13 +24,13 @@ FLOAT = "float"
class DataField:
pk_field = "uid"
vec_field = "vectors"
float_vec_field = "float_vectors"
float_vec_field = "float32_vectors"
sparse_vec_field = "sparse_vectors"
image_float_vec_field = "image_float_vec_field"
text_float_vec_field = "text_float_vec_field"
binary_vec_field = "binary_vec_field"
bf16_vec_field = "bf16_vec_field"
fp16_vec_field = "fp16_vec_field"
bf16_vec_field = "brain_float16_vec_field"
fp16_vec_field = "float16_vec_field"
int_field = "int_scalar"
string_field = "string_scalar"
bool_field = "bool_scalar"
@ -504,16 +504,16 @@ def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128
data = []
if rows > 0:
if "vec" in data_field:
if "float" in data_field:
if "float" in data_field and "16" not in data_field:
data = gen_vectors(float_vector=True, rows=rows, dim=dim)
data = pd.Series([np.array(x, dtype=np.dtype("float32")) for x in data])
elif "sparse" in data_field:
data = gen_sparse_vectors(rows, sparse_format=sparse_format)
data = pd.Series([json.dumps(x) for x in data], dtype=np.dtype("str"))
elif "fp16" in data_field:
elif "float16" in data_field:
data = gen_fp16_vectors(rows, dim)[1]
data = pd.Series([np.array(x, dtype=np.dtype("uint8")) for x in data])
elif "bf16" in data_field:
elif "brain_float16" in data_field:
data = gen_bf16_vectors(rows, dim)[1]
data = pd.Series([np.array(x, dtype=np.dtype("uint8")) for x in data])
elif "binary" in data_field:
@ -758,10 +758,10 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
if "binary" in data_field:
float_vector = False
vector_type = "binary"
if "bf16" in data_field:
if "brain_float16" in data_field:
float_vector = True
vector_type = "bf16"
if "fp16" in data_field:
if "float16" in data_field:
float_vector = True
vector_type = "fp16"

View File

@ -2114,19 +2114,19 @@ def gen_fp16_vectors(num, dim):
return raw_vectors, fp16_vectors
def gen_sparse_vectors(nb, dim):
"""
generate sparse vector data
return sparse_vectors
"""
def gen_sparse_vectors(nb, dim=1000, sparse_format="dok"):
# default sparse format is dok, dict of keys
# another option is coo, coordinate List
rng = np.random.default_rng()
entities = [
{
d: rng.random() for d in random.sample(range(dim), random.randint(1, 1))
}
for _ in range(nb)
]
return entities
vectors = [{
d: rng.random() for d in random.sample(range(dim), random.randint(20, 30))
} for _ in range(nb)]
if sparse_format == "coo":
vectors = [
{"indices": list(x.keys()), "values": list(x.values())} for x in vectors
]
return vectors
def gen_vectors_based_on_vector_type(num, dim, vector_data_type):

View File

@ -3,6 +3,7 @@ import random
import time
import pytest
from pymilvus import DataType
from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
import numpy as np
from pathlib import Path
from base.client_base import TestcaseBase
@ -16,11 +17,10 @@ from common.bulk_insert_data import (
prepare_bulk_insert_new_json_files,
prepare_bulk_insert_numpy_files,
prepare_bulk_insert_parquet_files,
prepare_bulk_insert_csv_files,
DataField as df,
)
from faker import Faker
fake = Faker()
default_vec_only_fields = [df.vec_field]
default_multi_fields = [
df.vec_field,
@ -816,10 +816,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in [df.bf16_vec_field, df.fp16_vec_field]:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_binary_index
@ -964,10 +960,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in [df.bf16_vec_field, df.fp16_vec_field]:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_binary_index
@ -1115,10 +1107,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in [df.bf16_vec_field, df.fp16_vec_field]:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in binary_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_binary_index
@ -1433,6 +1421,373 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("sparse_format", ["doc", "coo"])
def test_with_all_field_json_with_bulk_writer(self, auto_id, dim, entities, enable_dynamic_field, sparse_format):
"""
collection schema 1: [pk, int64, float64, string float_vector]
data file: vectors.npy and uid.npy,
Steps:
1. create collection
2. import data
3. verify
"""
self._connect()
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
cf.gen_sparse_vec_field(name=df.sparse_vec_field),
]
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
bucket_name=self.bucket_name,
endpoint=self.minio_endpoint,
access_key="minioadmin",
secret_key="minioadmin",
),
file_type=BulkFileType.JSON,
) as remote_writer:
json_value = [
# 1,
# 1.0,
# "1",
# [1, 2, 3],
# ["1", "2", "3"],
# [1, 2, "3"],
{"key": "value"},
]
for i in range(entities):
row = {
df.pk_field: i,
df.int_field: 1,
df.float_field: 1.0,
df.string_field: "string",
df.json_field: json_value[i%len(json_value)],
df.array_int_field: [1, 2],
df.array_float_field: [1.0, 2.0],
df.array_string_field: ["string1", "string2"],
df.array_bool_field: [True, False],
df.float_vec_field: cf.gen_vectors(1, dim)[0],
df.fp16_vec_field: cf.gen_vectors(1, dim, vector_data_type="FLOAT16_VECTOR")[0],
df.bf16_vec_field: cf.gen_vectors(1, dim, vector_data_type="BFLOAT16_VECTOR")[0],
df.sparse_vec_field: cf.gen_sparse_vectors(1, dim, sparse_format=sparse_format)[0]
}
if auto_id:
row.pop(df.pk_field)
if enable_dynamic_field:
row["name"] = fake.name()
row["address"] = fake.address()
remote_writer.append_row(row)
remote_writer.commit()
files = remote_writer.batch_files
# import data
for f in files:
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=f
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in sparse_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_sparse_inverted_index
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.float_vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
def test_with_all_field_numpy_with_bulk_writer(self, auto_id, dim, entities, enable_dynamic_field):
"""
"""
self._connect()
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_json_field(name=df.json_field),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
]
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
bucket_name=self.bucket_name,
endpoint=self.minio_endpoint,
access_key="minioadmin",
secret_key="minioadmin",
),
file_type=BulkFileType.NUMPY,
) as remote_writer:
json_value = [
# 1,
# 1.0,
# "1",
# [1, 2, 3],
# ["1", "2", "3"],
# [1, 2, "3"],
{"key": "value"},
]
for i in range(entities):
row = {
df.pk_field: i,
df.int_field: 1,
df.float_field: 1.0,
df.string_field: "string",
df.json_field: json_value[i%len(json_value)],
df.float_vec_field: cf.gen_vectors(1, dim)[0],
df.fp16_vec_field: cf.gen_vectors(1, dim, vector_data_type="FLOAT16_VECTOR")[0],
df.bf16_vec_field: cf.gen_vectors(1, dim, vector_data_type="BFLOAT16_VECTOR")[0],
}
if auto_id:
row.pop(df.pk_field)
if enable_dynamic_field:
row["name"] = fake.name()
row["address"] = fake.address()
remote_writer.append_row(row)
remote_writer.commit()
files = remote_writer.batch_files
# import data
for f in files:
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=f
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in sparse_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_sparse_inverted_index
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.float_vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("sparse_format", ["doc", "coo"])
def test_with_all_field_parquet_with_bulk_writer(self, auto_id, dim, entities, enable_dynamic_field, sparse_format):
"""
"""
self._connect()
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_float16_vec_field(name=df.fp16_vec_field, dim=dim),
cf.gen_bfloat16_vec_field(name=df.bf16_vec_field, dim=dim),
cf.gen_sparse_vec_field(name=df.sparse_vec_field),
]
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
with RemoteBulkWriter(
schema=schema,
remote_path="bulk_data",
connect_param=RemoteBulkWriter.ConnectParam(
bucket_name=self.bucket_name,
endpoint=self.minio_endpoint,
access_key="minioadmin",
secret_key="minioadmin",
),
file_type=BulkFileType.JSON,
) as remote_writer:
json_value = [
# 1,
# 1.0,
# "1",
# [1, 2, 3],
# ["1", "2", "3"],
# [1, 2, "3"],
{"key": "value"},
]
for i in range(entities):
row = {
df.pk_field: i,
df.int_field: 1,
df.float_field: 1.0,
df.string_field: "string",
df.json_field: json_value[i%len(json_value)],
df.array_int_field: [1, 2],
df.array_float_field: [1.0, 2.0],
df.array_string_field: ["string1", "string2"],
df.array_bool_field: [True, False],
df.float_vec_field: cf.gen_vectors(1, dim)[0],
df.fp16_vec_field: cf.gen_vectors(1, dim, vector_data_type="FLOAT16_VECTOR")[0],
df.bf16_vec_field: cf.gen_vectors(1, dim, vector_data_type="BFLOAT16_VECTOR")[0],
df.sparse_vec_field: cf.gen_sparse_vectors(1, dim, sparse_format=sparse_format)[0]
}
if auto_id:
row.pop(df.pk_field)
if enable_dynamic_field:
row["name"] = fake.name()
row["address"] = fake.address()
remote_writer.append_row(row)
remote_writer.commit()
files = remote_writer.batch_files
# import data
for f in files:
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=f
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in sparse_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_sparse_inverted_index
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.float_vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
@ -1756,8 +2111,3 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
empty_partition_num += 1
num_entities += p.num_entities
assert num_entities == entities * file_nums