mirror of https://github.com/milvus-io/milvus.git

test: add sparse vector datatype for import test (#33166)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>

branch: pull/33335/head
parent: 3bec2c4928
commit: 6c186112bd
@@ -23,6 +23,7 @@ class DataField:
     pk_field = "uid"
     vec_field = "vectors"
     float_vec_field = "float_vectors"
+    sparse_vec_field = "sparse_vectors"
     image_float_vec_field = "image_float_vec_field"
     text_float_vec_field = "text_float_vec_field"
     binary_vec_field = "binary_vec_field"
@@ -473,7 +474,22 @@ def gen_vectors(float_vector, rows, dim):
     return vectors
 
 
-def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None):
+def gen_sparse_vectors(rows, sparse_format="dok"):
+    # default sparse format is dok (dict of keys);
+    # another option is coo (coordinate list)
+    rng = np.random.default_rng()
+    vectors = [{
+        d: rng.random() for d in random.sample(range(1000), random.randint(20, 30))
+    } for _ in range(rows)]
+    if sparse_format == "coo":
+        vectors = [
+            {"indices": list(x.keys()), "values": list(x.values())} for x in vectors
+        ]
+    return vectors
+
+
+def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None, sparse_format="dok"):
     if array_length is None:
         array_length = random.randint(0, 10)
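For reference, the two formats differ only in shape. Illustrative output from gen_sparse_vectors (values made up and trimmed to three entries per row; the helper actually samples 20 to 30 of 1000 dimensions):

    # dok ("dict of keys"): one dict per row mapping dimension index -> weight
    gen_sparse_vectors(2, sparse_format="dok")
    # -> [{17: 0.42, 305: 0.91, 888: 0.07},
    #     {3: 0.66, 412: 0.18, 951: 0.83}]

    # coo ("coordinate list"): parallel index/value lists per row
    gen_sparse_vectors(2, sparse_format="coo")
    # -> [{"indices": [17, 305, 888], "values": [0.42, 0.91, 0.07]},
    #     {"indices": [3, 412, 951], "values": [0.66, 0.18, 0.83]}]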
@@ -483,6 +499,9 @@ def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128
             if "float" in data_field:
                 data = gen_vectors(float_vector=True, rows=rows, dim=dim)
                 data = pd.Series([np.array(x, dtype=np.dtype("float32")) for x in data])
+            elif "sparse" in data_field:
+                data = gen_sparse_vectors(rows, sparse_format=sparse_format)
+                data = pd.Series([json.dumps(x) for x in data], dtype=np.dtype("str"))
             elif "fp16" in data_field:
                 data = gen_fp16_vectors(rows, dim)[1]
                 data = pd.Series([np.array(x, dtype=np.dtype("uint8")) for x in data])
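One subtlety in the parquet branch above: json.dumps coerces the integer dok keys to strings, so each parquet cell stores the sparse vector as JSON text rather than as a native dict. A quick REPL check:

    >>> import json
    >>> json.dumps({17: 0.42, 305: 0.91})
    '{"17": 0.42, "305": 0.91}'

The import path is presumably expected to parse these strings back into sparse vectors; that parser is not part of this diff.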
@@ -596,7 +615,7 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
     return files
 
 
-def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False):
+def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False, **kwargs):
     data = []
     for r in range(rows):
         d = {}
@@ -605,6 +624,9 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
             if "float" in data_field:
                 float_vector = True
                 d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0]
+            if "sparse" in data_field:
+                sparse_format = kwargs.get("sparse_format", "dok")
+                d[data_field] = gen_sparse_vectors(1, sparse_format=sparse_format)[0]
             if "binary" in data_field:
                 float_vector = False
                 d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0]
@@ -647,7 +669,7 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
     return data
 
 
-def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False):
+def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False, **kwargs):
     files = []
     if file_size is not None:
         rows = 5000
@@ -655,7 +677,7 @@ def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_
     for i in range(file_nums):
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
         file = f"{data_source}/{file_name}"
-        data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field)
+        data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field, **kwargs)
         # log.info(f"data: {data}")
         with open(file, "w") as f:
             json.dump(data, f)
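With the **kwargs forwarding in place, each element of the list handed to json.dump carries the sparse vector inline. A sketch of one such row dict (illustrative values, dok format, hypothetical minimal schema of uid plus two vector fields):

    {
        "uid": 0,
        "float_vectors": [0.12, 0.53],            # dim entries in a real run
        "sparse_vectors": {17: 0.42, 305: 0.91},  # int keys here; stringified by json.dump
    }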
@@ -762,7 +784,7 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0):
     return data
 
 
-def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True):
+def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="dok"):
     # gen numpy files
     if err_type == "":
         err_type = "none"
@@ -775,7 +797,7 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
     all_field_data = {}
     for data_field in data_fields:
         data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0,
-                                      float_vector=float_vector, dim=dim, array_length=array_length)
+                                      float_vector=float_vector, dim=dim, array_length=array_length, sparse_format=sparse_format)
         all_field_data[data_field] = data
     if enable_dynamic_field and include_meta:
         all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0)
@@ -948,7 +970,7 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
 def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None, row_group_size=None,
-                                      enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True):
+                                      enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="dok"):
     """
     Generate column-based files in parquet format from the given params and copy them to minio.
     Note: each field in data_fields is generated as one parquet file.
@@ -980,7 +1002,7 @@ def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-buc
     """
     files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field,
                               data_fields=data_fields, array_length=array_length, file_size=file_size, row_group_size=row_group_size,
-                              file_nums=file_nums, include_meta=include_meta)
+                              file_nums=file_nums, include_meta=include_meta, sparse_format=sparse_format)
     copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
     return files
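A minimal usage sketch of the updated helper, assuming a reachable MinIO endpoint (the endpoint value below is a placeholder):

    files = prepare_bulk_insert_parquet_files(
        minio_endpoint="127.0.0.1:9000",   # placeholder
        bucket_name="milvus-bucket",
        rows=1000,
        dim=128,
        data_fields=[DataField.pk_field, DataField.float_vec_field, DataField.sparse_vec_field],
        sparse_format="coo",               # or the default "dok"
        force=True,
    )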

@@ -1224,6 +1224,248 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         assert len(res) == len(query_data)
         if enable_partition_key:
             assert len(self.collection_wrap.partitions) > 1
 
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True, False])
+    @pytest.mark.parametrize("dim", [128])
+    @pytest.mark.parametrize("entities", [1000])
+    @pytest.mark.parametrize("enable_dynamic_field", [True, False])
+    @pytest.mark.parametrize("include_meta", [True, False])
+    @pytest.mark.parametrize("sparse_format", ["dok", "coo"])
+    def test_bulk_insert_sparse_vector_with_parquet(self, auto_id, dim, entities, enable_dynamic_field, include_meta, sparse_format):
+        """
+        collection schema: [pk, int64, float, string, json, array fields, float_vector, sparse_vector]
+        data files: one parquet file per field
+        Steps:
+        1. create collection
+        2. import data
+        3. verify
+        """
+        if enable_dynamic_field is False and include_meta is True:
+            pytest.skip("include_meta only works with enable_dynamic_field")
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
+            cf.gen_int64_field(name=df.int_field),
+            cf.gen_float_field(name=df.float_field),
+            cf.gen_string_field(name=df.string_field),
+            cf.gen_json_field(name=df.json_field),
+            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
+            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
+            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
+            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
+            cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
+            cf.gen_sparse_vec_field(name=df.sparse_vec_field),
+        ]
+        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        files = prepare_bulk_insert_parquet_files(
+            minio_endpoint=self.minio_endpoint,
+            bucket_name=self.bucket_name,
+            rows=entities,
+            dim=dim,
+            data_fields=data_fields,
+            enable_dynamic_field=enable_dynamic_field,
+            force=True,
+            include_meta=include_meta,
+            sparse_format=sparse_format,
+        )
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+
+        # import data
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name, files=files
+        )
+        logging.info(f"bulk insert task id: {task_id}")
+        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=300
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state: {success} in {tt} with states: {states}")
+        assert success
+        num_entities = self.collection_wrap.num_entities
+        log.info(f"collection entities: {num_entities}")
+        assert num_entities == entities
+        # verify imported data is available for search
+        index_params = ct.default_index
+        float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
+        sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
+        for f in float_vec_fields:
+            self.collection_wrap.create_index(
+                field_name=f, index_params=index_params
+            )
+        for f in sparse_vec_fields:
+            self.collection_wrap.create_index(
+                field_name=f, index_params=ct.default_sparse_inverted_index
+            )
+        self.collection_wrap.load()
+        log.info("wait for load to finish and be ready for search")
+        time.sleep(2)
+        # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
+        search_data = cf.gen_vectors(1, dim)
+        search_params = ct.default_search_params
+        for field_name in float_vec_fields:
+            res, _ = self.collection_wrap.search(
+                search_data,
+                field_name,
+                param=search_params,
+                limit=1,
+                output_fields=["*"],
+                check_task=CheckTasks.check_search_results,
+                check_items={"nq": 1, "limit": 1},
+            )
+            for hit in res:
+                for r in hit:
+                    fields_from_search = r.fields.keys()
+                    for f in fields:
+                        assert f.name in fields_from_search
+                    if enable_dynamic_field and include_meta:
+                        assert "name" in fields_from_search
+                        assert "address" in fields_from_search
+        search_data = cf.gen_sparse_vectors(1, dim)
+        search_params = ct.default_sparse_search_params
+        for field_name in sparse_vec_fields:
+            res, _ = self.collection_wrap.search(
+                search_data,
+                field_name,
+                param=search_params,
+                limit=1,
+                output_fields=["*"],
+                check_task=CheckTasks.check_search_results,
+                check_items={"nq": 1, "limit": 1},
+            )
+            for hit in res:
+                for r in hit:
+                    fields_from_search = r.fields.keys()
+                    for f in fields:
+                        assert f.name in fields_from_search
+                    if enable_dynamic_field and include_meta:
+                        assert "name" in fields_from_search
+                        assert "address" in fields_from_search
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True, False])
+    @pytest.mark.parametrize("dim", [128])
+    @pytest.mark.parametrize("entities", [1000])
+    @pytest.mark.parametrize("enable_dynamic_field", [True, False])
+    @pytest.mark.parametrize("include_meta", [True, False])
+    @pytest.mark.parametrize("sparse_format", ["dok", "coo"])
+    def test_bulk_insert_sparse_vector_with_json(self, auto_id, dim, entities, enable_dynamic_field, include_meta, sparse_format):
+        """
+        collection schema: [pk, int64, float, string, json, array fields, float_vector, sparse_vector]
+        data file: one row-based json file with all fields
+        Steps:
+        1. create collection
+        2. import data
+        3. verify
+        """
+        if enable_dynamic_field is False and include_meta is True:
+            pytest.skip("include_meta only works with enable_dynamic_field")
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
+            cf.gen_int64_field(name=df.int_field),
+            cf.gen_float_field(name=df.float_field),
+            cf.gen_string_field(name=df.string_field),
+            cf.gen_json_field(name=df.json_field),
+            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
+            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
+            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
+            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
+            cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
+            cf.gen_sparse_vec_field(name=df.sparse_vec_field),
+        ]
+        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        files = prepare_bulk_insert_new_json_files(
+            minio_endpoint=self.minio_endpoint,
+            bucket_name=self.bucket_name,
+            rows=entities,
+            dim=dim,
+            data_fields=data_fields,
+            enable_dynamic_field=enable_dynamic_field,
+            force=True,
+            include_meta=include_meta,
+            sparse_format=sparse_format,
+        )
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+
+        # import data
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name, files=files
+        )
+        logging.info(f"bulk insert task id: {task_id}")
+        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=300
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state: {success} in {tt} with states: {states}")
+        assert success
+        num_entities = self.collection_wrap.num_entities
+        log.info(f"collection entities: {num_entities}")
+        assert num_entities == entities
+        # verify imported data is available for search
+        index_params = ct.default_index
+        float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
+        sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
+        for f in float_vec_fields:
+            self.collection_wrap.create_index(
+                field_name=f, index_params=index_params
+            )
+        for f in sparse_vec_fields:
+            self.collection_wrap.create_index(
+                field_name=f, index_params=ct.default_sparse_inverted_index
+            )
+        self.collection_wrap.load()
+        log.info("wait for load to finish and be ready for search")
+        time.sleep(2)
+        # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
+        search_data = cf.gen_vectors(1, dim)
+        search_params = ct.default_search_params
+        for field_name in float_vec_fields:
+            res, _ = self.collection_wrap.search(
+                search_data,
+                field_name,
+                param=search_params,
+                limit=1,
+                output_fields=["*"],
+                check_task=CheckTasks.check_search_results,
+                check_items={"nq": 1, "limit": 1},
+            )
+            for hit in res:
+                for r in hit:
+                    fields_from_search = r.fields.keys()
+                    for f in fields:
+                        assert f.name in fields_from_search
+                    if enable_dynamic_field and include_meta:
+                        assert "name" in fields_from_search
+                        assert "address" in fields_from_search
+        search_data = cf.gen_sparse_vectors(1, dim)
+        search_params = ct.default_sparse_search_params
+        for field_name in sparse_vec_fields:
+            res, _ = self.collection_wrap.search(
+                search_data,
+                field_name,
+                param=search_params,
+                limit=1,
+                output_fields=["*"],
+                check_task=CheckTasks.check_search_results,
+                check_items={"nq": 1, "limit": 1},
+            )
+            for hit in res:
+                for r in hit:
+                    fields_from_search = r.fields.keys()
+                    for f in fields:
+                        assert f.name in fields_from_search
+                    if enable_dynamic_field and include_meta:
+                        assert "name" in fields_from_search
+                        assert "address" in fields_from_search
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True])
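The rendered diff is truncated at this point. For context, the sparse-specific index and search params referenced in the tests above (ct.default_sparse_inverted_index and ct.default_sparse_search_params) are defined in the shared test constants rather than in this patch; a plausible shape, stated here as an assumption rather than read from the source, is:

    # assumed definitions -- not part of this diff; check the test constants module
    default_sparse_inverted_index = {
        "index_type": "SPARSE_INVERTED_INDEX",
        "metric_type": "IP",                  # sparse vectors are compared by inner product
        "params": {"drop_ratio_build": 0.2},
    }
    default_sparse_search_params = {
        "metric_type": "IP",
        "params": {"drop_ratio_search": 0.2},
    }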