mirror of https://github.com/milvus-io/milvus.git
test: add invert index and string datatype for bulk insert test (#30334)
add invert index and string datatype for bulk insert test Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>pull/30336/head
parent
406bf14e84
commit
fcd9f894ca
|
@ -605,6 +605,139 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
|
|||
results, _ = self.collection_wrap.query(expr=expr)
|
||||
assert len(results) == len(ids)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L3)
|
||||
def test_index_load_before_bulk_insert(self):
|
||||
"""
|
||||
Steps:
|
||||
1. create collection
|
||||
2. create index and load collection
|
||||
3. import data
|
||||
4. verify
|
||||
"""
|
||||
enable_dynamic_field = True
|
||||
auto_id = True
|
||||
dim = 128
|
||||
entities = 1000
|
||||
fields = [
|
||||
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
|
||||
cf.gen_int64_field(name=df.int_field),
|
||||
cf.gen_float_field(name=df.float_field),
|
||||
cf.gen_string_field(name=df.string_field),
|
||||
cf.gen_json_field(name=df.json_field),
|
||||
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
|
||||
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
|
||||
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
|
||||
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
|
||||
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
|
||||
cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
|
||||
cf.gen_float_vec_field(name=df.text_float_vec_field, dim=dim),
|
||||
cf.gen_binary_vec_field(name=df.binary_vec_field, dim=dim)
|
||||
]
|
||||
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
|
||||
files = prepare_bulk_insert_new_json_files(
|
||||
minio_endpoint=self.minio_endpoint,
|
||||
bucket_name=self.bucket_name,
|
||||
rows=entities,
|
||||
dim=dim,
|
||||
data_fields=data_fields,
|
||||
enable_dynamic_field=enable_dynamic_field,
|
||||
force=True,
|
||||
)
|
||||
self._connect()
|
||||
c_name = cf.gen_unique_str("bulk_insert")
|
||||
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
|
||||
self.collection_wrap.init_collection(c_name, schema=schema)
|
||||
# create index and load before bulk insert
|
||||
scalar_field_list = [df.int_field, df.float_field, df.double_field, df.string_field]
|
||||
scalar_fields = [f.name for f in fields if f.name in scalar_field_list]
|
||||
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
|
||||
binary_vec_fields = [f.name for f in fields if "vec" in f.name and "binary" in f.name]
|
||||
for f in scalar_fields:
|
||||
self.collection_wrap.create_index(
|
||||
field_name=f, index_params={"index_type": "INVERTED"}
|
||||
)
|
||||
for f in float_vec_fields:
|
||||
self.collection_wrap.create_index(
|
||||
field_name=f, index_params=ct.default_index
|
||||
)
|
||||
for f in binary_vec_fields:
|
||||
self.collection_wrap.create_index(
|
||||
field_name=f, index_params=ct.default_binary_index
|
||||
)
|
||||
self.collection_wrap.load()
|
||||
|
||||
t0 = time.time()
|
||||
task_id, _ = self.utility_wrap.do_bulk_insert(
|
||||
collection_name=c_name, files=files
|
||||
)
|
||||
logging.info(f"bulk insert task ids:{task_id}")
|
||||
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
|
||||
task_ids=[task_id], timeout=300
|
||||
)
|
||||
tt = time.time() - t0
|
||||
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
|
||||
assert success
|
||||
num_entities = self.collection_wrap.num_entities
|
||||
log.info(f" collection entities: {num_entities}")
|
||||
assert num_entities == entities
|
||||
# verify imported data is available for search
|
||||
log.info(f"wait for load finished and be ready for search")
|
||||
self.collection_wrap.load(_refresh=True)
|
||||
time.sleep(5)
|
||||
|
||||
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
|
||||
# query data
|
||||
for f in scalar_fields:
|
||||
if f == df.string_field:
|
||||
expr = f"{f} > '0'"
|
||||
else:
|
||||
expr = f"{f} > 0"
|
||||
res, result = self.collection_wrap.query(expr=expr, output_fields=["count(*)"])
|
||||
log.info(f"query result: {res}")
|
||||
assert result
|
||||
# search data
|
||||
search_data = cf.gen_vectors(1, dim)
|
||||
search_params = ct.default_search_params
|
||||
for field_name in float_vec_fields:
|
||||
res, _ = self.collection_wrap.search(
|
||||
search_data,
|
||||
field_name,
|
||||
param=search_params,
|
||||
limit=1,
|
||||
output_fields=["*"],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": 1},
|
||||
)
|
||||
for hit in res:
|
||||
for r in hit:
|
||||
fields_from_search = r.fields.keys()
|
||||
for f in fields:
|
||||
assert f.name in fields_from_search
|
||||
if enable_dynamic_field:
|
||||
assert "name" in fields_from_search
|
||||
assert "address" in fields_from_search
|
||||
|
||||
_, search_data = cf.gen_binary_vectors(1, dim)
|
||||
search_params = ct.default_search_binary_params
|
||||
for field_name in binary_vec_fields:
|
||||
res, _ = self.collection_wrap.search(
|
||||
search_data,
|
||||
field_name,
|
||||
param=search_params,
|
||||
limit=1,
|
||||
output_fields=["*"],
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": 1, "limit": 1},
|
||||
)
|
||||
for hit in res:
|
||||
for r in hit:
|
||||
fields_from_search = r.fields.keys()
|
||||
for f in fields:
|
||||
assert f.name in fields_from_search
|
||||
if enable_dynamic_field:
|
||||
assert "name" in fields_from_search
|
||||
assert "address" in fields_from_search
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L3)
|
||||
@pytest.mark.parametrize("auto_id", [True, False])
|
||||
@pytest.mark.parametrize("dim", [128]) # 128
|
||||
|
@ -623,7 +756,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
|
|||
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
|
||||
cf.gen_int64_field(name=df.int_field),
|
||||
cf.gen_float_field(name=df.float_field),
|
||||
cf.gen_double_field(name=df.double_field),
|
||||
cf.gen_string_field(name=df.string_field),
|
||||
cf.gen_json_field(name=df.json_field),
|
||||
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
|
||||
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
|
||||
|
@ -708,7 +841,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
|
|||
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
|
||||
cf.gen_int64_field(name=df.int_field),
|
||||
cf.gen_float_field(name=df.float_field),
|
||||
cf.gen_double_field(name=df.double_field),
|
||||
cf.gen_string_field(name=df.string_field),
|
||||
cf.gen_json_field(name=df.json_field),
|
||||
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
|
||||
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
|
||||
|
@ -828,7 +961,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
|
|||
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
|
||||
cf.gen_int64_field(name=df.int_field),
|
||||
cf.gen_float_field(name=df.float_field),
|
||||
cf.gen_double_field(name=df.double_field),
|
||||
cf.gen_string_field(name=df.string_field),
|
||||
cf.gen_json_field(name=df.json_field),
|
||||
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
|
||||
cf.gen_float_vec_field(name=df.image_float_vec_field, dim=dim),
|
||||
|
@ -943,7 +1076,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
|
|||
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
|
||||
cf.gen_int64_field(name=df.int_field),
|
||||
cf.gen_float_field(name=df.float_field),
|
||||
cf.gen_double_field(name=df.double_field),
|
||||
cf.gen_string_field(name=df.string_field),
|
||||
cf.gen_json_field(name=df.json_field),
|
||||
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
|
||||
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
|
||||
|
|
Loading…
Reference in New Issue