test: add bulk insert benchmark for different file sizes (#29320)

add bulk insert benchmark for different file sizes

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/29334/head
zhuwenxing 2023-12-20 09:26:49 +08:00 committed by GitHub
parent 1ee016709d
commit 9e846d8db2
3 changed files with 306 additions and 11 deletions


@@ -0,0 +1,236 @@
import logging
import time
import pytest
from pymilvus import DataType
import numpy as np
from pathlib import Path
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from common.milvus_sys import MilvusSys
from common.common_type import CaseLabel, CheckTasks
from utils.util_log import test_log as log
from common.bulk_insert_data import (
    prepare_bulk_insert_json_files,
    prepare_bulk_insert_new_json_files,
    prepare_bulk_insert_numpy_files,
    prepare_bulk_insert_parquet_files,
    prepare_bulk_insert_csv_files,
    DataField as df,
)

default_vec_only_fields = [df.vec_field]
default_multi_fields = [
    df.vec_field,
    df.int_field,
    df.string_field,
    df.bool_field,
    df.float_field,
    df.array_int_field
]
default_vec_n_int_fields = [df.vec_field, df.int_field, df.array_int_field]

# milvus_ns = "chaos-testing"
base_dir = "/tmp/bulk_insert_data"


def entity_suffix(entities):
    if entities // 1000000 > 0:
        suffix = f"{entities // 1000000}m"
    elif entities // 1000 > 0:
        suffix = f"{entities // 1000}k"
    else:
        suffix = f"{entities}"
    return suffix


class TestcaseBaseBulkInsert(TestcaseBase):

    @pytest.fixture(scope="function", autouse=True)
    def init_minio_client(self, minio_host):
        Path("/tmp/bulk_insert_data").mkdir(parents=True, exist_ok=True)
        self._connect()
        self.milvus_sys = MilvusSys(alias='default')
        ms = MilvusSys()
        minio_port = "9000"
        self.minio_endpoint = f"{minio_host}:{minio_port}"
        self.bucket_name = ms.index_nodes[0]["infos"]["system_configurations"][
            "minio_bucket_name"
        ]

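
# NOTE: file_size is a target import size in GB, not a row count. When it is set, the
# prepare_bulk_insert_*_files helpers (common/bulk_insert_data.py) generate a small
# sample batch, measure its size on disk, and replicate it until the requested size is
# reached, so the final row count scales with file_size rather than with rows.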
class TestBulkInsertPerf(TestcaseBaseBulkInsert):

    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("auto_id", [True])
    @pytest.mark.parametrize("dim", [128])  # 128
    @pytest.mark.parametrize("file_size", [1, 10, 15])  # file size in GB
    @pytest.mark.parametrize("file_nums", [1])
    @pytest.mark.parametrize("array_len", [100])
    @pytest.mark.parametrize("enable_dynamic_field", [False])
    def test_bulk_insert_all_field_with_parquet(self, auto_id, dim, file_size, file_nums, array_len, enable_dynamic_field):
        """
        collection schema: [pk, int64, float, double, json, array_int, array_float, array_string, array_bool, float_vector]
        data file: one parquet file scaled to the target file size
        Steps:
        1. create collection
        2. import data
        3. verify the import task completes successfully
        """
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
            cf.gen_int64_field(name=df.int_field),
            cf.gen_float_field(name=df.float_field),
            cf.gen_double_field(name=df.double_field),
            cf.gen_json_field(name=df.json_field),
            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
        ]
        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
        files = prepare_bulk_insert_parquet_files(
            minio_endpoint=self.minio_endpoint,
            bucket_name=self.bucket_name,
            rows=3000,
            dim=dim,
            data_fields=data_fields,
            file_size=file_size,
            file_nums=file_nums,
            array_length=array_len,
            enable_dynamic_field=enable_dynamic_field,
            force=True,
        )
        self._connect()
        c_name = cf.gen_unique_str("bulk_insert")
        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
        self.collection_wrap.init_collection(c_name, schema=schema)
        # import data
        t0 = time.time()
        task_id, _ = self.utility_wrap.do_bulk_insert(
            collection_name=c_name, files=files
        )
        logging.info(f"bulk insert task ids:{task_id}")
        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
            task_ids=[task_id], timeout=1800
        )
        tt = time.time() - t0
        log.info(f"bulk insert state:{success} in {tt} with states:{states}")
        assert success

    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("auto_id", [True])
    @pytest.mark.parametrize("dim", [128])  # 128
    @pytest.mark.parametrize("file_size", [1, 10, 15])  # file size in GB
    @pytest.mark.parametrize("file_nums", [1])
    @pytest.mark.parametrize("array_len", [100])
    @pytest.mark.parametrize("enable_dynamic_field", [False])
    def test_bulk_insert_all_field_with_json(self, auto_id, dim, file_size, file_nums, array_len, enable_dynamic_field):
        """
        collection schema: [pk, int64, float, double, json, array_int, array_float, array_string, array_bool, float_vector]
        data file: one JSON file scaled to the target file size
        Steps:
        1. create collection
        2. import data
        3. verify the import task completes successfully
        """
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
            cf.gen_int64_field(name=df.int_field),
            cf.gen_float_field(name=df.float_field),
            cf.gen_double_field(name=df.double_field),
            cf.gen_json_field(name=df.json_field),
            cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
            cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
            cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
            cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
        ]
        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
        files = prepare_bulk_insert_new_json_files(
            minio_endpoint=self.minio_endpoint,
            bucket_name=self.bucket_name,
            rows=3000,
            dim=dim,
            data_fields=data_fields,
            file_size=file_size,
            file_nums=file_nums,
            array_length=array_len,
            enable_dynamic_field=enable_dynamic_field,
            force=True,
        )
        self._connect()
        c_name = cf.gen_unique_str("bulk_insert")
        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
        self.collection_wrap.init_collection(c_name, schema=schema)
        # import data
        t0 = time.time()
        task_id, _ = self.utility_wrap.do_bulk_insert(
            collection_name=c_name, files=files
        )
        logging.info(f"bulk insert task ids:{task_id}")
        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
            task_ids=[task_id], timeout=1800
        )
        tt = time.time() - t0
        log.info(f"bulk insert state:{success} in {tt} with states:{states}")
        assert success

    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("auto_id", [True])
    @pytest.mark.parametrize("dim", [128])  # 128
    @pytest.mark.parametrize("file_size", [1, 10, 15])  # file size in GB
    @pytest.mark.parametrize("file_nums", [1])
    @pytest.mark.parametrize("enable_dynamic_field", [False])
    def test_bulk_insert_all_field_with_numpy(self, auto_id, dim, file_size, file_nums, enable_dynamic_field):
        """
        collection schema: [pk, int64, float, double, json, float_vector]
        data files: one numpy file per field, scaled to the target file size
        Steps:
        1. create collection
        2. import data
        3. verify the import task completes successfully
        """
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
            cf.gen_int64_field(name=df.int_field),
            cf.gen_float_field(name=df.float_field),
            cf.gen_double_field(name=df.double_field),
            cf.gen_json_field(name=df.json_field),
            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
        ]
        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
        files = prepare_bulk_insert_numpy_files(
            minio_endpoint=self.minio_endpoint,
            bucket_name=self.bucket_name,
            rows=3000,
            dim=dim,
            data_fields=data_fields,
            file_size=file_size,
            file_nums=file_nums,
            enable_dynamic_field=enable_dynamic_field,
            force=True,
        )
        self._connect()
        c_name = cf.gen_unique_str("bulk_insert")
        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
        self.collection_wrap.init_collection(c_name, schema=schema)
        # import data
        t0 = time.time()
        task_id, _ = self.utility_wrap.do_bulk_insert(
            collection_name=c_name, files=files
        )
        logging.info(f"bulk insert task ids:{task_id}")
        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
            task_ids=[task_id], timeout=1800
        )
        tt = time.time() - t0
        log.info(f"bulk insert state:{success} in {tt} with states:{states}")
        assert success
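
The file_size values above are targets in GB, not row counts; the data-generation helpers changed below in common/bulk_insert_data.py write a 5000-row sample, measure it on disk, and replicate it until the target is reached. A minimal, self-contained sketch of that arithmetic; rows_for_target_size and the sample record layout are illustrative only, not part of the test suite:

import json
import os
import tempfile


def rows_for_target_size(sample_rows: int, sample_bytes: int, target_gb: float) -> int:
    """Estimate how many rows are needed to reach a target file size in GB."""
    total_batch = int(target_gb * 1024 * 1024 * 1024 / sample_bytes)
    return total_batch * sample_rows


# Write a small JSON sample, measure it, and estimate the row count for a 1 GB file.
sample_rows = 5000
sample = [{"pk": i, "float": float(i), "vec": [0.1] * 128} for i in range(sample_rows)]
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(sample, f)
sample_bytes = os.path.getsize(f.name)
print(f"sample: {sample_bytes / 1024 / 1024:.1f} MB, "
      f"rows needed for 1 GB: {rows_for_target_size(sample_rows, sample_bytes, 1)}")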


@@ -74,10 +74,12 @@ class TestBUlkInsertPerf(TestBulkInsertBase):
         fields_name = ["id", "title", "text", "url", "wiki_id", "views", "paragraph_id", "langs", "emb"]
         files = []
         if file_type == "json":
-            files = ["json-train-00000-of-00252.json"]
+            files = ["train-00000-of-00252.json"]
         if file_type == "npy":
             for field_name in fields_name:
                 files.append(f"{field_name}.npy")
+        if file_type == "parquet":
+            files = ["train-00000-of-00252.parquet"]
         checkers = {
             Op.bulk_insert: BulkInsertChecker(collection_name=c_name, use_one_collection=False, schema=schema,
                                               files=files, insert_data=False)


@@ -568,22 +568,40 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
     return data


-def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False):
+def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False):
     files = []
+    if file_size is not None:
+        rows = 5000
     start_uid = 0
     for i in range(file_nums):
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
         file = f"{data_source}/{file_name}"
         data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field)
-        log.info(f"data: {data}")
+        # log.info(f"data: {data}")
         with open(file, "w") as f:
             json.dump(data, f)
+        # get the file size
+        if file_size is not None:
+            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB")
+            # calculate the rows to be generated
+            total_batch = int(file_size*1024*1024*1024/batch_file_size)
+            total_rows = total_batch * rows
+            log.info(f"total_rows: {total_rows}")
+            all_data = []
+            for _ in range(total_batch):
+                all_data += data
+            file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
+            with open(f"{data_source}/{file_name}", "w") as f:
+                json.dump(all_data, f)
+            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024/1024} GB")
         files.append(file_name)
         start_uid += rows
     return files


-def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="", force=False, enable_dynamic_field=False):
+def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, err_type="", force=False, enable_dynamic_field=False):
     # gen numpy files
     files = []
     start_uid = 0
@@ -606,6 +624,28 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type=""
         if enable_dynamic_field:
             file_name = gen_dynamic_field_in_numpy_file(dir=data_source, rows=rows, force=force)
             files.append(file_name)
+        if file_size is not None:
+            batch_file_size = 0
+            for file_name in files:
+                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+            log.info(f"file_size with rows {rows} for {files}: {batch_file_size/1024/1024} MB")
+            # calculate the rows to be generated
+            total_batch = int(file_size*1024*1024*1024/batch_file_size)
+            total_rows = total_batch * rows
+            new_files = []
+            for f in files:
+                arr = np.load(f"{data_source}/{f}")
+                all_arr = np.concatenate([arr for _ in range(total_batch)], axis=0)
+                file_name = f
+                np.save(f"{data_source}/{file_name}", all_arr)
+                log.info(f"file_name: {file_name} data type: {all_arr.dtype} data shape: {all_arr.shape}")
+                new_files.append(file_name)
+            files = new_files
+            batch_file_size = 0
+            for file_name in files:
+                batch_file_size += os.path.getsize(f"{data_source}/{file_name}")
+            log.info(f"file_size with rows {total_rows} for {files}: {batch_file_size/1024/1024/1024} GB")
     else:
         for i in range(file_nums):
             subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
@@ -630,11 +670,14 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0):
     return data


-def gen_parquet_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False):
+def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False):
     # gen numpy files
     if err_type == "":
         err_type = "none"
     files = []
+    # generate 5000 entities and check the file size, then calculate the rows to be generated
+    if file_size is not None:
+        rows = 5000
     start_uid = 0
     if file_nums == 1:
         all_field_data = {}
@@ -648,6 +691,20 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_nums=1, array_l
         log.info(f"df: \n{df}")
         file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet"
         df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
+        # get the file size
+        if file_size is not None:
+            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            log.info(f"file_size with rows {rows} for {file_name}: {batch_file_size/1024/1024} MB")
+            # calculate the rows to be generated
+            total_batch = int(file_size*1024*1024*1024/batch_file_size)
+            total_rows = total_batch * rows
+            all_df = pd.concat([df for _ in range(total_batch)], axis=0, ignore_index=True)
+            file_name = f"data-fields-{len(data_fields)}-rows-{total_rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet"
+            log.info(f"all df: \n {all_df}")
+            all_df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
+            batch_file_size = os.path.getsize(f"{data_source}/{file_name}")
+            log.info(f"file_size with rows {total_rows} for {file_name}: {batch_file_size/1024/1024} MB")
         files.append(file_name)
     else:
         for i in range(file_nums):
@@ -740,18 +797,18 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket
 def prepare_bulk_insert_new_json_files(minio_endpoint="", bucket_name="milvus-bucket",
-                                       rows=100, dim=128, float_vector=True,
+                                       rows=100, dim=128, float_vector=True, file_size=None,
                                        data_fields=[], file_nums=1, enable_dynamic_field=False,
                                        err_type="", force=False, **kwargs):
     log.info(f"data_fields: {data_fields}")
-    files = gen_new_json_files(float_vector=float_vector, rows=rows, dim=dim, data_fields=data_fields, file_nums=file_nums, err_type=err_type, enable_dynamic_field=enable_dynamic_field, **kwargs)
+    files = gen_new_json_files(float_vector=float_vector, rows=rows, dim=dim, data_fields=data_fields, file_nums=file_nums, file_size=file_size, err_type=err_type, enable_dynamic_field=enable_dynamic_field, **kwargs)
     copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
     return files


-def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False,
+def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False, file_size=None,
                                     data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False):
     """
     Generate column based files based on params in numpy format and copy them to the minio
@@ -782,7 +839,7 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
     Return: List
         File name list or file name with sub-folder list
     """
-    files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector,
+    files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector, file_size=file_size,
                           data_fields=data_fields, enable_dynamic_field=enable_dynamic_field,
                           file_nums=file_nums, force=force)
@@ -790,7 +847,7 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
     return files


-def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None,
+def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None,
                                       enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False):
     """
     Generate column based files based on params in parquet format and copy them to the minio
@@ -822,7 +879,7 @@ def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-buc
         File name list or file name with sub-folder list
     """
     files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field,
-                              data_fields=data_fields, array_length=array_length,
+                              data_fields=data_fields, array_length=array_length, file_size=file_size,
                               file_nums=file_nums)
     copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
     return files
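
For reference, a hedged usage sketch of the parquet helper with the new parameter, mirroring the perf test above; the MinIO endpoint and bucket are placeholders, and the batch counts in the comments assume a 25 MB sample file:

from common.bulk_insert_data import prepare_bulk_insert_parquet_files, DataField as df

# Ask for roughly 10 GB of parquet data. With file_size set, the helper writes a
# 5000-row sample (the rows argument is overridden), measures it, and concatenates
# int(10 * 1024**3 / sample_bytes) copies; e.g. a 25 MB sample gives 409 batches,
# i.e. 2,045,000 rows in the final file uploaded to MinIO.
files = prepare_bulk_insert_parquet_files(
    minio_endpoint="127.0.0.1:9000",   # placeholder endpoint
    bucket_name="milvus-bucket",       # placeholder bucket
    rows=3000,                         # ignored once file_size is set
    dim=128,
    data_fields=[df.pk_field, df.int_field, df.float_field, df.vec_field],
    file_size=10,                      # target size in GB
    file_nums=1,
    force=True,
)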