test: add array data type and parquet file type for bulk insert case (#29030)

add array data type and parquet file type for the bulk insert case

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/29178/head
zhuwenxing 2023-12-13 19:56:38 +08:00 committed by GitHub
parent b5ee563914
commit b348827102
3 changed files with 656 additions and 18 deletions
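For reference, the new helpers represent array fields as plain JSON lists within each row, while the parquet path collects all fields into one pandas DataFrame written with pyarrow. A minimal sketch of the idea follows (the file paths and literal values are illustrative only; the field names mirror the DataField constants used in the tests):

import json
import random
import pandas as pd

# one generated row: scalar, array, and vector fields side by side
row = {
    "uid": 0,
    "int_scalar": random.randint(-999999, 9999999),
    "array_int": [random.randint(-999999, 9999999) for _ in range(3)],
    "array_string": ["a", "b", "c"],
    "vectors": [random.random() for _ in range(8)],
}
# JSON bulk-insert file: a list of such row dicts
with open("/tmp/bulk_insert_rows.json", "w") as f:
    json.dump([row], f)
# parquet bulk-insert file: one column per field, array fields stored as list-valued cells
pd.DataFrame([row]).to_parquet("/tmp/bulk_insert_rows.parquet", engine="pyarrow")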

View File

@ -1,14 +1,19 @@
import copy
import json
import os
import time
import numpy as np
import pandas as pd
import random
from faker import Faker
from sklearn import preprocessing
from common.common_func import gen_unique_str
from common.minio_comm import copy_files_to_minio
from utils.util_log import test_log as log
data_source = "/tmp/bulk_insert_data"
fake = Faker()
BINARY = "binary" BINARY = "binary"
FLOAT = "float" FLOAT = "float"
@ -21,6 +26,11 @@ class DataField:
bool_field = "bool_scalar" bool_field = "bool_scalar"
float_field = "float_scalar" float_field = "float_scalar"
double_field = "double_scalar" double_field = "double_scalar"
json_field = "json"
array_bool_field = "array_bool"
array_int_field = "array_int"
array_float_field = "array_float"
array_string_field = "array_string"
class DataErrorType:
@ -31,6 +41,8 @@ class DataErrorType:
typo_on_bool = "typo_on_bool" typo_on_bool = "typo_on_bool"
str_on_float_scalar = "str_on_float_scalar" str_on_float_scalar = "str_on_float_scalar"
str_on_vector_field = "str_on_vector_field" str_on_vector_field = "str_on_vector_field"
empty_array_field = "empty_array_field"
mismatch_type_array_field = "mismatch_type_array_field"
def gen_file_prefix(is_row_based=True, auto_id=True, prefix=""):
@ -75,7 +87,7 @@ def gen_binary_vectors(nb, dim):
def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect,
rows, dim, start_uid=0, err_type="", enable_dynamic_field=False, **kwargs):
if err_type == DataErrorType.str_on_int_pk:
str_pk = True
@ -99,7 +111,9 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect,
data_field = data_fields[j]
if data_field == DataField.pk_field:
if str_pk:
f.write('"uid":"' + str(gen_unique_str()) + '"') line = '"uid":"' + str(gen_unique_str()) + '"'
f.write(line)
# f.write('"uid":"' + str(gen_unique_str()) + '"')
else:
if err_type == DataErrorType.float_on_int_pk:
f.write('"uid":' + str(i + start_uid + random.random()) + '')
@ -110,14 +124,24 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect,
# if not auto_id, use the same value as pk to check the query results later
f.write('"int_scalar":' + str(i + start_uid) + '')
else:
f.write('"int_scalar":' + str(random.randint(-999999, 9999999)) + '') line = '"int_scalar":' + str(random.randint(-999999, 9999999)) + ''
f.write(line)
if data_field == DataField.float_field:
if err_type == DataErrorType.int_on_float_scalar:
f.write('"float_scalar":' + str(random.randint(-999999, 9999999)) + '')
elif err_type == DataErrorType.str_on_float_scalar:
f.write('"float_scalar":"' + str(gen_unique_str()) + '"')
else:
f.write('"float_scalar":' + str(random.random()) + '') line = '"float_scalar":' + str(random.random()) + ''
f.write(line)
if data_field == DataField.double_field:
if err_type == DataErrorType.int_on_float_scalar:
f.write('"double_scalar":' + str(random.randint(-999999, 9999999)) + '')
elif err_type == DataErrorType.str_on_float_scalar:
f.write('"double_scalar":"' + str(gen_unique_str()) + '"')
else:
line = '"double_scalar":' + str(random.random()) + ''
f.write(line)
if data_field == DataField.string_field:
f.write('"string_scalar":"' + str(gen_unique_str()) + '"')
if data_field == DataField.bool_field:
@ -125,6 +149,41 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect,
f.write('"bool_scalar":' + str(random.choice(["True", "False", "TRUE", "FALSE", "0", "1"])) + '') f.write('"bool_scalar":' + str(random.choice(["True", "False", "TRUE", "FALSE", "0", "1"])) + '')
else: else:
f.write('"bool_scalar":' + str(random.choice(["true", "false"])) + '') f.write('"bool_scalar":' + str(random.choice(["true", "false"])) + '')
if data_field == DataField.json_field:
data = {
gen_unique_str(): random.randint(-999999, 9999999),
}
f.write('"json":' + json.dumps(data) + '')
if data_field == DataField.array_bool_field:
if err_type == DataErrorType.empty_array_field:
f.write('"array_bool":[]')
elif err_type == DataErrorType.mismatch_type_array_field:
f.write('"array_bool": "mistype"')
else:
f.write('"array_bool":[' + str(random.choice(["true", "false"])) + ',' + str(random.choice(["true", "false"])) + ']')
if data_field == DataField.array_int_field:
if err_type == DataErrorType.empty_array_field:
f.write('"array_int":[]')
elif err_type == DataErrorType.mismatch_type_array_field:
f.write('"array_int": "mistype"')
else:
f.write('"array_int":[' + str(random.randint(-999999, 9999999)) + ',' + str(random.randint(-999999, 9999999)) + ']')
if data_field == DataField.array_float_field:
if err_type == DataErrorType.empty_array_field:
f.write('"array_float":[]')
elif err_type == DataErrorType.mismatch_type_array_field:
f.write('"array_float": "mistype"')
else:
f.write('"array_float":[' + str(random.random()) + ',' + str(random.random()) + ']')
if data_field == DataField.array_string_field:
if err_type == DataErrorType.empty_array_field:
f.write('"array_string":[]')
elif err_type == DataErrorType.mismatch_type_array_field:
f.write('"array_string": "mistype"')
else:
f.write('"array_string":["' + str(gen_unique_str()) + '","' + str(gen_unique_str()) + '"]')
if data_field == DataField.vec_field:
# vector field
if err_type == DataErrorType.one_entity_wrong_dim and i == wrong_row:
@ -133,10 +192,16 @@ def gen_row_based_json_file(row_file, str_pk, data_fields, float_vect,
vectors = gen_str_invalid_vectors(1, dim) if float_vect else gen_str_invalid_vectors(1, dim//8)
else:
vectors = gen_float_vectors(1, dim) if float_vect else gen_binary_vectors(1, (dim//8))
f.write('"vectors":' + ",".join(str(x).replace("'", '"') for x in vectors) + '') line = '"vectors":' + ",".join(str(x).replace("'", '"') for x in vectors) + ''
f.write(line)
# do not write a comma for the last field
if j != len(data_fields) - 1:
f.write(',')
if enable_dynamic_field:
d = {str(i+start_uid): i+start_uid, "name": fake.name(), "address": fake.address()}
d_str = json.dumps(d)
d_str = d_str[1:-1] # remove {}
f.write("," + d_str)
f.write('}')
f.write("\n")
f.write("]")
@ -276,6 +341,20 @@ def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
return file_name
def gen_dynamic_field_in_numpy_file(dir, rows, start=0, force=False):
file_name = f"$meta.npy"
file = f"{dir}/{file_name}"
if not os.path.exists(file) or force:
# non vector columns
data = []
if rows > 0:
data = [json.dumps({str(i): i, "name": fake.name(), "address": fake.address()}) for i in range(start, rows+start)]
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
def gen_bool_in_numpy_file(dir, data_field, rows, start=0, force=False):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
@ -291,6 +370,19 @@ def gen_bool_in_numpy_file(dir, data_field, rows, start=0, force=False):
return file_name
def gen_json_in_numpy_file(dir, data_field, rows, start=0, force=False):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
if not os.path.exists(file) or force:
data = []
if rows > 0:
data = [json.dumps({"name": fake.name(), "address": fake.address()}) for i in range(start, rows+start)]
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
@ -307,13 +399,65 @@ def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False):
data = [i for i in range(start, start + rows)]
elif data_field == DataField.int_field:
data = [random.randint(-999999, 9999999) for _ in range(rows)]
# print(f"file_name: {file_name} data type: {arr.dtype}")
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
np.save(file, arr)
return file_name
def gen_vectors(float_vector, rows, dim):
vectors = []
if rows > 0:
if float_vector:
vectors = gen_float_vectors(rows, dim)
else:
vectors = gen_binary_vectors(rows, (dim // 8))
return vectors
def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None):
if array_length is None:
array_length = random.randint(0, 10)
data = []
if rows > 0:
if data_field == DataField.vec_field:
data = gen_vectors(float_vector=float_vector, rows=rows, dim=dim)
elif data_field == DataField.float_field:
data = [np.float32(random.random()) for _ in range(rows)]
elif data_field == DataField.double_field:
data = [np.float64(random.random()) for _ in range(rows)]
elif data_field == DataField.pk_field:
data = [np.int64(i) for i in range(start, start + rows)]
elif data_field == DataField.int_field:
data = [np.int64(random.randint(-999999, 9999999)) for _ in range(rows)]
elif data_field == DataField.string_field:
data = [gen_unique_str(str(i)) for i in range(start, rows + start)]
elif data_field == DataField.bool_field:
data = [random.choice([True, False]) for i in range(start, rows + start)]
elif data_field == DataField.json_field:
data = pd.Series([json.dumps({
gen_unique_str(): random.randint(-999999, 9999999)
}) for i in range(start, rows + start)], dtype=np.dtype("str"))
elif data_field == DataField.array_bool_field:
data = pd.Series(
[np.array([random.choice([True, False]) for _ in range(array_length)], dtype=np.dtype("bool"))
for i in range(start, rows + start)])
elif data_field == DataField.array_int_field:
data = pd.Series(
[np.array([random.randint(-999999, 9999999) for _ in range(array_length)], dtype=np.dtype("int64"))
for i in range(start, rows + start)])
elif data_field == DataField.array_float_field:
data = pd.Series(
[np.array([random.random() for _ in range(array_length)], dtype=np.dtype("float32"))
for i in range(start, rows + start)])
elif data_field == DataField.array_string_field:
data = pd.Series(
[np.array([gen_unique_str(str(i)) for _ in range(array_length)], dtype=np.dtype("str"))
for i in range(start, rows + start)])
return data
def gen_file_name(is_row_based, rows, dim, auto_id, str_pk,
float_vector, data_fields, file_num, file_type, err_type):
row_suffix = entity_suffix(rows)
@ -334,7 +478,7 @@ def gen_file_name(is_row_based, rows, dim, auto_id, str_pk,
pk = "str_pk_" pk = "str_pk_"
prefix = gen_file_prefix(is_row_based=is_row_based, auto_id=auto_id, prefix=err_type) prefix = gen_file_prefix(is_row_based=is_row_based, auto_id=auto_id, prefix=err_type)
file_name = f"{prefix}_{pk}{vt}{field_suffix}{dim}d_{row_suffix}_{file_num}{file_type}" file_name = f"{prefix}_{pk}{vt}{field_suffix}{dim}d_{row_suffix}_{file_num}_{int(time.time())}{file_type}"
return file_name return file_name
@ -381,7 +525,65 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
return files
def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False):
data = []
for r in range(rows):
d = {}
for data_field in data_fields:
if data_field == DataField.vec_field:
# vector columns
d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0]
elif data_field == DataField.float_field:
d[data_field] = random.random()
elif data_field == DataField.double_field:
d[data_field] = random.random()
elif data_field == DataField.pk_field:
d[data_field] = r+start
elif data_field == DataField.int_field:
d[data_field] = random.randint(-999999, 9999999)
elif data_field == DataField.string_field:
d[data_field] = gen_unique_str(str(r + start))
elif data_field == DataField.bool_field:
d[data_field] = random.choice([True, False])
elif data_field == DataField.json_field:
d[data_field] = {str(r+start): r+start}
elif data_field == DataField.array_bool_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [random.choice([True, False]) for _ in range(array_length)]
elif data_field == DataField.array_int_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [random.randint(-999999, 9999999) for _ in range(array_length)]
elif data_field == DataField.array_float_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [random.random() for _ in range(array_length)]
elif data_field == DataField.array_string_field:
array_length = random.randint(0, 10) if array_length is None else array_length
d[data_field] = [gen_unique_str(str(i)) for i in range(array_length)]
if enable_dynamic_field:
d[str(r+start)] = r+start
d["name"] = fake.name()
d["address"] = fake.address()
data.append(d)
return data
def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False):
files = []
start_uid = 0
for i in range(file_nums):
file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
file = f"{data_source}/{file_name}"
data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field)
log.info(f"data: {data}")
with open(file, "w") as f:
json.dump(data, f)
files.append(file_name)
start_uid += rows
return files
def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type="", force=False, enable_dynamic_field=False):
# gen numpy files
files = []
start_uid = 0
@ -395,10 +597,15 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type=""
file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
elif data_field == DataField.bool_field:
file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
elif data_field == DataField.json_field:
file_name = gen_json_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field,
rows=rows, force=force)
files.append(file_name)
if enable_dynamic_field:
file_name = gen_dynamic_field_in_numpy_file(dir=data_source, rows=rows, force=force)
files.append(file_name)
else:
for i in range(file_nums):
subfolder = gen_subfolder(root=data_source, dim=dim, rows=rows, file_num=i)
@ -409,6 +616,52 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type=""
else:
file_name = gen_int_or_float_in_numpy_file(dir=dir, data_field=data_field, rows=rows, start=start_uid, force=force)
files.append(f"{subfolder}/{file_name}")
if enable_dynamic_field:
file_name = gen_dynamic_field_in_numpy_file(dir=dir, rows=rows, start=start_uid, force=force)
files.append(f"{subfolder}/{file_name}")
start_uid += rows
return files
def gen_dynamic_field_data_in_parquet_file(rows, start=0):
data = []
if rows > 0:
data = pd.Series([json.dumps({str(i): i, "name": fake.name(), "address": fake.address()}) for i in range(start, rows+start)], dtype=np.dtype("str"))
return data
def gen_parquet_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False):
# gen parquet files
if err_type == "":
err_type = "none"
files = []
start_uid = 0
if file_nums == 1:
all_field_data = {}
for data_field in data_fields:
data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0,
float_vector=float_vector, dim=dim, array_length=array_length)
all_field_data[data_field] = data
if enable_dynamic_field:
all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0)
df = pd.DataFrame(all_field_data)
log.info(f"df: \n{df}")
file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{file_nums}-error-{err_type}-{int(time.time())}.parquet"
df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
files.append(file_name)
else:
for i in range(file_nums):
all_field_data = {}
for data_field in data_fields:
data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0,
float_vector=float_vector, dim=dim, array_length=array_length)
all_field_data[data_field] = data
if enable_dynamic_field:
all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0)
df = pd.DataFrame(all_field_data)
file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-error-{err_type}-{int(time.time())}.parquet"
df.to_parquet(f"{data_source}/{file_name}", engine='pyarrow')
files.append(file_name)
start_uid += rows
return files
@ -476,6 +729,7 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket
data_fields_c = copy.deepcopy(data_fields)
log.info(f"data_fields: {data_fields}")
log.info(f"data_fields_c: {data_fields_c}")
files = gen_json_files(is_row_based=is_row_based, rows=rows, dim=dim,
auto_id=auto_id, str_pk=str_pk, float_vector=float_vector,
data_fields=data_fields_c, file_nums=file_nums, multi_folder=multi_folder,
@ -485,7 +739,19 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket
return files
def prepare_bulk_insert_new_json_files(minio_endpoint="", bucket_name="milvus-bucket",
rows=100, dim=128, float_vector=True,
data_fields=[], file_nums=1, enable_dynamic_field=False,
err_type="", force=False, **kwargs):
log.info(f"data_fields: {data_fields}")
files = gen_new_json_files(float_vector=float_vector, rows=rows, dim=dim, data_fields=data_fields, file_nums=file_nums, err_type=err_type, enable_dynamic_field=enable_dynamic_field, **kwargs)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
return files
def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, enable_dynamic_field=False,
data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False):
""" """
Generate column based files based on params in numpy format and copy them to the minio Generate column based files based on params in numpy format and copy them to the minio
@ -517,12 +783,51 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
File name list or file name with sub-folder list
"""
files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector,
data_fields=data_fields, enable_dynamic_field=enable_dynamic_field,
file_nums=file_nums, force=force)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
return files
def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None,
enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False):
"""
Generate column based files based on params in parquet format and copy them to the minio
Note: all data fields are written into a single parquet file per generated file
:param rows: the number of entities to be generated in the file(s)
:type rows: int
:param dim: dim of vector data
:type dim: int
:param float_vector: generate float vectors or binary vectors
:type float_vector: boolean
:param data_fields: data fields to be generated in the file(s):
it supports one or all of [int_pk, vectors, int, float]
Note: it does not automatically add pk field
:type data_fields: list
:param file_nums: number of parquet files to be generated; all of them are written into the data_source folder
:type file_nums: int
:param force: re-generate the file(s) regardless of whether they already exist
:type force: boolean
Return: List
File name list or file name with sub-folder list
"""
files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field,
data_fields=data_fields, array_length=array_length,
file_nums=file_nums)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
return files
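# --- Editor's note: illustrative usage sketch, not part of this commit. ---
# Shows how the new parquet helper is typically driven from a test. The MinIO
# endpoint below is a placeholder; all other names refer to objects defined in
# this module.
def _example_parquet_bulk_insert_files():
    return prepare_bulk_insert_parquet_files(
        minio_endpoint="127.0.0.1:9000",  # hypothetical endpoint
        bucket_name="milvus-bucket",
        rows=100, dim=128,
        data_fields=[DataField.pk_field, DataField.vec_field, DataField.array_int_field],
        file_nums=1, array_length=3, enable_dynamic_field=True, force=True,
    )
# --- end editor's note ---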
def gen_csv_file(file, float_vector, data_fields, rows, dim, start_uid):
with open(file, "w") as f:
# field name

View File

@ -98,6 +98,9 @@ def gen_json_field(name=ct.default_json_field_name, description=ct.default_desc,
def gen_array_field(name=ct.default_array_field_name, element_type=DataType.INT64, max_capacity=ct.default_max_capacity,
description=ct.default_desc, is_primary=False, **kwargs):
if element_type == DataType.VARCHAR:
kwargs['max_length'] = ct.default_length
array_field, _ = ApiFieldSchemaWrapper().init_field_schema(name=name, dtype=DataType.ARRAY,
element_type=element_type, max_capacity=max_capacity,
description=description, is_primary=is_primary, **kwargs)
@ -477,7 +480,7 @@ def gen_dataframe_all_data_type(nb=ct.default_nb, dim=ct.default_dim, start=0, w
if not random_primary_key:
int64_values = pd.Series(data=[i for i in range(start, start + nb)])
else:
int64_values = pd.Series(data=random.sample(range(start, start + nb), nb))
int32_values = pd.Series(data=[np.int32(i) for i in range(start, start + nb)], dtype="int32")
int16_values = pd.Series(data=[np.int16(i) for i in range(start, start + nb)], dtype="int16")
int8_values = pd.Series(data=[np.int8(i) for i in range(start, start + nb)], dtype="int8")
@ -985,7 +988,7 @@ def gen_search_param(index_type, metric_type="L2"):
log.error("Invalid index_type.") log.error("Invalid index_type.")
raise Exception("Invalid index_type.") raise Exception("Invalid index_type.")
log.debug(search_params) log.debug(search_params)
return search_params return search_params

View File

@ -1,6 +1,7 @@
import logging
import time
import pytest
from pymilvus import DataType
import numpy as np
from pathlib import Path
from base.client_base import TestcaseBase
@ -11,7 +12,9 @@ from common.common_type import CaseLabel, CheckTasks
from utils.util_log import test_log as log
from common.bulk_insert_data import (
prepare_bulk_insert_json_files,
prepare_bulk_insert_new_json_files,
prepare_bulk_insert_numpy_files,
prepare_bulk_insert_parquet_files,
prepare_bulk_insert_csv_files,
DataField as df,
)
@ -24,8 +27,9 @@ default_multi_fields = [
df.string_field,
df.bool_field,
df.float_field,
df.array_int_field
]
default_vec_n_int_fields = [df.vec_field, df.int_field, df.array_int_field]
# milvus_ns = "chaos-testing" # milvus_ns = "chaos-testing"
@ -267,6 +271,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int32_field(name=df.int_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT32),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
@ -604,7 +609,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
@pytest.mark.parametrize("auto_id", [True, False]) @pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128 @pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000 @pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
def test_with_all_field_json(self, auto_id, dim, entities, enable_dynamic_field):
""" """
collection schema 1: [pk, int64, float64, string float_vector] collection schema 1: [pk, int64, float64, string float_vector]
data file: vectors.npy and uid.npy, data file: vectors.npy and uid.npy,
@ -613,26 +619,31 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
2. import data
3. verify
"""
data_fields = [df.pk_field, df.int_field, df.float_field, df.double_field, df.vec_field]
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_double_field(name=df.double_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
@ -666,9 +677,327 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [2]) # 128
@pytest.mark.parametrize("entities", [2]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True])
def test_bulk_insert_all_field_with_new_json_format(self, auto_id, dim, entities, enable_dynamic_field):
"""
collection schema: [pk, int64, float, double, json, array, float_vector]
data file: a single JSON file (a list of row dicts) containing all fields
Steps:
1. create collection
2. import data
3. verify
"""
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_double_field(name=df.double_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
def test_bulk_insert_all_field_with_numpy(self, auto_id, dim, entities, enable_dynamic_field):
"""
collection schema: [pk, int64, float, double, json, float_vector]
data file: one numpy file per field (e.g. vectors.npy and uid.npy)
note: numpy file is not supported for array field
Steps:
1. create collection
2. import data
3. verify
"""
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_double_field(name=df.double_field),
cf.gen_json_field(name=df.json_field),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_numpy_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
force=True,
enable_dynamic_field=enable_dynamic_field,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("file_nums", [1])
@pytest.mark.parametrize("array_len", [None, 0, 100])
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
def test_bulk_insert_all_field_with_parquet(self, auto_id, dim, entities, file_nums, array_len, enable_dynamic_field):
"""
collection schema: [pk, int64, float, double, json, array, float_vector]
data file: a single parquet file containing all fields
Steps:
1. create collection
2. import data
3. verify
"""
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_double_field(name=df.double_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_parquet_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
file_nums=file_nums,
array_length=array_len,
enable_dynamic_field=enable_dynamic_field,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("file_nums", [0, 10])
@pytest.mark.parametrize("array_len", [1])
def test_with_wrong_parquet_file_num(self, auto_id, dim, entities, file_nums, array_len):
"""
collection schema: [pk, int64, float, double, array, float_vector]
data file: parquet file(s), one per file_num
Steps:
1. create collection
2. import data
3. verify failure, because only one file is allowed
"""
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_double_field(name=df.double_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_parquet_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
file_nums=file_nums,
array_length=array_len,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
error = {}
if file_nums == 0:
error = {ct.err_code: 1100, ct.err_msg: "import request is empty: invalid parameter"}
if file_nums > 1:
error = {ct.err_code: 65535, ct.err_msg: "for JSON or parquet file, each task only accepts one file"}
self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files,
check_task=CheckTasks.err_res, check_items=error
)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@ -781,6 +1110,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
cf.gen_string_field(name=df.string_field, is_partition_key=(par_key_field == df.string_field)),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64)
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema, num_partitions=10)