Merge bulkinsert partition key enable tests from 2.2.0 branch (#27372)

Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>
pull/27382/head
yanliang567 2023-09-26 18:43:32 +08:00 committed by GitHub
parent f1257cf11b
commit 3e864d4210
6 changed files with 701 additions and 361 deletions


@@ -188,6 +188,24 @@ class ApiUtilityWrapper:
                    if task.task_id in pending_task_ids:
                        log.info(f"task {task.task_id} state transfer from pending to {task.state_name}")

    def wait_index_build_completed(self, collection_name, timeout=None):
        start = time.time()
        if timeout is not None:
            task_timeout = timeout
        else:
            task_timeout = TIMEOUT
        end = time.time()
        while end - start <= task_timeout:
            time.sleep(0.5)
            index_states, _ = self.index_building_progress(collection_name)
            log.debug(f"index states: {index_states}")
            if index_states["total_rows"] == index_states["indexed_rows"]:
                log.info("index build completed")
                return True
            end = time.time()
        log.info("index build timeout")
        return False

    def get_query_segment_info(self, collection_name, timeout=None, using="default", check_task=None, check_items=None):
        timeout = TIMEOUT if timeout is None else timeout
        func_name = sys._getframe().f_code.co_name
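
A hedged usage sketch of the new helper: after a bulk insert, a test can poll until the index covers all imported rows before running search checks. The fixture names (utility_wrap, collection_wrap), df/ct constants, and the timeout value are assumptions taken from how this suite is used elsewhere, not part of the diff itself.

    # Hypothetical call site inside a test method (fixture names assumed from this suite)
    self.collection_wrap.create_index(field_name=df.vec_field, index_params=ct.default_index)
    index_ready = self.utility_wrap.wait_index_build_completed(c_name, timeout=120)
    assert index_ready  # True once indexed_rows == total_rows, False on timeout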

File diff suppressed because it is too large


@@ -69,7 +69,7 @@ def gen_str_invalid_vectors(nb, dim):
def gen_binary_vectors(nb, dim):
    # binary: each int represents 8 dimensions,
    # so if the binary vector dimension is 16, use [x, y], where x and y can be any int between 0 and 255
    vectors = [[random.randint(0, 255) for _ in range(dim)] for _ in range(nb)]
    return vectors
@@ -276,6 +276,21 @@ def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
    return file_name

def gen_bool_in_numpy_file(dir, data_field, rows, start=0, force=False):
    file_name = f"{data_field}.npy"
    file = f"{dir}/{file_name}"
    if not os.path.exists(file) or force:
        # non vector columns
        data = []
        if rows > 0:
            data = [random.choice([True, False]) for i in range(start, rows+start)]
        arr = np.array(data)
        # print(f"file_name: {file_name} data type: {arr.dtype}")
        log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
        np.save(file, arr)
    return file_name

def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False):
    file_name = f"{data_field}.npy"
    file = f"{dir}/{file_name}"
@@ -378,6 +393,8 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_nums=1, err_type=""
                                                     rows=rows, dim=dim, force=force)
            elif data_field == DataField.string_field:  # string field for numpy not supported yet at 2022-10-17
                file_name = gen_string_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
            elif data_field == DataField.bool_field:
                file_name = gen_bool_in_numpy_file(dir=data_source, data_field=data_field, rows=rows, force=force)
            else:
                file_name = gen_int_or_float_in_numpy_file(dir=data_source, data_field=data_field,
                                                           rows=rows, force=force)
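
A minimal sanity-check sketch for the new boolean generator: the temporary directory, field name, and row count below are placeholders; the round-trip only confirms that the saved array has a boolean dtype and the requested length.

    import os
    import numpy as np

    # Hypothetical smoke test for gen_bool_in_numpy_file (path and row count are placeholders)
    tmp_dir = "/tmp/bulk_insert_data"
    os.makedirs(tmp_dir, exist_ok=True)
    file_name = gen_bool_in_numpy_file(dir=tmp_dir, data_field="bool_scalar", rows=100, force=True)
    arr = np.load(f"{tmp_dir}/{file_name}")
    assert arr.dtype == np.bool_ and arr.shape == (100,)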
@@ -468,8 +485,8 @@ def prepare_bulk_insert_json_files(minio_endpoint="", bucket_name="milvus-bucket
    return files

def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128,
                                    data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False):
    """
    Generate column-based files in numpy format based on params and copy them to minio.
    Note: one numpy file is generated for each field in data_fields.
@@ -484,20 +501,20 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
    :type float_vector: boolean
    :param data_fields: data fields to be generated in the file(s):
            it supports one or all of [int_pk, vectors, int, float]
            Note: it does not automatically add the pk field
    :type data_fields: list
    :param file_nums: number of files to be generated
            The file(s) would be generated in the data_source folder if file_nums = 1
            The file(s) would be generated in different sub-folders if file_nums > 1
    :type file_nums: int
    :param force: re-generate the file(s) regardless of whether they already exist
    :type force: boolean
    Return: List
        File name list or file name with sub-folder list
    """
    files = gen_npy_files(rows=rows, dim=dim, float_vector=float_vector,
                          data_fields=data_fields,
@@ -505,4 +522,3 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke
    copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
    return files
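
For reference, a hedged example of calling this helper the way the new tests do; the MinIO endpoint, bucket, sizes, and field list are placeholder values, not part of the diff.

    # Sketch: generate one numpy file per listed field and copy it to MinIO (placeholder values)
    files = prepare_bulk_insert_numpy_files(
        minio_endpoint="127.0.0.1:9000",
        bucket_name="milvus-bucket",
        rows=100,
        dim=128,
        data_fields=[DataField.vec_field],
        force=True,
    )
    # the returned list is what the tests then pass to utility_wrap.do_bulk_insert(..., files=files)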


@@ -21,7 +21,8 @@ default_binary_index = {"index_type": "BIN_IVF_FLAT", "params": {"nlist": 128},
default_diskann_index = {"index_type": "DISKANN", "metric_type": "COSINE", "params": {}}
default_diskann_search_params = {"metric_type": "COSINE", "params": {"search_list": 30}}
max_top_k = 16384
max_partition_num = 4096
default_partition_num = 64  # default num_partitions for partition key feature
default_segment_row_limit = 1000
default_server_segment_row_limit = 1024 * 512
default_alias = "default"


@@ -742,3 +742,197 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
            check_task=CheckTasks.check_search_results,
            check_items={"nq": 1, "limit": 1},
        )
    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("is_row_based", [True])
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("par_key_field", [df.int_field, df.string_field])
    def test_partition_key_on_json_file(self, is_row_based, auto_id, par_key_field):
        """
        collection: auto_id, customized_id
        collection schema: [pk, int64, varchar, float_vector]
        Steps:
        1. create collection with partition key enabled
        2. import data
        3. verify the data entities equal the imported data and are distributed by the values of the partition key field
        4. load the collection
        5. verify search successfully
        6. verify query successfully
        """
        dim = 12
        entities = 200
        files = prepare_bulk_insert_json_files(
            minio_endpoint=self.minio_endpoint,
            bucket_name=self.bucket_name,
            is_row_based=is_row_based,
            rows=entities,
            dim=dim,
            auto_id=auto_id,
            data_fields=default_multi_fields,
            force=True,
        )
        self._connect()
        c_name = cf.gen_unique_str("bulk_parkey")
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True),
            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
            cf.gen_int64_field(name=df.int_field, is_partition_key=(par_key_field == df.int_field)),
            cf.gen_string_field(name=df.string_field, is_partition_key=(par_key_field == df.string_field)),
            cf.gen_bool_field(name=df.bool_field),
            cf.gen_float_field(name=df.float_field),
        ]
        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
        self.collection_wrap.init_collection(c_name, schema=schema)
        assert len(self.collection_wrap.partitions) == ct.default_partition_num
        # import data
        t0 = time.time()
        task_id, _ = self.utility_wrap.do_bulk_insert(
            collection_name=c_name,
            partition_name=None,
            files=files,
        )
        logging.info(f"bulk insert task id:{task_id}")
        success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
            task_ids=[task_id], timeout=90
        )
        tt = time.time() - t0
        log.info(f"bulk insert state:{success} in {tt}")
        assert success
        num_entities = self.collection_wrap.num_entities
        log.info(f" collection entities: {num_entities}")
        assert num_entities == entities
        # verify imported data is available for search
        index_params = ct.default_index
        self.collection_wrap.create_index(
            field_name=df.vec_field, index_params=index_params
        )
        self.collection_wrap.load()
        log.info("wait for load finished and be ready for search")
        time.sleep(10)
        log.info(
            f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
        )
        nq = 2
        topk = 2
        search_data = cf.gen_vectors(nq, dim)
        search_params = ct.default_search_params
        res, _ = self.collection_wrap.search(
            search_data,
            df.vec_field,
            param=search_params,
            limit=topk,
            check_task=CheckTasks.check_search_results,
            check_items={"nq": nq, "limit": topk},
        )
        for hits in res:
            ids = hits.ids
            results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
            assert len(results) == len(ids)
        # verify data was bulk inserted into different partitions
        num_entities = 0
        empty_partition_num = 0
        for p in self.collection_wrap.partitions:
            if p.num_entities == 0:
                empty_partition_num += 1
            num_entities += p.num_entities
        assert num_entities == entities
        # verify error when trying to bulk insert into a specific partition
        # TODO: enable the error msg assert after issue #25586 is fixed
        err_msg = "not allow to set partition name for collection with partition key"
        task_id, _ = self.utility_wrap.do_bulk_insert(
            collection_name=c_name,
            partition_name=self.collection_wrap.partitions[0].name,
            files=files,
            check_task=CheckTasks.err_res,
            check_items={"err_code": 99, "err_msg": err_msg},
        )
    @pytest.mark.tags(CaseLabel.L3)
    @pytest.mark.parametrize("auto_id", [True, False])
    @pytest.mark.parametrize("dim", [13])
    @pytest.mark.parametrize("entities", [300])
    @pytest.mark.parametrize("file_nums", [10])
    def test_partition_key_on_multi_numpy_files(
        self, auto_id, dim, entities, file_nums
    ):
        """
        collection schema 1: [pk, int64, float_vector, double]
        data file: .npy files in different folders
        Steps:
        1. create collection with partition key enabled, create index and load
        2. import data
        3. verify the data imported from numpy files in a loop
        """
        self._connect()
        c_name = cf.gen_unique_str("bulk_ins_parkey")
        fields = [
            cf.gen_int64_field(name=df.pk_field, is_primary=True),
            cf.gen_int64_field(name=df.int_field, is_partition_key=True),
            cf.gen_float_field(name=df.float_field),
            cf.gen_double_field(name=df.double_field),
            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
        ]
        schema = cf.gen_collection_schema(fields=fields)
        self.collection_wrap.init_collection(c_name, schema=schema)
        # build index
        index_params = ct.default_index
        self.collection_wrap.create_index(
            field_name=df.vec_field, index_params=index_params
        )
        # load collection
        self.collection_wrap.load()
        data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
        task_ids = []
        for i in range(file_nums):
            files = prepare_bulk_insert_numpy_files(
                minio_endpoint=self.minio_endpoint,
                bucket_name=self.bucket_name,
                rows=entities,
                dim=dim,
                data_fields=data_fields,
                file_nums=1,
                force=True,
            )
            task_id, _ = self.utility_wrap.do_bulk_insert(
                collection_name=c_name, files=files
            )
            task_ids.append(task_id)
        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
            task_ids=[task_id], timeout=90
        )
        log.info(f"bulk insert state:{success}")
        assert success
        log.info(f" collection entities: {self.collection_wrap.num_entities}")
        assert self.collection_wrap.num_entities == entities * file_nums
        # verify imported data is indexed
        success = self.utility_wrap.wait_index_build_completed(c_name)
        assert success
        # verify search and query
        log.info("wait for load finished and be ready for search")
        self.collection_wrap.load(_refresh=True)
        search_data = cf.gen_vectors(1, dim)
        search_params = ct.default_search_params
        res, _ = self.collection_wrap.search(
            search_data,
            df.vec_field,
            param=search_params,
            limit=1,
            check_task=CheckTasks.check_search_results,
            check_items={"nq": 1, "limit": 1},
        )
        # verify data was bulk inserted into different partitions
        num_entities = 0
        empty_partition_num = 0
        for p in self.collection_wrap.partitions:
            if p.num_entities == 0:
                empty_partition_num += 1
            num_entities += p.num_entities
        assert num_entities == entities * file_nums


@@ -25,6 +25,7 @@ class TestPartitionKeyParams(TestcaseBase):
        schema = cf.gen_collection_schema(fields=[pk_field, int64_field, string_field, vector_field], auto_id=True)
        c_name = cf.gen_unique_str("par_key")
        collection_w, _ = self.collection_wrap.init_collection(name=c_name, schema=schema)
        assert len(collection_w.partitions) == ct.default_partition_num
        # insert
        nb = 1000