From 3b4e5ac297f77b3cec029296a96d2d07e5933a7c Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Tue, 31 Jan 2023 15:43:50 +0800 Subject: [PATCH] [test]Update bulk insert test (#21891) Signed-off-by: zhuwenxing --- .../bulk_insert/test_bulk_insert.py | 231 ++++++++---------- tests/python_client/check/func_check.py | 8 +- 2 files changed, 103 insertions(+), 136 deletions(-) diff --git a/tests/python_client/bulk_insert/test_bulk_insert.py b/tests/python_client/bulk_insert/test_bulk_insert.py index 579ce6ffbc..681a215e2f 100644 --- a/tests/python_client/bulk_insert/test_bulk_insert.py +++ b/tests/python_client/bulk_insert/test_bulk_insert.py @@ -140,7 +140,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) log.info( f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) @@ -220,7 +220,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) log.info( f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) @@ -228,7 +228,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): topk = 2 search_data = cf.gen_vectors(nq, dim) search_params = ct.default_search_params - time.sleep(5) + time.sleep(20) res, _ = self.collection_wrap.search( search_data, df.vec_field, @@ -310,11 +310,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert m_partition.num_entities == entities assert self.collection_wrap.num_entities == entities log.debug(state) + time.sleep(20) res, _ = self.utility_wrap.index_building_progress(c_name) exp_res = {"total_rows": entities, "indexed_rows": entities} assert res == exp_res log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) log.info( f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) @@ -403,7 +404,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert self.collection_wrap.num_entities == entities # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) search_data = cf.gen_binary_vectors(1, dim)[1] search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}} res, _ = self.collection_wrap.search( @@ -423,7 +424,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize( - "fields_num_in_file", ["equal", "more", "less"] + "fields_num_in_file", ["more", "less", "equal"] ) # "equal", "more", "less" @pytest.mark.parametrize("dim", [16]) @pytest.mark.parametrize("entities", [500]) @@ -491,7 +492,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert not success if is_row_based: if fields_num_in_file == "less": - failed_reason = f"field '{additional_field}' missed at the row 0" + failed_reason = f"value of field '{additional_field}' is missed" else: failed_reason = f"field '{df.float_field}' is not defined in collection schema" else: @@ -506,12 +507,15 @@ class TestBulkInsert(TestcaseBaseBulkInsert): log.info(f" collection entities: {num_entities}") assert num_entities == entities - # verify no index + # verify index status res, _ = self.collection_wrap.has_index() assert res is True + res, _ = self.utility_wrap.index_building_progress(c_name) + exp_res = {'total_rows': entities, 'indexed_rows': entities} + assert res == exp_res # verify search and query 
log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) nq = 3 topk = 10 search_data = cf.gen_vectors(nq, dim) @@ -608,13 +612,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert): num_entities = self.collection_wrap.num_entities log.info(f"collection entities: {num_entities}") assert num_entities == bulk_insert_row + direct_insert_row - # verify no index + # verify index status + time.sleep(20) res, _ = self.utility_wrap.index_building_progress(c_name) exp_res = {'total_rows': num_entities, 'indexed_rows': num_entities} assert res == exp_res # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) nq = 3 topk = 10 search_data = cf.gen_vectors(nq, dim=dim) @@ -702,7 +707,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert res == exp_res # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) nq = 3 topk = 10 search_data = cf.gen_vectors(nq, 16) @@ -811,7 +816,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): assert res is True # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( @@ -829,118 +834,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert): results, _ = self.collection_wrap.query(expr=expr) assert len(results) == len(ids) - @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [pytest.param(True, marks=pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19499"))]) # True, False - @pytest.mark.parametrize("auto_id", [True, False]) # True, False - @pytest.mark.parametrize("dim", [16]) # 16 - @pytest.mark.parametrize("entities", [100]) # 3000 - @pytest.mark.parametrize("file_nums", [32]) # 10 - @pytest.mark.parametrize("multi_folder", [True, False]) # True, False - def test_float_vector_from_multi_files( - self, is_row_based, auto_id, dim, entities, file_nums, multi_folder - ): - """ - collection: auto_id - collection schema: [pk, float_vector, - float_scalar, int_scalar, string_scalar, bool_scalar] - Steps: - 1. create collection - 2. build index and load collection - 3. import data from multiple files - 4. verify the data entities - 5. verify index status - 6. verify search successfully - 7. 
verify query successfully - """ - files = prepare_bulk_insert_json_files( - minio_endpoint=self.minio_endpoint, - bucket_name=self.bucket_name, - is_row_based=is_row_based, - rows=entities, - dim=dim, - auto_id=auto_id, - data_fields=default_multi_fields, - file_nums=file_nums, - multi_folder=multi_folder, - force=True, - ) - self._connect() - c_name = cf.gen_unique_str("bulk_insert") - fields = [ - cf.gen_int64_field(name=df.pk_field, is_primary=True), - cf.gen_float_vec_field(name=df.vec_field, dim=dim), - cf.gen_int32_field(name=df.int_field), - cf.gen_string_field(name=df.string_field), - cf.gen_bool_field(name=df.bool_field), - cf.gen_float_field(name=df.float_field) - ] - schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) - self.collection_wrap.init_collection(c_name, schema=schema) - # build index - index_params = ct.default_index - self.collection_wrap.create_index( - field_name=df.vec_field, index_params=index_params - ) - # load collection - self.collection_wrap.load() - # import data - t0 = time.time() - err_msg = "row-based import, only allow one JSON file each time" - task_id, _ = self.utility_wrap.do_bulk_insert( - collection_name=c_name, files=files, - check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": err_msg}, - ) - - # logging.info(f"bulk insert task ids:{task_id}") - # success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - # task_ids=[task_id], timeout=90 - # ) - # tt = time.time() - t0 - # log.info(f"bulk insert state:{success} in {tt}") - # if not is_row_based: - # assert not success - # failed_reason = "is duplicated" # "the field xxx is duplicated" - # for state in states.values(): - # assert state.state_name in ["Failed", "Failed and cleaned"] - # assert failed_reason in state.infos.get("failed_reason", "") - # else: - # assert success - # num_entities = self.collection_wrap.num_entities - # log.info(f" collection entities: {num_entities}") - # assert num_entities == entities * file_nums - # - # # verify index built - # res, _ = self.utility_wrap.index_building_progress(c_name) - # exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums} - # assert res == exp_res - # - # # verify search and query - # log.info(f"wait for load finished and be ready for search") - # time.sleep(5) - # nq = 5 - # topk = 1 - # search_data = cf.gen_vectors(nq, dim) - # search_params = ct.default_search_params - # res, _ = self.collection_wrap.search( - # search_data, - # df.vec_field, - # param=search_params, - # limit=topk, - # check_task=CheckTasks.check_search_results, - # check_items={"nq": nq, "limit": topk}, - # ) - # for hits in res: - # ids = hits.ids - # results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") - # assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) - @pytest.mark.parametrize("is_row_based", [True]) + @pytest.mark.parametrize("is_row_based", [False]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("multi_fields", [True, False]) @pytest.mark.parametrize("dim", [15]) @pytest.mark.parametrize("entities", [200]) - @pytest.mark.skip(reason="stop support for numpy files") + # @pytest.mark.skip(reason="stop support for numpy files") def test_float_vector_from_numpy_file( self, is_row_based, auto_id, multi_fields, dim, entities ): @@ -1051,7 +952,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) log.info( f"query 
seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" ) @@ -1129,7 +1030,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) # the pk value was automatically convert to int from float res, _ = self.collection_wrap.query( expr=f"{df.pk_field} in [3]", output_fields=[df.pk_field] @@ -1194,7 +1095,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( @@ -1268,7 +1169,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) # log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}") search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params @@ -1342,7 +1243,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # verify search and query log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) search_data = cf.gen_vectors(1, dim) search_params = ct.default_search_params res, _ = self.collection_wrap.search( @@ -1442,17 +1343,80 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): task_ids=[task_id], timeout=90 ) assert not success - failed_reason = "JSON parser: row count is 0" + failed_reason = "row count is 0" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("is_row_based", [True]) # True, False + @pytest.mark.parametrize("auto_id", [True, False]) # True, False + @pytest.mark.parametrize("dim", [16]) # 16 + @pytest.mark.parametrize("entities", [100]) # 3000 + @pytest.mark.parametrize("file_nums", [32]) # 10 + @pytest.mark.parametrize("multi_folder", [True, False]) # True, False + def test_float_vector_from_multi_files( + self, is_row_based, auto_id, dim, entities, file_nums, multi_folder + ): + """ + collection: auto_id + collection schema: [pk, float_vector, + float_scalar, int_scalar, string_scalar, bool_scalar] + Steps: + 1. create collection + 2. build index and load collection + 3. import data from multiple files + 4. verify the data entities + 5. verify index status + 6. verify search successfully + 7. 
verify query successfully + """ + files = prepare_bulk_insert_json_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + is_row_based=is_row_based, + rows=entities, + dim=dim, + auto_id=auto_id, + data_fields=default_multi_fields, + file_nums=file_nums, + multi_folder=multi_folder, + force=True, + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int32_field(name=df.int_field), + cf.gen_string_field(name=df.string_field), + cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field) + ] + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + self.collection_wrap.init_collection(c_name, schema=schema) + # build index + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) + # load collection + self.collection_wrap.load() + # import data + t0 = time.time() + err_msg = "row-based import, only allow one JSON file each time" + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, files=files, + check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": err_msg}, + ) + + @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("dim", [8]) # 8 @pytest.mark.parametrize("entities", [100]) # 100 - # @pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/19658") def test_wrong_file_type(self, is_row_based, auto_id, dim, entities): """ collection schema: [pk, float_vector] @@ -1507,7 +1471,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): log.info(f"bulk insert state:{success} in {tt}") assert not success failed_reason = f"the file '{files[0]}' has no corresponding field in collection" - failed_reason2 = "unsupportted file type" + failed_reason2 = "unsupported file type" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") or \ @@ -1560,7 +1524,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): assert not success if is_row_based: value = df.vec_field # if auto_id else df.pk_field - failed_reason = f"JSON parser: invalid row-based JSON format, the key {value} is not found" + failed_reason = f"invalid JSON format, the root key should be 'rows', but get {value}" else: failed_reason = "JSON parse: row count is 0" for state in states.values(): @@ -1779,7 +1743,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): ) log.info(f"bulk insert state:{success}") assert not success - failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field vectors" + failed_reason = f"array size {dim} doesn't equal to vector dimension {wrong_dim} of field 'vectors'" for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] assert failed_reason in state.infos.get("failed_reason", "") @@ -1818,7 +1782,6 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): @pytest.mark.parametrize("is_row_based", [True]) @pytest.mark.parametrize("dim", [4]) @pytest.mark.parametrize("entities", [200]) - @pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19553") def test_non_existing_partition(self, is_row_based, dim, entities): """ collection: create a collection @@ -1845,7 +1808,7 @@ class 
TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): self.collection_wrap.init_collection(c_name, schema=schema) # import data into a non existing partition p_name = "non_existing" - err_msg = f" partition {p_name} does not exist" + err_msg = f"partition ID not found for partition name '{p_name}'" task_id, _ = self.utility_wrap.do_bulk_insert( collection_name=c_name, partition_name=p_name, @@ -1853,6 +1816,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): check_task=CheckTasks.err_res, check_items={"err_code": 11, "err_msg": err_msg}, ) + print(task_id) @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("is_row_based", [True]) @@ -1901,7 +1865,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): log.info(f"bulk insert state:{success}") assert not success failed_reason = ( - f"doesn't equal to vector dimension {dim} of field vectors" + f"doesn't equal to vector dimension {dim} of field 'vectors'" ) for state in states.values(): assert state.state_name in ["Failed", "Failed and cleaned"] @@ -2234,6 +2198,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert): ] schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) self.collection_wrap.init_collection(c_name, schema=schema) + # import data task_id, _ = self.utility_wrap.do_bulk_insert( collection_name=c_name, files=files @@ -2522,7 +2487,7 @@ class TestBulkInsertAdvanced(TestcaseBaseBulkInsert): ) self.collection_wrap.load() log.info(f"wait for load finished and be ready for search") - time.sleep(5) + time.sleep(20) loaded_segs = len(self.utility_wrap.get_query_segment_info(c_name)[0]) log.info(f"query seg info: {loaded_segs} segs loaded.") search_data = cf.gen_vectors(1, dim) diff --git a/tests/python_client/check/func_check.py b/tests/python_client/check/func_check.py index 89f2326af9..1565f1e6a4 100644 --- a/tests/python_client/check/func_check.py +++ b/tests/python_client/check/func_check.py @@ -99,7 +99,9 @@ class ResponseChecker: assert len(error_dict) > 0 if isinstance(res, Error): error_code = error_dict[ct.err_code] - assert res.code == error_code or error_dict[ct.err_msg] in res.message + assert res.code == error_code or error_dict[ct.err_msg] in res.message, f"expected error code: {error_code} \ + or error message: {error_dict[ct.err_msg]}, \ + actual error code: {res.code}, actual error message: {res.message}" else: log.error("[CheckFunc] Response of API is not an error: %s" % str(res)) assert False @@ -227,8 +229,8 @@ class ResponseChecker: log.error("search_results_check: limit(topK) searched (%d) " "is not equal with expected (%d)" % (len(hits), check_items["limit"])) - assert len(hits) == check_items["limit"] - assert len(hits.ids) == check_items["limit"] + assert len(hits) == check_items["limit"], f"expect {check_items['limit']}, but got {len(hits)}" + assert len(hits.ids) == check_items["limit"], f"expect {check_items['limit']}, but got {len(hits.ids)}" else: if check_items.get("ids", None) is not None: ids_match = pc.list_contain_check(list(hits.ids),
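
Note on the repeated waits: this patch bumps the fixed time.sleep(5) calls to time.sleep(20) and adds index_building_progress assertions, which assumes readiness within 20 seconds. A minimal polling sketch, assuming the same utility_wrap.index_building_progress(collection_name) wrapper these tests already use (the helper name wait_for_index_ready and the timeout / poll_interval parameters are hypothetical, not part of the test framework), would wait only as long as actually needed:

    import time


    def wait_for_index_ready(utility_wrap, collection_name, expected_rows,
                             timeout=60, poll_interval=2):
        """Poll index_building_progress until every row is indexed or timeout.

        Sketch only: utility_wrap is assumed to be the wrapper object used in
        these tests, whose index_building_progress(collection_name) returns
        (progress_dict, check_result) with 'total_rows' and 'indexed_rows' keys.
        """
        res = None
        deadline = time.time() + timeout
        while time.time() < deadline:
            res, _ = utility_wrap.index_building_progress(collection_name)
            # stop as soon as the index reports all expected rows as indexed
            if (res.get("total_rows") == expected_rows
                    and res.get("indexed_rows") == expected_rows):
                return res
            time.sleep(poll_interval)
        raise TimeoutError(
            f"index not ready after {timeout}s, last progress: {res}")

Such a helper could stand in for the fixed sleeps before the search/query assertions, keeping the tests fast when the index finishes early and failing with a clear message when it does not.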