From d9a9eefa49b9972e7c704506900b14523b678528 Mon Sep 17 00:00:00 2001
From: zhuwenxing
Date: Wed, 20 Dec 2023 14:38:40 +0800
Subject: [PATCH] test: add bulk insert benchmark for different file size
 (#29329)

add bulk insert benchmark for different file size

pr: https://github.com/milvus-io/milvus/pull/29320

Signed-off-by: zhuwenxing
---
 .../bulk_insert/test_bulk_insert_bench.py     | 232 ++++++++++++++++++
 ...st_bulk_insert_perf_with_cohere_dataset.py |   4 +-
 .../python_client/common/bulk_insert_data.py  |  77 +++++-
 3 files changed, 302 insertions(+), 11 deletions(-)
 create mode 100644 tests/python_client/bulk_insert/test_bulk_insert_bench.py

diff --git a/tests/python_client/bulk_insert/test_bulk_insert_bench.py b/tests/python_client/bulk_insert/test_bulk_insert_bench.py
new file mode 100644
index 0000000000..e64b6b3d8a
--- /dev/null
+++ b/tests/python_client/bulk_insert/test_bulk_insert_bench.py
@@ -0,0 +1,232 @@
+import logging
+import time
+import pytest
+from pymilvus import DataType
+from pathlib import Path
+from base.client_base import TestcaseBase
+from common import common_func as cf
+from common.milvus_sys import MilvusSys
+from common.common_type import CaseLabel
+from utils.util_log import test_log as log
+from common.bulk_insert_data import (
+    prepare_bulk_insert_new_json_files,
+    prepare_bulk_insert_numpy_files,
+    prepare_bulk_insert_parquet_files,
+    DataField as df,
+)
+
+
+default_vec_only_fields = [df.vec_field]
+default_multi_fields = [
+    df.vec_field,
+    df.int_field,
+    df.string_field,
+    df.bool_field,
+    df.float_field,
+    df.array_int_field
+]
+default_vec_n_int_fields = [df.vec_field, df.int_field, df.array_int_field]
+
+
+# milvus_ns = "chaos-testing"
+base_dir = "/tmp/bulk_insert_data"
+
+
+def entity_suffix(entities):
+    if entities // 1000000 > 0:
+        suffix = f"{entities // 1000000}m"
+    elif entities // 1000 > 0:
+        suffix = f"{entities // 1000}k"
+    else:
+        suffix = f"{entities}"
+    return suffix
+
+
+class TestcaseBaseBulkInsert(TestcaseBase):
+
+    @pytest.fixture(scope="function", autouse=True)
+    def init_minio_client(self, minio_host):
+        Path("/tmp/bulk_insert_data").mkdir(parents=True, exist_ok=True)
+        self._connect()
+        self.milvus_sys = MilvusSys(alias='default')
+        ms = MilvusSys()
+        minio_port = "9000"
+        self.minio_endpoint = f"{minio_host}:{minio_port}"
+        self.bucket_name = ms.index_nodes[0]["infos"]["system_configurations"][
+            "minio_bucket_name"
+        ]
+
+
+class TestBulkInsertPerf(TestcaseBaseBulkInsert):
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True])
+    @pytest.mark.parametrize("dim", [128])  # 128
+    @pytest.mark.parametrize("file_size", [1, 10, 15])  # file size in GB
+    @pytest.mark.parametrize("file_nums", [1])
+    @pytest.mark.parametrize("array_len", [100])
+    @pytest.mark.parametrize("enable_dynamic_field", [False])
+    def test_bulk_insert_all_field_with_parquet(self, auto_id, dim, file_size, file_nums, array_len, enable_dynamic_field):
+        """
+        collection schema: [pk, int64, float, double, json, array of int/float/string/bool, float_vector]
+        data file: a single Parquet file containing all fields
+        Steps:
+        1. create collection
+        2. import data
+        3. verify the bulk insert task completes successfully
+        """
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
+            cf.gen_int64_field(name=df.int_field),
+            cf.gen_float_field(name=df.float_field),
+            cf.gen_double_field(name=df.double_field),
+            cf.gen_json_field(name=df.json_field),
+            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
+            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
+            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
+            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
+            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
+        ]
+        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        files = prepare_bulk_insert_parquet_files(
+            minio_endpoint=self.minio_endpoint,
+            bucket_name=self.bucket_name,
+            rows=3000,
+            dim=dim,
+            data_fields=data_fields,
+            file_size=file_size,
+            file_nums=file_nums,
+            array_length=array_len,
+            enable_dynamic_field=enable_dynamic_field,
+            force=True,
+        )
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+
+        # import data
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name, files=files
+        )
+        logging.info(f"bulk insert task ids:{task_id}")
+        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=1800
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state:{success} in {tt} with states:{states}")
+        assert success
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True])
+    @pytest.mark.parametrize("dim", [128])  # 128
+    @pytest.mark.parametrize("file_size", [1, 10, 15])  # file size in GB
+    @pytest.mark.parametrize("file_nums", [1])
+    @pytest.mark.parametrize("array_len", [100])
+    @pytest.mark.parametrize("enable_dynamic_field", [False])
+    def test_bulk_insert_all_field_with_json(self, auto_id, dim, file_size, file_nums, array_len, enable_dynamic_field):
+        """
+        collection schema: [pk, int64, float, double, json, array of int/float/string/bool, float_vector]
+        data file: a single JSON file containing all fields
+        Steps:
+        1. create collection
+        2. import data
+        3. verify the bulk insert task completes successfully
+        """
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
+            cf.gen_int64_field(name=df.int_field),
+            cf.gen_float_field(name=df.float_field),
+            cf.gen_double_field(name=df.double_field),
+            cf.gen_json_field(name=df.json_field),
+            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
+            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
+            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
+            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
+            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
+        ]
+        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
+        files = prepare_bulk_insert_new_json_files(
+            minio_endpoint=self.minio_endpoint,
+            bucket_name=self.bucket_name,
+            rows=3000,
+            dim=dim,
+            data_fields=data_fields,
+            file_size=file_size,
+            file_nums=file_nums,
+            array_length=array_len,
+            enable_dynamic_field=enable_dynamic_field,
+            force=True,
+        )
+        self._connect()
+        c_name = cf.gen_unique_str("bulk_insert")
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+
+        # import data
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name, files=files
+        )
+        logging.info(f"bulk insert task ids:{task_id}")
+        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=1800
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state:{success} in {tt} with states:{states}")
+        assert success
+
+
+    @pytest.mark.tags(CaseLabel.L3)
+    @pytest.mark.parametrize("auto_id", [True])
+    @pytest.mark.parametrize("dim", [128])  # 128
+    @pytest.mark.parametrize("file_size", [1, 10, 15])  # file size in GB
+    @pytest.mark.parametrize("file_nums", [1])
+    @pytest.mark.parametrize("enable_dynamic_field", [False])
+    def test_bulk_insert_all_field_with_numpy(self, auto_id, dim, file_size, file_nums, enable_dynamic_field):
+        """
+        collection schema: [pk, int64, float, double, json, float_vector]
+        data file: one numpy file per field
+        Steps:
+        1. create collection
+        2. import data
+        3. 
verify + """ + fields = [ + cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id), + cf.gen_int64_field(name=df.int_field), + cf.gen_float_field(name=df.float_field), + cf.gen_double_field(name=df.double_field), + cf.gen_json_field(name=df.json_field), + cf.gen_float_vec_field(name=df.vec_field, dim=dim), + ] + data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)] + files = prepare_bulk_insert_numpy_files( + minio_endpoint=self.minio_endpoint, + bucket_name=self.bucket_name, + rows=3000, + dim=dim, + data_fields=data_fields, + file_size=file_size, + file_nums=file_nums, + enable_dynamic_field=enable_dynamic_field, + force=True, + ) + self._connect() + c_name = cf.gen_unique_str("bulk_insert") + schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field) + self.collection_wrap.init_collection(c_name, schema=schema) + + # import data + t0 = time.time() + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, files=files + ) + logging.info(f"bulk insert task ids:{task_id}") + success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + task_ids=[task_id], timeout=1800 + ) + tt = time.time() - t0 + log.info(f"bulk insert state:{success} in {tt} with states:{states}") + assert success diff --git a/tests/python_client/bulk_insert/test_bulk_insert_perf_with_cohere_dataset.py b/tests/python_client/bulk_insert/test_bulk_insert_perf_with_cohere_dataset.py index 814312c7cf..0b69c679bf 100644 --- a/tests/python_client/bulk_insert/test_bulk_insert_perf_with_cohere_dataset.py +++ b/tests/python_client/bulk_insert/test_bulk_insert_perf_with_cohere_dataset.py @@ -74,10 +74,12 @@ class TestBUlkInsertPerf(TestBulkInsertBase): fields_name = ["id", "title", "text", "url", "wiki_id", "views", "paragraph_id", "langs", "emb"] files = [] if file_type == "json": - files = ["json-train-00000-of-00252.json"] + files = ["train-00000-of-00252.json"] if file_type == "npy": for field_name in fields_name: files.append(f"{field_name}.npy") + if file_type == "parquet": + files = ["train-00000-of-00252.parquet"] checkers = { Op.bulk_insert: BulkInsertChecker(collection_name=c_name, use_one_collection=False, schema=schema, files=files, insert_data=False) diff --git a/tests/python_client/common/bulk_insert_data.py b/tests/python_client/common/bulk_insert_data.py index 1863c65496..7697a9e1f2 100644 --- a/tests/python_client/common/bulk_insert_data.py +++ b/tests/python_client/common/bulk_insert_data.py @@ -568,22 +568,40 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d return data -def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False): +def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False): files = [] + if file_size is not None: + rows = 5000 start_uid = 0 for i in range(file_nums): file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json" file = f"{data_source}/{file_name}" data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field) - log.info(f"data: {data}") + # log.info(f"data: {data}") with open(file, "w") as f: json.dump(data, f) + # get the file size + if file_size is not None: + batch_file_size = 
os.path.getsize(f"{data_source}/{file_name}") + log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB") + # calculate the rows to be generated + total_batch = int(file_size*1024*1024*1024/batch_file_size) + total_rows = total_batch * rows + log.info(f"total_rows: {total_rows}") + all_data = [] + for _ in range(total_batch): + all_data += data + file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json" + with open(f"{data_source}/{file_name}", "w") as f: + json.dump(all_data, f) + batch_file_size = os.path.getsize(f"{data_source}/{file_name}") + log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024/1024} GB") files.append(file_name) start_uid += rows return files -def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="", force=False, enable_dynamic_field=False): +def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False): # gen numpy files files = [] start_uid = 0 @@ -606,6 +624,28 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="" if enable_dynamic_field: file_name = gen_dynamic_field_in_numpy_file(dir=data_source, rows=rows, force=force) files.append(file_name) + if file_size is not None: + batch_file_size = 0 + for file_name in files: + batch_file_size += os.path.getsize(f"{data_source}/{file_name}") + log.info(f"file_size with rows {rows} for {files}: {batch_file_size/1024/1024} MB") + # calculate the rows to be generated + total_batch = int(file_size*1024*1024*1024/batch_file_size) + total_rows = total_batch * rows + new_files = [] + for f in files: + arr = np.load(f"{data_source}/{f}") + all_arr = np.concatenate([arr for _ in range(total_batch)], axis=0) + file_name = f + np.save(f"{data_source}/{file_name}", all_arr) + log.info(f"file_name: {file_name} data type: {all_arr.dtype} data shape: {all_arr.shape}") + new_files.append(file_name) + files = new_files + batch_file_size = 0 + for file_name in files: + batch_file_size += os.path.getsize(f"{data_source}/{file_name}") + log.info(f"file_size with rows {total_rows} for {files}: {batch_file_size/1024/1024/1024} GB") + else: for i in range(file_nums): subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i) @@ -630,11 +670,14 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0): return data -def gen_parquet_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False): +def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False): # gen numpy files if err_type == "": err_type = "none" files = [] + # generate 5000 entities and check the file size, then calculate the rows to be generated + if file_size is not None: + rows = 5000 start_uid = 0 if file_nums == 1: all_field_data = {} @@ -648,6 +691,20 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_nums=1, array_l log.info(f"df: \n{df}") file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet" df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow') + + # get the file size + if file_size is not None: + batch_file_size = os.path.getsize(f"{data_source}/{file_name}") + log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB") + # calculate the 
rows to be generated + total_batch = int(file_size*1024*1024*1024/batch_file_size) + total_rows = total_batch * rows + all_df = pd.concat([df for _ in range(total_batch)], axis=0, ignore_index=True) + file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet" + log.info(f"all df: \n {all_df}") + all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow') + batch_file_size = os.path.getsize(f"{data_source}/{file_name}") + log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024} MB") files.append(file_name) else: for i in range(file_nums): @@ -740,18 +797,18 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket def prepare_bulk_insert_new_json_files(minio_endpoint="", bucket_name="milvus-bucket", - rows=100, dim=128, float_vector=True, + rows=100, dim=128, float_vector=True, file_size=None, data_fields=[], file_nums=1, enable_dynamic_field=False, err_type="", force=False, **kwargs): log.info(f"data_fields: {data_fields}") - files = gen_new_json_files(float_vector=float_vector, rows=rows, dim=dim, data_fields=data_fields, file_nums=file_nums, err_type=err_type, enable_dynamic_field=enable_dynamic_field, **kwargs) + files = gen_new_json_files(float_vector=float_vector, rows=rows, dim=dim, data_fields=data_fields, file_nums=file_nums, file_size=file_size, err_type=err_type, enable_dynamic_field=enable_dynamic_field, **kwargs) copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force) return files -def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False, +def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False, file_size=None, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False): """ Generate column based files based on params in numpy format and copy them to the minio @@ -782,7 +839,7 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke Return: List File name list or file name with sub-folder list """ - files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector, + files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector, file_size=file_size, data_fields=data_fields, enable_dynamic_field=enable_dynamic_field, file_nums=file_nums, force=force) @@ -790,7 +847,7 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke return files -def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, +def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None, enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False): """ Generate column based files based on params in parquet format and copy them to the minio @@ -822,7 +879,7 @@ def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-buc File name list or file name with sub-folder list """ files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field, - data_fields=data_fields, array_length=array_length, + data_fields=data_fields, array_length=array_length, file_size=file_size, file_nums=file_nums) copy_files_to_minio(host=minio_endpoint, 
r_source=data_source, files=files, bucket_name=bucket_name, force=force) return files
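
A note on the sizing logic shared by gen_new_json_files, gen_npy_files and gen_parquet_files: when file_size is given, each helper writes a small sample batch (a few thousand rows), measures it on disk with os.path.getsize, computes how many copies of that batch are needed to reach file_size GB, then writes the replicated data out and logs the resulting size. The standalone sketch below illustrates that pattern for Parquet only; the names gen_parquet_of_target_size, target_gb and sample_rows are illustrative and not part of this patch, and compression of repeated rows can make the final file somewhat smaller than the naive estimate.

import os
import time

import numpy as np
import pandas as pd


def gen_parquet_of_target_size(target_gb: float, dim: int = 128, sample_rows: int = 5000,
                               out_dir: str = "/tmp/bulk_insert_data") -> str:
    """Sketch: replicate a measured sample batch until roughly target_gb is reached."""
    os.makedirs(out_dir, exist_ok=True)
    # 1. write a small sample batch and measure its size on disk
    sample = pd.DataFrame({
        "uid": np.arange(sample_rows, dtype=np.int64),
        "float_scalar": np.random.random(sample_rows),
        "float_vector": [np.random.random(dim).astype(np.float32).tolist() for _ in range(sample_rows)],
    })
    sample_file = os.path.join(out_dir, f"sample-rows-{sample_rows}-{int(time.time())}.parquet")
    sample.to_parquet(sample_file, engine="pyarrow")
    sample_bytes = os.path.getsize(sample_file)
    # 2. number of sample copies needed to reach the requested size in GB
    total_batch = max(1, int(target_gb * 1024 * 1024 * 1024 / sample_bytes))
    total_rows = total_batch * sample_rows
    # 3. replicate the sample, write the final file and report its real size
    final = pd.concat([sample] * total_batch, axis=0, ignore_index=True)
    final_file = os.path.join(out_dir, f"data-rows-{total_rows}-{int(time.time())}.parquet")
    final.to_parquet(final_file, engine="pyarrow")
    print(f"{final_file}: {os.path.getsize(final_file) / 1024 / 1024 / 1024:.2f} GB")
    return final_file

Calling gen_parquet_of_target_size(1), for example, would leave a roughly 1 GB Parquet file under /tmp/bulk_insert_data, analogous to what prepare_bulk_insert_parquet_files generates before copying the file to MinIO.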