test: improve concurrency and reduce import test execution time (#33356)

improve concurrency and reduce import test execution time

---------

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
pull/33656/head
zhuwenxing 2024-06-06 15:57:58 +08:00 committed by GitHub
parent 4dd0c54ca0
commit 86274f70bd
2 changed files with 52 additions and 477 deletions

tests/python_client/testcases/test_bulk_insert.py

@@ -80,23 +80,25 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
5. verify search successfully
6. verify query successfully
"""
files = prepare_bulk_insert_json_files(
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=default_vec_only_fields,
data_fields=data_fields,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
schema = cf.gen_collection_schema(fields=fields)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
@@ -120,7 +122,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
field_name=df.float_vec_field, index_params=index_params
)
time.sleep(2)
self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
@@ -139,7 +141,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
df.float_vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
@@ -167,7 +169,16 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
"""
auto_id = False # no auto id for string_pk schema
string_pk = True
files = prepare_bulk_insert_json_files(
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_string_field(name=df.string_field, is_primary=True, auto_id=auto_id),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields)
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=is_row_based,
@@ -175,15 +186,8 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
dim=dim,
auto_id=auto_id,
str_pk=string_pk,
data_fields=default_vec_only_fields,
data_fields=data_fields,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_string_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
@@ -205,7 +209,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
field_name=df.float_vec_field, index_params=index_params
)
self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
res, _ = self.utility_wrap.index_building_progress(c_name)
@@ -224,7 +228,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
time.sleep(2)
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
df.float_vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
@@ -232,7 +236,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
)
for hits in res:
ids = hits.ids
expr = f"{df.pk_field} in {ids}"
expr = f"{df.string_field} in {ids}"
expr = expr.replace("'", '"')
results, _ = self.collection_wrap.query(expr=expr)
assert len(results) == len(ids)
@@ -433,21 +437,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
bulk_insert_row = 500
direct_insert_row = 3000
dim = 128
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=True,
rows=bulk_insert_row,
dim=dim,
data_fields=[df.pk_field, df.float_field, df.vec_field],
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_field(name=df.float_field),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
]
data = [
[i for i in range(direct_insert_row)],
@@ -460,7 +455,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# build index
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
field_name=df.float_vec_field, index_params=index_params
)
# load collection
self.collection_wrap.load()
@@ -468,6 +463,16 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
# insert data
self.collection_wrap.insert(data)
self.collection_wrap.num_entities
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
is_row_based=True,
rows=bulk_insert_row,
dim=dim,
data_fields=[df.pk_field, df.float_field, df.float_vec_field],
force=True,
)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
@@ -503,7 +508,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
df.float_vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
@@ -738,91 +743,6 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
def test_with_all_field_json(self, auto_id, dim, entities, enable_dynamic_field):
"""
collection schema 1: [pk, int64, float64, string float_vector]
data file: vectors.npy and uid.npy,
Steps:
1. create collection
2. import data
3. verify
"""
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field:
assert "name" in fields_from_search
assert "address" in fields_from_search
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
@pytest.mark.parametrize("dim", [128]) # 128
@@ -1791,362 +1711,7 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
num_entities += p.num_entities
assert num_entities == entities * file_nums
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("partition_key_field", [df.int_field, df.string_field])
@pytest.mark.skip("import data via csv is no longer supported")
def test_partition_key_on_csv_file(self, auto_id, partition_key_field):
"""
collection: auto_id, customized_id
collection schema: [pk, float_vector, int64, varchar, bool, float]
Step:
1. create collection with partition key enabled
2. import data
3. verify the data entities equal the import data and distributed by values of partition key field
4. load the collection
5. verify search successfully
6. verify query successfully
"""
dim = 12
entities = 200
files = prepare_bulk_insert_csv_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=default_multi_fields,
force=True
)
self._connect()
c_name = cf.gen_unique_str("bulk_partition_key")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
cf.gen_int64_field(name=df.int_field, is_partition_key=(partition_key_field == df.int_field)),
cf.gen_string_field(name=df.string_field, is_partition_key=(partition_key_field == df.string_field)),
cf.gen_bool_field(name=df.bool_field),
cf.gen_float_field(name=df.float_field),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema, num_partitions=10)
assert len(self.collection_wrap.partitions) == 10
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files,
)
logging.info(f"bulk insert task id:{task_id}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(10)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
nq = 2
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
# verify data was bulk inserted into different partitions
num_entities = 0
empty_partition_num = 0
for p in self.collection_wrap.partitions:
if p.num_entities == 0:
empty_partition_num += 1
num_entities += p.num_entities
assert num_entities == entities
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [100])
@pytest.mark.skip("import data via csv is no longer supported")
def test_float_vector_csv(self, auto_id, dim, entities):
"""
collection: auto_id, customized_id
collection schema: [pk, float_vector]
Steps:
1. create collection
2. import data
3. verify the data entities equal the import data
4. load the collection
5. verify search successfully
6. verify query successfully
"""
files = prepare_bulk_insert_csv_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=default_vec_only_fields,
force=True
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files
)
logging.info(f"bulk insert task id:{task_id}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
num_entities = self.collection_wrap.num_entities
log.info(f"collection entities:{num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
time.sleep(2)
self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
res, _ = self.utility_wrap.index_building_progress(c_name)
log.info(f"index building progress: {res}")
self.collection_wrap.load()
self.collection_wrap.load(_refresh=True)
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
nq = 2
topk = 2
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [2000])
@pytest.mark.skip("import data via csv is no longer supported")
def test_binary_vector_csv(self, auto_id, dim, entities):
"""
collection: auto_id, customized_id
collection schema: [pk, int64, binary_vector]
Step:
1. create collection
2. create index and load collection
3. import data
4. verify data entities
5. load collection
6. verify search successfully
7. verify query successfully
"""
files = prepare_bulk_insert_csv_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
auto_id=auto_id,
float_vector=False,
data_fields=default_vec_only_fields,
force=True
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_binary_vec_field(name=df.vec_field, dim=dim)
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# build index before bulk insert
binary_index_params = {
"index_type": "BIN_IVF_FLAT",
"metric_type": "JACCARD",
"params": {"nlist": 64},
}
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=binary_index_params
)
# load collection
self.collection_wrap.load()
# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=None,
files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert success
time.sleep(2)
self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
res, _ = self.utility_wrap.index_building_progress(c_name)
log.info(f"index building progress: {res}")
# verify num entities
assert self.collection_wrap.num_entities == entities
# verify search and query
log.info(f"wait for load finished and be ready for search")
self.collection_wrap.load(_refresh=True)
time.sleep(2)
search_data = cf.gen_binary_vectors(1, dim)[1]
search_params = {"metric_type": "JACCARD", "params": {"nprobe": 10}}
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=1,
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128])
@pytest.mark.parametrize("entities", [2000])
@pytest.mark.skip("import data via csv is no longer supported")
def test_partition_csv(self, auto_id, dim, entities):
"""
collection schema: [pk, int64, string, float_vector]
Step:
1. create collection and partition
2. build index and load partition
3. import data into the partition
4. verify num entities
5. verify index status
6. verify search and query
"""
data_fields = [df.int_field, df.string_field, df.vec_field]
files = prepare_bulk_insert_csv_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
auto_id=auto_id,
data_fields=data_fields,
force=True
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert_partition")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True),
cf.gen_int64_field(name=df.int_field),
cf.gen_string_field(name=df.string_field),
cf.gen_float_vec_field(name=df.vec_field, dim=dim)
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
self.collection_wrap.init_collection(c_name, schema=schema)
# create a partition
p_name = cf.gen_unique_str("bulk_insert_partition")
m_partition, _ = self.collection_wrap.create_partition(partition_name=p_name)
# build index
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
# load before bulk insert
self.collection_wrap.load(partition_names=[p_name])
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name,
partition_name=p_name,
files = files
)
logging.info(f"bulk insert task ids:{task_id}")
success, state = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt}")
assert success
assert m_partition.num_entities == entities
assert self.collection_wrap.num_entities == entities
log.debug(state)
time.sleep(2)
self.utility_wrap.wait_for_index_building_complete(c_name, timeout=300)
res, _ = self.utility_wrap.index_building_progress(c_name)
log.info(f"index building progress: {res}")
log.info(f"wait for load finished and be ready for search")
self.collection_wrap.load(_refresh=True)
time.sleep(2)
log.info(
f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}"
)
nq = 10
topk = 5
search_data = cf.gen_vectors(nq, dim)
search_params = ct.default_search_params
res, _ = self.collection_wrap.search(
search_data,
df.vec_field,
param=search_params,
limit=topk,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq, "limit": topk},
)
for hits in res:
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
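
Taken together, the hunks above converge on one flow: define the schema fields first, derive data_fields from them (dropping any auto_id field), generate the import files with prepare_bulk_insert_new_json_files, and only then create the collection and trigger the import. The sketch below restates that flow in one place; it is an illustration only, assuming the cf/df helpers, the prepare_bulk_insert_new_json_files helper, and the collection_wrap/utility_wrap fixtures provided by this test suite, and it uses only calls that appear in this diff.

# Minimal sketch of the schema-first bulk-insert flow used by the updated tests.
# cf, df, prepare_bulk_insert_new_json_files, and the *_wrap objects are assumed to
# come from this repo's common test helpers; intended as a method on a class derived
# from TestcaseBaseBulkInsert, not a standalone script.
def run_bulk_insert_flow(self, dim=128, entities=1000, auto_id=True):
    self._connect()
    c_name = cf.gen_unique_str("bulk_insert")
    fields = [
        cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
        cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
    ]
    # Skip auto_id fields when deciding which columns go into the generated JSON files.
    data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
    files = prepare_bulk_insert_new_json_files(
        minio_endpoint=self.minio_endpoint,
        bucket_name=self.bucket_name,
        is_row_based=True,
        rows=entities,
        dim=dim,
        auto_id=auto_id,
        data_fields=data_fields,
        force=True,
    )
    schema = cf.gen_collection_schema(fields=fields)
    self.collection_wrap.init_collection(c_name, schema=schema)
    # Import the generated files and wait for the bulk-insert task to finish.
    task_id, _ = self.utility_wrap.do_bulk_insert(collection_name=c_name, files=files)
    success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
        task_ids=[task_id], timeout=300
    )
    assert success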

CI e2e test runner script

@@ -74,10 +74,10 @@ cd ${ROOT}/tests/python_client
if [[ "${MILVUS_HELM_RELEASE_NAME}" != *"msop"* ]]; then
if [[ -n "${TEST_TIMEOUT:-}" ]]; then
timeout "${TEST_TIMEOUT}" pytest testcases/test_bulk_insert.py --timeout=300 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \
timeout "${TEST_TIMEOUT}" pytest testcases/test_bulk_insert.py --timeout=300 -n 6 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \
--html=${CI_LOG_PATH}/report_bulk_insert.html --self-contained-html
else
pytest testcases/test_bulk_insert.py --timeout=300 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \
pytest testcases/test_bulk_insert.py --timeout=300 -n 6 --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --minio_host ${MINIO_SERVICE_NAME} \
--html=${CI_LOG_PATH}/report_bulk_insert.html --self-contained-html
fi
fi
@@ -133,3 +133,13 @@ else
pytest --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} \
--html=${CI_LOG_PATH}/report.html --self-contained-html ${@:-}
fi
# # Run concurrent test with 5 processes
# if [[ -n "${TEST_TIMEOUT:-}" ]]; then
# timeout "${TEST_TIMEOUT}" pytest testcases/test_concurrent.py --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --count 5 -n 5 \
# --html=${CI_LOG_PATH}/report_concurrent.html --self-contained-html
# else
# pytest testcases/test_concurrent.py --host ${MILVUS_SERVICE_NAME} --port ${MILVUS_SERVICE_PORT} --count 5 -n 5 \
# --html=${CI_LOG_PATH}/report_concurrent.html --self-contained-html
# fi
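
The -n 6 added to both pytest invocations above is pytest-xdist's worker-count option, and it is where the concurrency gain in this change comes from: the bulk-insert cases are spread across six worker processes instead of running serially. As a rough local equivalent, with pytest-xdist installed and placeholder host/port values standing in for the CI service names used above:

# Hedged example only: the 127.0.0.1/19530 values are placeholders, not taken from
# this diff; requires `pip install pytest-xdist` for the -n option.
cd tests/python_client
pytest testcases/test_bulk_insert.py --timeout=300 -n 6 \
    --host 127.0.0.1 --port 19530 --minio_host 127.0.0.1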