From 939ee0c257efbbf26753a6681cb5a30c6ec7a7dd Mon Sep 17 00:00:00 2001 From: KumaJie <61139665+KumaJie@users.noreply.github.com> Date: Tue, 7 Nov 2023 20:42:22 +0800 Subject: [PATCH] Add csv bulk insert test (#28189) Signed-off-by: kuma <675613722@qq.com> Co-authored-by: kuma <675613722@qq.com> --- .../python_client/common/bulk_insert_data.py | 92 ++++- .../testcases/test_bulk_insert.py | 357 ++++++++++++++++++ 2 files changed, 446 insertions(+), 3 deletions(-) diff --git a/tests/python_client/common/bulk_insert_data.py b/tests/python_client/common/bulk_insert_data.py index 9dcf4ac2ee..ec613de3cb 100644 --- a/tests/python_client/common/bulk_insert_data.py +++ b/tests/python_client/common/bulk_insert_data.py @@ -474,8 +474,8 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket file names list """ data_fields_c = copy.deepcopy(data_fields) - print(f"data_fields: {data_fields}") - print(f"data_fields_c: {data_fields_c}") + log.info(f"data_fields: {data_fields}") + log.info(f"data_fields_c: {data_fields_c}") files = gen_json_files(is_row_based=is_row_based, rows=rows, dim=dim, auto_id=auto_id, str_pk=str_pk, float_vector=float_vector, data_fields=data_fields_c, file_nums=file_nums, multi_folder=multi_folder, @@ -521,4 +521,90 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke file_nums=file_nums, force=force) copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force) - return files \ No newline at end of file + return files + +def gen_csv_file(file, float_vector, data_fields, rows, dim, start_uid): + with open(file, "w") as f: + # field name + for i in range(len(data_fields)): + f.write(data_fields[i]) + if i != len(data_fields) - 1: + f.write(",") + f.write("\n") + + for i in range(rows): + # field value + for j in range(len(data_fields)): + data_field = data_fields[j] + if data_field == DataField.pk_field: + f.write(str(i + start_uid)) + if data_field == DataField.int_field: + f.write(str(random.randint(-999999, 9999999))) + if data_field == DataField.float_field: + f.write(str(random.random())) + if data_field == DataField.string_field: + f.write(str(gen_unique_str())) + if data_field == DataField.bool_field: + f.write(str(random.choice(["true", "false"]))) + if data_field == DataField.vec_field: + vectors = gen_float_vectors(1, dim) if float_vector else gen_binary_vectors(1, dim//8) + f.write('"' + ','.join(str(x) for x in vectors) + '"') + if j != len(data_fields) - 1: + f.write(",") + f.write("\n") + + +def gen_csv_files(rows, dim, auto_id, float_vector, data_fields, file_nums, force): + files = [] + start_uid = 0 + if (not auto_id) and (DataField.pk_field not in data_fields): + data_fields.append(DataField.pk_field) + for i in range(file_nums): + file_name = gen_file_name(is_row_based=True, rows=rows, dim=dim, auto_id=auto_id, float_vector=float_vector, data_fields=data_fields, file_num=i, file_type=".csv", str_pk=False, err_type="") + file = f"{data_source}/{file_name}" + if not os.path.exists(file) or force: + gen_csv_file(file, float_vector, data_fields, rows, dim, start_uid) + start_uid += rows + files.append(file_name) + return files + + +def prepare_bulk_insert_csv_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, auto_id=True, float_vector=True, data_fields=[], file_nums=1, force=False): + """ + Generate row based files based on params in csv format and copy them to minio + + :param minio_endpoint: the minio_endpoint of minio + :type 
minio_endpoint: str
+
+    :param bucket_name: the bucket name of Milvus
+    :type bucket_name: str
+
+    :param rows: the number of entities to be generated in the file
+    :type rows: int
+
+    :param dim: dim of vector data
+    :type dim: int
+
+    :param auto_id: generate primary key data or not
+    :type auto_id: bool
+
+    :param float_vector: generate float vectors or binary vectors
+    :type float_vector: boolean
+
+    :param data_fields: data fields to be generated in the file(s):
+            It supports one or all of [pk, vectors, int, float, string, boolean]
+            Note: it automatically adds pk field if auto_id=False
+    :type data_fields: list
+
+    :param file_nums: the number of files to be generated
+    :type file_nums: int
+
+    :param force: re-generate the file(s) regardless of whether they already exist
+    :type force: boolean
+    """
+    data_fields_c = copy.deepcopy(data_fields)
+    log.info(f"data_fields: {data_fields}")
+    log.info(f"data_fields_c: {data_fields_c}")
+    files = gen_csv_files(rows=rows, dim=dim, auto_id=auto_id, float_vector=float_vector, data_fields=data_fields_c, file_nums=file_nums, force=force)
+    copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
+    return files
diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py
index ce13688559..15bbaa02e4 100644
--- a/tests/python_client/testcases/test_bulk_insert.py
+++ b/tests/python_client/testcases/test_bulk_insert.py
@@ -12,6 +12,7 @@ from utils.util_log import test_log as log
 from common.bulk_insert_data import (
     prepare_bulk_insert_json_files,
     prepare_bulk_insert_numpy_files,
+    prepare_bulk_insert_csv_files,
     DataField as df,
 )
 
@@ -917,6 +918,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         # verify search and query
         log.info(f"wait for load finished and be ready for search")
         self.collection_wrap.load(_refresh=True)
+        time.sleep(2)
         search_data = cf.gen_vectors(1, dim)
         search_params = ct.default_search_params
         res, _ = self.collection_wrap.search(
@@ -937,3 +939,358 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             num_entities += p.num_entities
 
         assert num_entities == entities * file_nums
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True, False])
+    @pytest.mark.parametrize("partition_key_field", [df.int_field, df.string_field])
+    def test_partition_key_on_csv_file(self, auto_id, partition_key_field):
+        """
+        collection: auto_id, customized_id
+        collection schema: [pk, float_vector, int64, varchar, bool, float]
+        Step:
+        1. create collection with partition key enabled
+        2. import data
+        3. verify the data entities equal the import data and are distributed by the values of the partition key field
+        4. load the collection
+        5. verify search successfully
+        6.
verify query successfully + """ + dim = 12 + entities = 200 + files = prepare_bulk_insert_csv_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + rows=entities, + dim=dim, + auto_id=auto_id, + data_fields=default_multi_fields, + force=True + ) + self._connect() + c_name = cf.gen_unique_str("bulk_parkey") + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + cf.gen_int64_field(name=df.int_field, is_partition_key=(partition_key_field == df.int_field)), + cf.gen_string_field(name=df.string_field, is_partition_key=(partition_key_field == df.string_field)), + cf.gen_bool_field(name=df.bool_field), + cf.gen_float_field(name=df.float_field), + ] + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + self.collection_wrap.init_collection(c_name, schema=schema) + assert len(self.collection_wrap.partitions) == ct.default_partition_num + + # import data + t0 = time.time() + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, + partition_name=None, + files=files, + ) + logging.info(f"bulk insert task id:{task_id}") + success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + task_ids=[task_id], timeout=300 + ) + tt = time.time() - t0 + log.info(f"bulk insert state:{success} in {tt}") + assert success + + num_entities = self.collection_wrap.num_entities + log.info(f" collection entities: {num_entities}") + assert num_entities == entities + + # verify imported data is available for search + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) + self.collection_wrap.load() + log.info(f"wait for load finished and be ready for search") + time.sleep(10) + log.info( + f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" + ) + nq = 2 + topk = 2 + search_data = cf.gen_vectors(nq, dim) + search_params = ct.default_search_params + res, _ = self.collection_wrap.search( + search_data, + df.vec_field, + param=search_params, + limit=topk, + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": topk}, + ) + for hits in res: + ids = hits.ids + results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") + assert len(results) == len(ids) + + # verify data was bulk inserted into different partitions + num_entities = 0 + empty_partition_num = 0 + for p in self.collection_wrap.partitions: + if p.num_entities == 0: + empty_partition_num += 1 + num_entities += p.num_entities + assert num_entities == entities + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("dim", [128]) + @pytest.mark.parametrize("entities", [100]) + def test_float_vector_csv(self, auto_id, dim, entities): + """ + collection: auto_id, customized_id + collection schema: [pk, float_vector] + Steps: + 1. create collection + 2. import data + 3. verify the data entities equal the import data + 4. load the collection + 5. verify search successfully + 6. 
verify query successfully + """ + files = prepare_bulk_insert_csv_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + rows=entities, + dim=dim, + auto_id=auto_id, + data_fields=default_vec_only_fields, + force=True + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + ] + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id) + self.collection_wrap.init_collection(c_name, schema=schema) + # import data + t0 = time.time() + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, + partition_name=None, + files=files + ) + logging.info(f"bulk insert task id:{task_id}") + success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + task_ids=[task_id], timeout=300 + ) + tt = time.time() - t0 + log.info(f"bulk insert state:{success} in {tt}") + num_entities = self.collection_wrap.num_entities + log.info(f"collection entities:{num_entities}") + assert num_entities == entities + + # verify imported data is available for search + index_params = ct.default_index + self.collection_wrap.create_index( + field_name=df.vec_field, index_params=index_params + ) + time.sleep(2) + self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300) + res, _ = self.utility_wrap.index_building_progress(c_name) + log.info(f"index building progress: {res}") + self.collection_wrap.load() + self.collection_wrap.load(_refresh=True) + log.info(f"wait for load finished and be ready for search") + time.sleep(2) + log.info( + f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}" + ) + nq = 2 + topk = 2 + search_data = cf.gen_vectors(nq, dim) + search_params = ct.default_search_params + res, _ = self.collection_wrap.search( + search_data, + df.vec_field, + param=search_params, + limit=topk, + check_task=CheckTasks.check_search_results, + check_items={"nq": nq, "limit": topk}, + ) + for hits in res: + ids = hits.ids + results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") + assert len(results) == len(ids) + + @pytest.mark.tags(CaseLabel.L3) + @pytest.mark.parametrize("auto_id", [True, False]) + @pytest.mark.parametrize("dim", [128]) + @pytest.mark.parametrize("entities", [2000]) + def test_binary_vector_csv(self, auto_id, dim, entities): + """ + collection: auto_id, customized_id + collection schema: [pk, int64, binary_vector] + Step: + 1. create collection + 2. create index and load collection + 3. import data + 4. verify data entities + 5. load collection + 6. verify search successfully + 7. 
verify query successfully
+        """
+        files = prepare_bulk_insert_csv_files(
+            minio_endpoint=self.minio_endpoint,
+            bucket_name=self.bucket_name,
+            rows=entities,
+            dim=dim,
+            auto_id=auto_id,
+            float_vector=False,
+            data_fields=default_vec_only_fields,
+            force=True
+        )
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True),
+            cf.gen_binary_vec_field(name=df.vec_field, dim=dim)
+        ]
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+        # build index before bulk insert
+        binary_index_params = {
+            "index_type": "BIN_IVF_FLAT",
+            "metric_type": "JACCARD",
+            "params": {"nlist": 64},
+        }
+        self.collection_wrap.create_index(
+            field_name=df.vec_field, index_params=binary_index_params
+        )
+        # load collection
+        self.collection_wrap.load()
+        # import data
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name,
+            partition_name=None,
+            files=files
+        )
+        logging.info(f"bulk insert task id:{task_id}")
+        success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=300
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state:{success} in {tt}")
+        assert success
+        time.sleep(2)
+        self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
+        res, _ = self.utility_wrap.index_building_progress(c_name)
+        log.info(f"index building progress: {res}")
+
+        # verify num entities
+        assert self.collection_wrap.num_entities == entities
+        # verify search and query
+        log.info(f"wait for load finished and be ready for search")
+        self.collection_wrap.load(_refresh=True)
+        time.sleep(2)
+        search_data = cf.gen_binary_vectors(1, dim)[1]
+        search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
+        res, _ = self.collection_wrap.search(
+            search_data,
+            df.vec_field,
+            param=search_params,
+            limit=1,
+            check_task=CheckTasks.check_search_results,
+            check_items={"nq": 1, "limit": 1},
+        )
+        for hits in res:
+            ids = hits.ids
+            results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
+            assert len(results) == len(ids)
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True, False])
+    @pytest.mark.parametrize("dim", [128])
+    @pytest.mark.parametrize("entities", [2000])
+    def test_partition_csv(self, auto_id, dim, entities):
+        """
+        collection schema: [pk, int64, string, float_vector]
+        Step:
+        1. create collection and partition
+        2. build index and load partition
+        3. import data into the partition
+        4. verify num entities
+        5. verify index status
+        6. verify search and query
+        """
+        data_fields = [df.int_field, df.string_field, df.vec_field]
+        files = prepare_bulk_insert_csv_files(
+            minio_endpoint=self.minio_endpoint,
+            bucket_name=self.bucket_name,
+            rows=entities,
+            dim=dim,
+            auto_id=auto_id,
+            data_fields=data_fields,
+            force=True
+        )
+
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert_partition")
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True),
+            cf.gen_int64_field(name=df.int_field),
+            cf.gen_string_field(name=df.string_field),
+            cf.gen_float_vec_field(name=df.vec_field, dim=dim)
+        ]
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+        # create a partition
+        p_name = cf.gen_unique_str("bulk_insert_partition")
+        m_partition, _ = self.collection_wrap.create_partition(partition_name=p_name)
+        # build index
+        index_params = ct.default_index
+        self.collection_wrap.create_index(
+            field_name=df.vec_field, index_params=index_params
+        )
+        # load before bulk insert
+        self.collection_wrap.load(partition_names=[p_name])
+
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name,
+            partition_name=p_name,
+            files=files
+        )
+        logging.info(f"bulk insert task id:{task_id}")
+        success, state = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=300
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state:{success} in {tt}")
+        assert success
+        assert m_partition.num_entities == entities
+        assert self.collection_wrap.num_entities == entities
+        log.debug(state)
+        time.sleep(2)
+        self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
+        res, _ = self.utility_wrap.index_building_progress(c_name)
+        log.info(f"index building progress: {res}")
+        log.info(f"wait for load finished and be ready for search")
+        self.collection_wrap.load(_refresh=True)
+        time.sleep(2)
+        log.info(
+            f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
+        )
+
+        nq = 10
+        topk = 5
+        search_data = cf.gen_vectors(nq, dim)
+        search_params = ct.default_search_params
+        res, _ = self.collection_wrap.search(
+            search_data,
+            df.vec_field,
+            param=search_params,
+            limit=topk,
+            check_task=CheckTasks.check_search_results,
+            check_items={"nq": nq, "limit": topk},
+        )
+        for hits in res:
+            ids = hits.ids
+            results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
+            assert len(results) == len(ids)
\ No newline at end of file
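
For reference, a minimal standalone sketch of the CSV layout that gen_csv_file in this patch writes for a float-vector collection: a header row of field names, then one row per entity, with the vector cell emitted as a quoted, bracketed list of floats. The field names ("uid", "vectors"), the output path, and the rows/dim values below are illustrative assumptions only; the sketch relies solely on the Python standard library and is not part of the patch itself.

# Illustrative sketch only: mirrors the CSV layout produced by gen_csv_file for a
# float-vector collection. Field names, path, and sizes are assumptions.
import csv
import random


def write_sample_bulk_insert_csv(path, rows=5, dim=8):
    with open(path, "w", newline="") as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        # header row: one column per field in the collection schema
        writer.writerow(["uid", "vectors"])
        for uid in range(rows):
            vector = [random.random() for _ in range(dim)]
            # str(vector) renders "[0.1, 0.2, ...]"; csv.writer double-quotes the
            # cell because it contains commas, matching the quoting in gen_csv_file
            writer.writerow([uid, str(vector)])


if __name__ == "__main__":
    write_sample_bulk_insert_csv("/tmp/sample_bulk_insert.csv")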