[test]Update bulk insert test (#21891)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
zhuwenxing 2023-01-31 15:43:50 +08:00 committed by GitHub
parent 02cfcbbd2f
commit 3b4e5ac297
2 changed files with 103 additions and 136 deletions


@@ -140,7 +140,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
@@ -220,7 +220,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
@@ -228,7 +228,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
time.sleep(5)
time.sleep(20)
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
@@ -310,11 +310,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert m_partition.num_entities == entities
assert self.collection_wrap.num_entities == entities
log.debug(state)
time.sleep(20)
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {"total_rows": entities, "indexed_rows": entities}
assert res == exp_res
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
@@ -403,7 +404,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert self.collection_wrap.num_entities == entities
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
search_data = cf.gen_binary_vectors(1, dim)[1]
search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
res, _ = self.collection_wrap.search(
@@ -423,7 +424,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize(
"fields_num_in_file", ["equal", "more", "less"]
"fields_num_in_file", ["more", "less", "equal"]
) # "equal", "more", "less"
@pytest.mark.parametrize("dim", [16])
@pytest.mark.parametrize("entities", [500])
@@ -491,7 +492,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert not success
if is_row_based:
if fields_num_in_file == "less":
failed_reason = f"field '{additional_field}' missed at the row 0"
failed_reason = f"value of field '{additional_field}' is missed"
else:
failed_reason = f"field '{df.float_field}' is not defined in collection schema"
else:
@@ -506,12 +507,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify no index
# verify index status
res, _ = self.collection_wrap.has_index()
assert res is True
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': entities, 'indexed_rows': entities}
assert res == exp_res
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, dim)
@@ -608,13 +612,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
num_entities = self.collection_wrap.num_entities
log.info(f"collection entities: {num_entities}")
assert num_entities == bulk_insert_row + direct_insert_row
# verify no index
# verify index status
time.sleep(20)
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': num_entities, 'indexed_rows': num_entities}
assert res == exp_res
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, dim=dim)
@@ -702,7 +707,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert res == exp_res
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
nq = 3
topk = 10
search_data = cf.gen_vectors(nq, 16)
@@ -811,7 +816,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert res is True
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@@ -829,118 +834,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
results, _ = self.collection_wrap.query(expr=expr)
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [pytest.param(True, marks=pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19499"))]) # True, False
@pytest.mark.parametrize("auto_id", [True, False]) # True, False
@pytest.mark.parametrize("dim", [16]) # 16
@pytest.mark.parametrize("entities", [100]) # 3000
@pytest.mark.parametrize("file_nums", [32]) # 10
@pytest.mark.parametrize("multi_folder", [True, False]) # True, False
def test_float_vector_from_multi_files(
self, is_row_based, auto_id, dim, entities, file_nums, multi_folder
):
"""
collection: auto_id
collection schema: [pk, float_vector,
float_scalar, int_scalar, string_scalar, bool_scalar]
Steps:
1. create collection
2. build index and load collection
3. import data from multiple files
4. verify the data entities
5. verify index status
6. verify search successfully
7. verify query successfully
"""
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=default_multi_fields,
file_nums=file_nums,
multi_folder=multi_folder,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field)
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# build index
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
# load collection
self.collection_wrap.load()
# import data
t0 = time.time()
err_msg = "row-based import, only allow one JSON file each time"
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files,
check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": err_msg},
)
# logging.info(f"bulk insert task ids:{task_id}")
# success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
# task_ids=[task_id], timeout=90
# )
# tt = time.time() - t0
# log.info(f"bulk insert state:{success} in {tt}")
# if not is_row_based:
# assert not success
# failed_reason = "is duplicated" # "the field xxx is duplicated"
# for state in states.values():
# assert state.state_name in ["Failed", "Failed and cleaned"]
# assert failed_reason in state.infos.get("failed_reason", "")
# else:
# assert success
# num_entities = self.collection_wrap.num_entities
# log.info(f" collection entities: {num_entities}")
# assert num_entities == entities * file_nums
#
# # verify index built
# res, _ = self.utility_wrap.index_building_progress(c_name)
# exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums}
# assert res == exp_res
#
# # verify search and query
# log.info(f"wait for load finished and be ready for search")
# time.sleep(5)
# nq = 5
# topk = 1
# search_data = cf.gen_vectors(nq, dim)
# search_params = ct.default_search_params
# res, _ = self.collection_wrap.search(
# search_data,
# df.vec_field,
# param=search_params,
# limit=topk,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq, "limit": topk},
# )
# for hits in res:
# ids = hits.ids
# results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
# assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("is_row_based", [False])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("multi_fields", [True, False])
@pytest.mark.parametrize("dim", [15])
@pytest.mark.parametrize("entities", [200])
@pytest.mark.skip(reason="stop support for numpy files")
# @pytest.mark.skip(reason="stop support for numpy files")
def test_float_vector_from_numpy_file(
self, is_row_based, auto_id, multi_fields, dim, entities
):
@@ -1051,7 +952,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
@@ -1129,7 +1030,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
# the pk value was automatically convert to int from float
res, _ = self.collection_wrap.query(
expr=f"{df.pk_field} in [3]", output_fields=[df.pk_field]
@@ -1194,7 +1095,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@@ -1268,7 +1169,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
@@ -1342,7 +1243,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# verify search and query
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@@ -1442,17 +1343,80 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
task_ids=[task_id], timeout=90
)
assert not success
failed_reason = "JSON parser: row count is 0"
failed_reason = "row count is 0"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True]) # True, False
@pytest.mark.parametrize("auto_id", [True, False]) # True, False
@pytest.mark.parametrize("dim", [16]) # 16
@pytest.mark.parametrize("entities", [100]) # 3000
@pytest.mark.parametrize("file_nums", [32]) # 10
@pytest.mark.parametrize("multi_folder", [True, False]) # True, False
def test_float_vector_from_multi_files(
self, is_row_based, auto_id, dim, entities, file_nums, multi_folder
):
"""
collection: auto_id
collection schema: [pk, float_vector,
float_scalar, int_scalar, string_scalar, bool_scalar]
Steps:
1. create collection
2. build index and load collection
3. import data from multiple files
4. verify the data entities
5. verify index status
6. verify search successfully
7. verify query successfully
"""
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=default_multi_fields,
file_nums=file_nums,
multi_folder=multi_folder,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int32_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field)
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# build index
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
# load collection
self.collection_wrap.load()
# import data
t0 = time.time()
err_msg = "row-based import, only allow one JSON file each time"
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files,
check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": err_msg},
)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [8]) # 8
@pytest.mark.parametrize("entities", [100]) # 100
# @pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/19658")
def test_wrong_file_type(self, is_row_based, auto_id, dim, entities):
"""
collection schema: [pk, float_vector]
@@ -1507,7 +1471,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(f"bulk insert state:{success} in {tt}")
assert not success
failed_reason = f"the file '{files[0]}' has no corresponding field in collection"
failed_reason2 = "unsupportted file type"
failed_reason2 = "unsupported file type"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "") or \
@@ -1560,7 +1524,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
assert not success
if is_row_based:
value = df.vec_field # if auto_id else df.pk_field
failed_reason = f"JSON parser: invalid row-based JSON format, the key {value} is not found"
failed_reason = f"invalid JSON format, the root key should be 'rows', but get {value}"
else:
failed_reason = "JSON parse: row count is 0"
for state in states.values():
@@ -1779,7 +1743,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
)
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field vectors"
failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field 'vectors'"
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
assert failed_reason in state.infos.get("failed_reason", "")
@@ -1818,7 +1782,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("is_row_based", [True])
@pytest.mark.parametrize("dim", [4])
@pytest.mark.parametrize("entities", [200])
@pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19553")
def test_non_existing_partition(self, is_row_based, dim, entities):
"""
collection: create a collection
@@ -1845,7 +1808,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
self.collection_wrap.init_collection(c_name, schema=schema)
# import data into a non existing partition
p_name = "non_existing"
err_msg = f" partition {p_name} does not exist"
err_msg = f"partition ID not found for partition name '{p_name}'"
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=p_name,
@@ -1853,6 +1816,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
check_task=CheckTasks.err_res,
check_items={"err_code": 11, "err_msg": err_msg},
)
print(task_id)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("is_row_based", [True])
@@ -1901,7 +1865,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
log.info(f"bulk insert state:{success}")
assert not success
failed_reason = (
f"doesn't equal to vector dimension {dim} of field vectors"
f"doesn't equal to vector dimension {dim} of field 'vectors'"
)
for state in states.values():
assert state.state_name in ["Failed", "Failed and cleaned"]
@@ -2234,6 +2198,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
@@ -2522,7 +2487,7 @@ class TestBulkInsertAdvanced(TestcaseBaseBulkInsert):
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(5)
time.sleep(20)
loaded_segs = len(self.utility_wrap.get_query_segment_info(c_name)[0])
log.info(f"query seg info: {loaded_segs} segs loaded.")
search_data = cf.gen_vectors(1, dim)


@@ -99,7 +99,9 @@ class ResponseChecker:
assert len(error_dict) > 0
if isinstance(res, Error):
error_code = error_dict[ct.err_code]
assert res.code == error_code or error_dict[ct.err_msg] in res.message
assert res.code == error_code or error_dict[ct.err_msg] in res.message, f"expected error code: {error_code} \
or error message: {error_dict[ct.err_msg]}, \
actual error code: {res.code}, actual error message: {res.message}"
else:
log.error("[CheckFunc] Response of API is not an error: %s" % str(res))
assert False
@@ -227,8 +229,8 @@ class ResponseChecker:
log.error("search_results_check: limit(topK) searched (%d) "
"is not equal with expected (%d)"
% (len(hits), check_items["limit"]))
assert len(hits) == check_items["limit"]
assert len(hits.ids) == check_items["limit"]
assert len(hits) == check_items["limit"], f"expect {check_items['limit']}, but got {len(hits)}"
assert len(hits.ids) == check_items["limit"], f"expect {check_items['limit']}, but got {len(hits.ids)}"
else:
if check_items.get("ids", None) is not None:
ids_match = pc.list_contain_check(list(hits.ids),