Add csv bulk insert test (#28189)

Signed-off-by: kuma <675613722@qq.com>
Co-authored-by: kuma <675613722@qq.com>
pull/28256/head
KumaJie 2023-11-07 20:42:22 +08:00 committed by GitHub
parent 356af86cba
commit 939ee0c257
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 446 additions and 3 deletions

View File

@ -474,8 +474,8 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket
file names list
"""
data_fields_c = copy.deepcopy(data_fields)
print(f"data_fields: {data_fields}")
print(f"data_fields_c: {data_fields_c}")
log.info(f"data_fields: {data_fields}")
log.info(f"data_fields_c: {data_fields_c}")
files = gen_json_files(is_row_based=is_row_based, rows=rows, dim=dim,
auto_id=auto_id, str_pk=str_pk, float_vector=float_vector,
data_fields=data_fields_c, file_nums=file_nums, multi_folder=multi_folder,
@ -521,4 +521,90 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
file_nums=file_nums, force=force)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
return files
return files
def _gen_csv_cell(data_field, uid, float_vector, dim):
    """Return the CSV text of a single cell for the field named `data_field`.

    A field name that matches none of the known DataField constants yields
    an empty string, so the column is still emitted but left blank.
    """
    pieces = []
    if data_field == DataField.pk_field:
        pieces.append(str(uid))
    if data_field == DataField.int_field:
        pieces.append(str(random.randint(-999999, 9999999)))
    if data_field == DataField.float_field:
        pieces.append(str(random.random()))
    if data_field == DataField.string_field:
        pieces.append(str(gen_unique_str()))
    if data_field == DataField.bool_field:
        pieces.append(str(random.choice(["true", "false"])))
    if data_field == DataField.vec_field:
        if float_vector:
            vectors = gen_float_vectors(1, dim)
        else:
            # binary vectors are requested with dim // 8 components
            vectors = gen_binary_vectors(1, dim // 8)
        # The whole vector cell is wrapped in double quotes because its
        # text contains commas, which are also the column separator.
        pieces.append('"' + ','.join(str(x) for x in vectors) + '"')
    return "".join(pieces)


def gen_csv_file(file, float_vector, data_fields, rows, dim, start_uid):
    """
    Write one csv file: a header line with the field names followed by
    `rows` lines of generated values.
    :param file: path of the csv file to (over)write
    :param float_vector: generate float vectors if True, binary vectors otherwise
    :param data_fields: ordered field names; they define the csv columns
    :param rows: number of data lines to write
    :param dim: dim of vector data
    :param start_uid: primary key value written on the first data line;
        each following line gets the previous value plus one
    """
    with open(file, "w") as f:
        # header: field names
        f.write(",".join(data_fields) + "\n")
        # body: one line of field values per entity
        for offset in range(rows):
            uid = offset + start_uid
            cells = [_gen_csv_cell(name, uid, float_vector, dim) for name in data_fields]
            f.write(",".join(cells) + "\n")
def gen_csv_files(rows, dim, auto_id, float_vector, data_fields, file_nums, force):
    """
    Generate `file_nums` csv files under `data_source` and return their names.
    :param rows: number of entities written to each file
    :param dim: dim of vector data
    :param auto_id: when False, the pk field is added to the generated
        columns if it is not already listed
    :param float_vector: generate float vectors if True, binary vectors otherwise
    :param data_fields: field names to generate; this list is not modified
    :param file_nums: number of files to generate
    :param force: re-generate a file even if it already exists
    :return: list of generated file names (relative to `data_source`)
    """
    # Work on a private copy so that adding the pk field below never
    # leaks into the list owned by the caller.
    fields = list(data_fields) if data_fields is not None else []
    if (not auto_id) and (DataField.pk_field not in fields):
        fields.append(DataField.pk_field)

    files = []
    start_uid = 0
    for i in range(file_nums):
        file_name = gen_file_name(
            is_row_based=True, rows=rows, dim=dim, auto_id=auto_id,
            float_vector=float_vector, data_fields=fields, file_num=i,
            file_type=".csv", str_pk=False, err_type="",
        )
        file = f"{data_source}/{file_name}"
        if force or not os.path.exists(file):
            gen_csv_file(file, float_vector, fields, rows, dim, start_uid)
        # Advance the pk offset for every file, generated or reused, so
        # each file covers its own range of `rows` consecutive ids.
        start_uid += rows
        files.append(file_name)
    return files
def prepare_bulk_insert_csv_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, auto_id=True, float_vector=True, data_fields=None, file_nums=1, force=False):
    """
    Generate row based files based on params in csv format and copy them to minio

    :param minio_endpoint: the minio_endpoint of minio
    :type minio_endpoint: str

    :param bucket_name: the bucket name of Milvus
    :type bucket_name: str

    :param rows: the number entities to be generated in the file
    :type rows: int

    :param dim: dim of vector data
    :type dim: int

    :param auto_id: generate primary key data or not
    :type auto_id: bool

    :param float_vector: generate float vectors or binary vectors
    :type float_vector: boolean

    :param data_fields: data fields to be generated in the file(s):
            It supports one or all of [pk, vectors, int, float, string, boolean]
            Note: it automatically adds pk field if auto_id=False
            Defaults to an empty list when omitted or None.
    :type data_fields: list

    :param file_nums: file numbers to be generated
    :type file_nums: int

    :param force: re-generate the file(s) regardless existing or not
    :type force: boolean

    :return: file names list
    """
    # `None` stands in for "no fields" to avoid a mutable default argument.
    if data_fields is None:
        data_fields = []
    # The generator receives a deep copy, so the caller's list is left as is.
    data_fields_c = copy.deepcopy(data_fields)
    log.info(f"data_fields: {data_fields}")
    log.info(f"data_fields_c: {data_fields_c}")
    files = gen_csv_files(rows=rows, dim=dim, auto_id=auto_id, float_vector=float_vector, data_fields=data_fields_c, file_nums=file_nums, force=force)
    copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
    return files

View File

@ -12,6 +12,7 @@ from utils.util_log import test_log as log
from common.bulk_insert_data import (
prepare_bulk_insert_json_files,
prepare_bulk_insert_numpy_files,
prepare_bulk_insert_csv_files,
DataField as df,
)
@ -917,6 +918,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# verify search and query
log.info(f"wait for load finished and be ready for search")
self.collection_wrap.load(_refresh=True)
time.sleep(2)
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
@ -937,3 +939,358 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
num_entities += p.num_entities
assert num_entities == entities * file_nums
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("partition_key_field", [df.int_field, df.string_field])
def test_partition_key_on_csv_file(self, auto_id, partition_key_field):
    """
    collection: auto_id, customized_id
    collection schema: [pk, float_vector, int64, varchar, bool, float]
    Steps:
    1. create collection with partition key enabled
    2. import data
    3. verify the data entities equal the import data and distributed by values of partition key field
    4. load the collection
    5. verify search successfully
    6. verify query successfully
    """
    dim = 12
    entities = 200
    files = prepare_bulk_insert_csv_files(
        minio_endpoint=self.minio_endpoint,
        bucket_name=self.bucket_name,
        rows=entities,
        dim=dim,
        auto_id=auto_id,
        data_fields=default_multi_fields,
        force=True
    )
    self._connect()
    c_name = cf.gen_unique_str("bulk_parkey")
    # exactly one of the int64 / varchar fields is flagged as partition key,
    # depending on the parametrized partition_key_field
    fields = [
        cf.gen_int64_field(name=df.pk_field, is_primary=True),
        cf.gen_float_vec_field(name=df.vec_field, dim=dim),
        cf.gen_int64_field(name=df.int_field, is_partition_key=(partition_key_field == df.int_field)),
        cf.gen_string_field(name=df.string_field, is_partition_key=(partition_key_field == df.string_field)),
        cf.gen_bool_field(name=df.bool_field),
        cf.gen_float_field(name=df.float_field),
    ]
    schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
    self.collection_wrap.init_collection(c_name, schema=schema)
    assert len(self.collection_wrap.partitions) == ct.default_partition_num

    # import data
    t0 = time.time()
    task_id, _ = self.utility_wrap.do_bulk_insert(
        collection_name=c_name,
        partition_name=None,
        files=files,
    )
    log.info(f"bulk insert task id:{task_id}")
    success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
        task_ids=[task_id], timeout=300
    )
    tt = time.time() - t0
    log.info(f"bulk insert state:{success} in {tt}")
    assert success

    num_entities = self.collection_wrap.num_entities
    log.info(f" collection entities: {num_entities}")
    assert num_entities == entities

    # verify imported data is available for search
    index_params = ct.default_index
    self.collection_wrap.create_index(
        field_name=df.vec_field, index_params=index_params
    )
    self.collection_wrap.load()
    log.info("wait for load finished and be ready for search")
    time.sleep(10)
    log.info(
        f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
    )
    nq = 2
    topk = 2
    search_data = cf.gen_vectors(nq, dim)
    search_params = ct.default_search_params
    res, _ = self.collection_wrap.search(
        search_data,
        df.vec_field,
        param=search_params,
        limit=topk,
        check_task=CheckTasks.check_search_results,
        check_items={"nq": nq, "limit": topk},
    )
    # every id returned by search must also be retrievable by query
    for hits in res:
        ids = hits.ids
        results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
        assert len(results) == len(ids)

    # verify the per-partition entity counts add up to the imported total
    num_entities = 0
    for p in self.collection_wrap.partitions:
        num_entities += p.num_entities
    assert num_entities == entities
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [100])
def test_float_vector_csv(self, auto_id, dim, entities):
    """
    collection: auto_id, customized_id
    collection schema: [pk, float_vector]
    Steps:
    1. create collection
    2. import data
    3. verify the data entities equal the import data
    4. load the collection
    5. verify search successfully
    6. verify query successfully
    """
    files = prepare_bulk_insert_csv_files(
        minio_endpoint=self.minio_endpoint,
        bucket_name=self.bucket_name,
        rows=entities,
        dim=dim,
        auto_id=auto_id,
        data_fields=default_vec_only_fields,
        force=True
    )
    self._connect()
    c_name = cf.gen_unique_str("bulk_insert")
    fields = [
        cf.gen_int64_field(name=df.pk_field, is_primary=True),
        cf.gen_float_vec_field(name=df.vec_field, dim=dim),
    ]
    schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
    self.collection_wrap.init_collection(c_name, schema=schema)

    # import data
    t0 = time.time()
    task_id, _ = self.utility_wrap.do_bulk_insert(
        collection_name=c_name,
        partition_name=None,
        files=files
    )
    log.info(f"bulk insert task id:{task_id}")
    success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
        task_ids=[task_id], timeout=300
    )
    tt = time.time() - t0
    log.info(f"bulk insert state:{success} in {tt}")
    # fail right here if the import task did not complete, instead of
    # surfacing later as an entity-count mismatch
    assert success

    num_entities = self.collection_wrap.num_entities
    log.info(f"collection entities:{num_entities}")
    assert num_entities == entities

    # verify imported data is available for search
    index_params = ct.default_index
    self.collection_wrap.create_index(
        field_name=df.vec_field, index_params=index_params
    )
    time.sleep(2)
    self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
    res, _ = self.utility_wrap.index_building_progress(c_name)
    log.info(f"index building progress: {res}")
    self.collection_wrap.load()
    self.collection_wrap.load(_refresh=True)
    log.info("wait for load finished and be ready for search")
    time.sleep(2)
    log.info(
        f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
    )
    nq = 2
    topk = 2
    search_data = cf.gen_vectors(nq, dim)
    search_params = ct.default_search_params
    res, _ = self.collection_wrap.search(
        search_data,
        df.vec_field,
        param=search_params,
        limit=topk,
        check_task=CheckTasks.check_search_results,
        check_items={"nq": nq, "limit": topk},
    )
    # every id returned by search must also be retrievable by query
    for hits in res:
        ids = hits.ids
        results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
        assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [2000])
def test_binary_vector_csv(self, auto_id, dim, entities):
    """
    collection: auto_id, customized_id
    collection schema: [pk, binary_vector]
    Steps:
    1. create collection
    2. create index and load collection
    3. import data
    4. verify data entities
    5. load collection
    6. verify search successfully
    7. verify query successfully
    """
    files = prepare_bulk_insert_csv_files(
        minio_endpoint=self.minio_endpoint,
        bucket_name=self.bucket_name,
        rows=entities,
        dim=dim,
        auto_id=auto_id,
        float_vector=False,
        data_fields=default_vec_only_fields,
        force=True
    )
    self._connect()
    c_name = cf.gen_unique_str("bulk_insert")
    fields = [
        cf.gen_int64_field(name=df.pk_field, is_primary=True),
        cf.gen_binary_vec_field(name=df.vec_field, dim=dim)
    ]
    schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
    self.collection_wrap.init_collection(c_name, schema=schema)

    # build index before bulk insert
    binary_index_params = {
        "index_type": "BIN_IVF_FLAT",
        "metric_type": "JACCARD",
        "params": {"nlist": 64},
    }
    self.collection_wrap.create_index(
        field_name=df.vec_field, index_params=binary_index_params
    )
    # load collection
    self.collection_wrap.load()

    # import data
    t0 = time.time()
    task_id, _ = self.utility_wrap.do_bulk_insert(
        collection_name=c_name,
        partition_name=None,
        files=files
    )
    log.info(f"bulk insert task ids:{task_id}")
    success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
        task_ids=[task_id], timeout=300
    )
    tt = time.time() - t0
    log.info(f"bulk insert state:{success} in {tt}")
    assert success

    time.sleep(2)
    self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
    res, _ = self.utility_wrap.index_building_progress(c_name)
    log.info(f"index building progress: {res}")

    # verify num entities
    assert self.collection_wrap.num_entities == entities

    # verify search and query
    log.info("wait for load finished and be ready for search")
    self.collection_wrap.load(_refresh=True)
    time.sleep(2)
    # the second element of gen_binary_vectors' result is used as search data
    search_data = cf.gen_binary_vectors(1, dim)[1]
    search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
    res, _ = self.collection_wrap.search(
        search_data,
        df.vec_field,
        param=search_params,
        limit=1,
        check_task=CheckTasks.check_search_results,
        check_items={"nq": 1, "limit": 1},
    )
    # every id returned by search must also be retrievable by query
    for hits in res:
        ids = hits.ids
        results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
        assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [2000])
def test_partition_csv(self, auto_id, dim, entities):
    """
    collection schema: [pk, int64, string, float_vector]
    Steps:
    1. create collection and partition
    2. build index and load partition
    3. import data into the partition
    4. verify num entities
    5. verify index status
    6. verify search and query
    """
    data_fields = [df.int_field, df.string_field, df.vec_field]
    files = prepare_bulk_insert_csv_files(
        minio_endpoint=self.minio_endpoint,
        bucket_name=self.bucket_name,
        rows=entities,
        dim=dim,
        auto_id=auto_id,
        data_fields=data_fields,
        force=True
    )
    self._connect()
    c_name = cf.gen_unique_str("bulk_insert_partition")
    fields = [
        cf.gen_int64_field(name=df.pk_field, is_primary=True),
        cf.gen_int64_field(name=df.int_field),
        cf.gen_string_field(name=df.string_field),
        cf.gen_float_vec_field(name=df.vec_field, dim=dim)
    ]
    schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
    self.collection_wrap.init_collection(c_name, schema=schema)

    # create a partition
    p_name = cf.gen_unique_str("bulk_insert_partition")
    m_partition, _ = self.collection_wrap.create_partition(partition_name=p_name)

    # build index
    index_params = ct.default_index
    self.collection_wrap.create_index(
        field_name=df.vec_field, index_params=index_params
    )
    # load before bulk insert
    self.collection_wrap.load(partition_names=[p_name])

    # import data into the partition
    t0 = time.time()
    task_id, _ = self.utility_wrap.do_bulk_insert(
        collection_name=c_name,
        partition_name=p_name,
        files=files
    )
    log.info(f"bulk insert task ids:{task_id}")
    success, state = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
        task_ids=[task_id], timeout=300
    )
    tt = time.time() - t0
    log.info(f"bulk insert state:{success} in {tt}")
    assert success
    # all imported entities must be in the target partition
    assert m_partition.num_entities == entities
    assert self.collection_wrap.num_entities == entities
    log.debug(state)

    time.sleep(2)
    self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
    res, _ = self.utility_wrap.index_building_progress(c_name)
    log.info(f"index building progress: {res}")
    log.info("wait for load finished and be ready for search")
    self.collection_wrap.load(_refresh=True)
    time.sleep(2)
    log.info(
        f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
    )
    nq = 10
    topk = 5
    search_data = cf.gen_vectors(nq, dim)
    search_params = ct.default_search_params
    res, _ = self.collection_wrap.search(
        search_data,
        df.vec_field,
        param=search_params,
        limit=topk,
        check_task=CheckTasks.check_search_results,
        check_items={"nq": nq, "limit": topk},
    )
    # every id returned by search must also be retrievable by query
    for hits in res:
        ids = hits.ids
        results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
        assert len(results) == len(ids)