Add cases for utility loading progress (#7682)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
pull/7645/head^2
ThreadDao 2021-09-10 10:36:00 +08:00 committed by GitHub
parent 5a303e7672
commit 4d9a013d14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 352 additions and 48 deletions

View File

@ -15,7 +15,7 @@ class ApiUtilityWrapper:
ut = utility
def loading_progress(self, collection_name, partition_names=[],
def loading_progress(self, collection_name, partition_names=None,
using="default", check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.loading_progress, collection_name, partition_names, using])
@ -24,7 +24,7 @@ class ApiUtilityWrapper:
partition_names=partition_names, using=using).run()
return res, check_result
def wait_for_loading_complete(self, collection_name, partition_names=[], timeout=None, using="default",
def wait_for_loading_complete(self, collection_name, partition_names=None, timeout=None, using="default",
check_task=None, check_items=None):
timeout = TIMEOUT if timeout is None else timeout
@ -73,6 +73,16 @@ class ApiUtilityWrapper:
partition_name=partition_name, using=using).run()
return res, check_result
def drop_collection(self, collection_name, timeout=None, using="default", check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.drop_collection(collection_name, timeout=timeout, using=using)])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
collection_name=collection_name,
timeout=timeout, using=using).run()
log.debug(res)
log.debug(check_result)
return res, check_result
def list_collections(self, timeout=None, using="default", check_task=None, check_items=None):
timeout = TIMEOUT if timeout is None else timeout

View File

@ -297,7 +297,6 @@ class TestQueryBase(TestcaseBase):
self.collection_wrap.query(term_expr, check_task=CheckTasks.check_query_results, check_items={exp_res: res})
@pytest.mark.tag(CaseLabel.L1)
@pytest.mark.xfail(reason="issue #7544")
def test_query_expr_random_values(self):
"""
target: test query with random filter values
@ -313,12 +312,12 @@ class TestQueryBase(TestcaseBase):
self.collection_wrap.load()
# random_values = [random.randint(0, ct.default_nb) for _ in range(4)]
random_values = [0, 2, 4, 0]
term_expr = f'{ct.default_int64_field_name} not in {random_values}'
random_values = [0, 2, 4, 3]
term_expr = f'{ct.default_int64_field_name} in {random_values}'
res = df.iloc[random_values, :1].to_dict('records')
self.collection_wrap.query(term_expr, check_task=CheckTasks.check_query_results, check_items={exp_res: res})
@pytest.mark.xfail(reason="issue #7553")
@pytest.mark.tag(CaseLabel.L1)
def test_query_expr_not_in_random(self):
self._connect()
df = cf.gen_default_dataframe_data(nb=50)

View File

@ -13,6 +13,8 @@ default_field_name = ct.default_float_vec_field_name
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
default_dim = ct.default_dim
default_nb = ct.default_nb
num_loaded_entities = "num_loaded_entities"
num_total_entities = "num_total_entities"
class TestUtilityParams(TestcaseBase):
@ -42,11 +44,21 @@ class TestUtilityParams(TestcaseBase):
def get_support_metric_field(self, request):
yield request.param
@pytest.fixture(scope="function", params=ct.get_invalid_strs)
def get_invalid_partition_names(self, request):
if isinstance(request.param, list):
if len(request.param) == 0:
pytest.skip("empty is valid for partition")
if request.param is None:
pytest.skip("None is valid for partition")
yield request.param
"""
******************************************************************
# The followings are invalid cases
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L1)
def test_has_collection_name_invalid(self, get_invalid_collection_name):
"""
@ -97,9 +109,17 @@ class TestUtilityParams(TestcaseBase):
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "Invalid"})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.xfail(reason="exception not MilvusException")
def test_drop_collection_name_invalid(self, get_invalid_collection_name):
self._connect()
error = f'`collection_name` value {get_invalid_collection_name} is illegal'
self.utility_wrap.drop_collection(get_invalid_collection_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: error})
# TODO: enable
@pytest.mark.tags(CaseLabel.L1)
def _test_list_collections_using_invalid(self):
def test_list_collections_using_invalid(self):
"""
target: test list_collections with invalid using
method: input invalid name
@ -108,7 +128,7 @@ class TestUtilityParams(TestcaseBase):
self._connect()
using = "empty"
ut = ApiUtilityWrapper()
ex, _ = ut.list_collections(using=using,
ex, _ = ut.list_collections(using=using, check_task=CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: "should create connect"})
@pytest.mark.tags(CaseLabel.L1)
@ -170,6 +190,109 @@ class TestUtilityParams(TestcaseBase):
log.error(str(ex))
assert "invalid" or "illegal" in str(ex)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("invalid_c_name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
def test_loading_progress_invalid_collection_name(self, invalid_c_name):
"""
target: test loading progress with invalid collection name
method: input invalid collection name
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
df = cf.gen_default_dataframe_data(nb=ct.default_nb)
self.collection_wrap.construct_from_dataframe(c_name, df, primary_field=ct.default_int64_field_name)
self.collection_wrap.load()
error = {ct.err_code: 1, ct.err_msg: "Invalid collection name: {}".format(invalid_c_name)}
self.utility_wrap.loading_progress(invalid_c_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_loading_progress_not_existed_collection_name(self):
"""
target: test loading progress with invalid collection name
method: input invalid collection name
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
df = cf.gen_default_dataframe_data(nb=ct.default_nb)
self.collection_wrap.construct_from_dataframe(c_name, df, primary_field=ct.default_int64_field_name)
self.collection_wrap.load()
error = {ct.err_code: 1, ct.err_msg: "describe collection failed: can't find collection"}
self.utility_wrap.loading_progress("not_existed_name", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tag(CaseLabel.L1)
@pytest.mark.xfail("issue #7613")
def test_loading_progress_invalid_partition_names(self, get_invalid_partition_names):
"""
target: test loading progress with invalid partition names
method: input invalid partition names
expected: raise an exception
"""
collection_w = self.init_collection_general(prefix)[0]
partition_names = get_invalid_partition_names
err_msg = {ct.err_code: 0, ct.err_msg: "`partition_name_array` value {} is illegal".format(partition_names)}
collection_w.load()
self.utility_wrap.loading_progress(collection_w.name, partition_names,
check_task=CheckTasks.err_res, check_items=err_msg)
@pytest.mark.tag(CaseLabel.L1)
@pytest.mark.xfail("issue #7613")
@pytest.mark.parametrize("partition_names", [[ct.default_tag], [ct.default_partition_name, ct.default_tag]])
def test_loading_progress_not_existed_partitions(self, partition_names):
"""
target: test loading progress with not existed partitions
method: input all or part not existed partition names
expected: raise exception
"""
collection_w = self.init_collection_general(prefix)[0]
log.debug(collection_w.num_entities)
collection_w.load()
err_msg = {ct.err_code: 0, ct.err_msg: "can't find partition"}
self.utility_wrap.loading_progress(collection_w.name, partition_names,
check_task=CheckTasks.err_res, check_items=err_msg)
@pytest.mark.tags(CaseLabel.L1)
def test_wait_for_loading_collection_not_existed(self):
"""
target: test wait for loading
method: input collection not created before
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.utility_wrap.wait_for_loading_complete(
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can't find collection"})
@pytest.mark.tags(CaseLabel.L1)
def test_wait_for_loading_partition_not_existed(self):
"""
target: test wait for loading
method: input partition not created before
expected: raise exception
"""
self._connect()
collection_w = self.init_collection_wrap()
self.utility_wrap.wait_for_loading_complete(
collection_w.name, partition_names=[ct.default_tag],
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: f'partitionID of partitionName:{ct.default_tag} can not be find'})
def _test_drop_collection_not_existed(self):
"""
target: test drop an not existed collection
method: drop a not created collection
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
error = {ct.err_code: 0, ct.err_msg: "describe collection failed: can't find collection:"}
self.utility_wrap.drop_collection(c_name, check_task=CheckTasks.err_res, check_items=error)
# with pytest.raises(Exception) as e:
# self.utility_wrap.drop_collection(c_name)
@pytest.mark.tags(CaseLabel.L1)
def test_calc_distance_left_vector_invalid_type(self, get_invalid_vector_dict):
"""
@ -364,6 +487,7 @@ class TestUtilityParams(TestcaseBase):
"err_msg": "collection {} was not "
"loaded into memory)".format(collection_w.name)})
class TestUtilityBase(TestcaseBase):
""" Test case of index interface """
@ -465,7 +589,18 @@ class TestUtilityBase(TestcaseBase):
res, _ = self.utility_wrap.has_partition(c_name, p_name)
assert res is False
#@pytest.mark.xfail(reason="issue #5667")
@pytest.mark.tags(CaseLabel.L2)
def test_has_default_partition(self):
"""
target: test has_partition with '_default' partition
method: input collection name and partition name created before
expected: True
"""
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
res, _ = self.utility_wrap.has_partition(c_name, ct.default_partition_name)
assert res is True
@pytest.mark.tags(CaseLabel.L1)
def test_list_collections(self):
"""
@ -500,9 +635,9 @@ class TestUtilityBase(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
self.utility_wrap.index_building_progress(
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can't find collection"})
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can't find collection"})
@pytest.mark.tags(CaseLabel.L1)
def test_index_process_collection_empty(self):
@ -512,9 +647,11 @@ class TestUtilityBase(TestcaseBase):
expected: no exception raised
"""
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
error = {ct.err_code: 1, ct.err_msg: "no index is created"}
self.utility_wrap.index_building_progress(c_name, check_task=CheckTasks.err_res, check_items=error)
cw = self.init_collection_wrap(name=c_name)
self.index_wrap.init_index(cw.collection, default_field_name, default_index_params)
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': 0, 'indexed_rows': 0}
assert res == exp_res
@pytest.mark.tags(CaseLabel.L1)
def test_index_process_collection_insert_no_index(self):
@ -535,37 +672,43 @@ class TestUtilityBase(TestcaseBase):
def test_index_process_collection_index(self):
"""
target: test building_process
method: insert 1200 entities, build and call building_process
expected: 1200 entity indexed
method: 1.insert 1024 (because minSegmentSizeToEnableIndex=1024)
2.build(server does create index) and call building_process
expected: indexed_rows=0
"""
nb = 1200
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(nb)
cw.insert(data=data)
error = {ct.err_code: 1, ct.err_msg: "no index is created"}
self.utility_wrap.index_building_progress(c_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_index_process_collection_indexing(self):
"""
target: test building_process
method: call building_process during building
expected: 1200 entity indexed
"""
nb = 1200
nb = 1024
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(nb)
cw.insert(data=data)
cw.create_index(default_field_name, default_index_params)
res, _ = self.utility_wrap.index_building_progress(c_name)
for _ in range(2):
assert "indexed_rows" in res
assert res["indexed_rows"] <= nb
assert res["indexed_rows"] >= 0
assert "total_rows" in res
assert res["total_rows"] == nb
assert res['indexed_rows'] == 0
assert res['total_rows'] == nb
@pytest.mark.tags(CaseLabel.L1)
def test_index_process_collection_indexing(self):
"""
target: test building_process
method: 1.insert 2048 entities to ensure that server will build
2.call building_process during building
expected: 2048 or less entities indexed
"""
nb = 2048
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(nb)
cw.insert(data=data)
cw.create_index(default_field_name, default_index_params)
res, _ = self.utility_wrap.index_building_progress(c_name)
assert (0 < res['indexed_rows'] <= nb)
assert res['total_rows'] == nb
# for _ in range(2):
# assert "indexed_rows" in res
# assert res["indexed_rows"] <= nb
# assert res["indexed_rows"] >= 0
# assert "total_rows" in res
# assert res["total_rows"] == nb
@pytest.mark.tags(CaseLabel.L1)
def test_wait_index_collection_not_existed(self):
@ -577,9 +720,9 @@ class TestUtilityBase(TestcaseBase):
self._connect()
c_name = cf.gen_unique_str(prefix)
self.utility_wrap.wait_for_index_building_complete(
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can't find collection"})
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can't find collection"})
@pytest.mark.tags(CaseLabel.L1)
def test_wait_index_collection_empty(self):
@ -590,11 +733,12 @@ class TestUtilityBase(TestcaseBase):
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
res, _ = self.utility_wrap.wait_for_index_building_complete(
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "no index is created"})
cw = self.init_collection_wrap(name=c_name)
cw.create_index(default_field_name, default_index_params)
assert self.utility_wrap.wait_for_index_building_complete(c_name)[0]
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': 0, 'indexed_rows': 0}
assert res == exp_res
@pytest.mark.tags(CaseLabel.L1)
def test_wait_index_collection_index(self):
@ -614,6 +758,156 @@ class TestUtilityBase(TestcaseBase):
res, _ = self.utility_wrap.index_building_progress(c_name)
assert res["indexed_rows"] == nb
@pytest.mark.tag(CaseLabel.L1)
def test_loading_progress_without_loading(self):
"""
target: test loading progress without loading
method: insert and flush data, call loading_progress without loading
expected: loaded entities is 0
"""
collection_w = self.init_collection_wrap()
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
exp_res = {num_loaded_entities: 0, num_total_entities: ct.default_nb}
res, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res == exp_res
@pytest.mark.tag(CaseLabel.L1)
@pytest.mark.parametrize("nb", [ct.default_nb, 5000])
def test_loading_progress_collection(self, nb):
"""
target: test loading progress
method: 1.insert flush and load 2.call loading_progress
expected: all entities is loafed, because load is synchronous
"""
# create, insert default_nb, flush and load
collection_w = self.init_collection_general(prefix, insert_data=True, nb=nb)[0]
res, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res[num_total_entities] == nb
assert res[num_loaded_entities] == nb
@pytest.mark.tag(CaseLabel.L1)
@pytest.mark.xfail(reason="pymilvus issue #702")
def test_loading_progress_with_async_load(self):
"""
target: test loading progress with async collection load
method: 1.load collection with async=True 2.loading_progress
expected: loading part entities
"""
collection_w = self.init_collection_wrap()
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
collection_w.load(_async=True)
res, _ = self.utility_wrap.loading_progress(collection_w.name)
assert (0 < res[num_loaded_entities] <= ct.default_nb)
@pytest.mark.tag(CaseLabel.L1)
def test_loading_progress_empty_collection(self):
"""
target: test loading_progress on a empty collection
method: 1.create collection and no insert 2.loading_progress
expected: 0 entities is loaded
"""
collection_w = self.init_collection_wrap()
collection_w.load()
res, _ = self.utility_wrap.loading_progress(collection_w.name)
exp_res = {num_loaded_entities: 0, num_total_entities: 0}
assert exp_res == res
@pytest.mark.tag(CaseLabel.L1)
def test_loading_progress_after_release(self):
"""
target: test loading progress without loading
method: insert and flush data, call loading_progress without loading
expected: loaded entities is 0
"""
collection_w = self.init_collection_general(prefix, insert_data=True)[0]
collection_w.release()
exp_res = {num_loaded_entities: 0, num_total_entities: ct.default_nb}
res, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res == exp_res
@pytest.mark.tag(CaseLabel.L2)
def test_loading_progress_with_release_partition(self):
"""
target: test loading progress after release part partitions
method: 1.insert data into two partitions and flush
2.load collection and release onr partition
expected: loaded one partition entities
"""
half = ct.default_nb
# insert entities into two partitions, collection flush and load
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
partition_w.release()
res = self.utility_wrap.loading_progress(collection_w.name)[0]
assert res[num_total_entities] == half * 2
assert res[num_loaded_entities] == half
@pytest.mark.tag(CaseLabel.L2)
def test_loading_progress_with_load_partition(self):
"""
target: test loading progress after load partition
method: 1.insert data into two partitions and flush
2.load one partition and loading progress
expected: loaded one partition entities
"""
half = ct.default_nb
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
collection_w.release()
partition_w.load()
res = self.utility_wrap.loading_progress(collection_w.name)[0]
assert res[num_total_entities] == half * 2
assert res[num_loaded_entities] == half
@pytest.mark.tag(CaseLabel.L1)
def test_loading_progress_with_partition(self):
"""
target: test loading progress with partition
method: 1.insert data into two partitions and flush, and load
2.loading progress with one partition
expected: loaded one partition entities
"""
half = ct.default_nb
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
res = self.utility_wrap.loading_progress(collection_w.name, partition_names=[partition_w.name])[0]
assert res[num_total_entities] == half
assert res[num_loaded_entities] == half
@pytest.mark.tags(CaseLabel.L1)
def test_wait_loading_collection_empty(self):
"""
target: test wait_for_loading
method: input empty collection
expected: no exception raised
"""
self._connect()
cw = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
cw.load()
self.utility_wrap.wait_for_loading_complete(cw.name)
res, _ = self.utility_wrap.loading_progress(cw.name)
exp_res = {num_total_entities: 0, num_loaded_entities: 0}
assert res == exp_res
@pytest.mark.xfail(reason="pymilvus issue #702")
@pytest.mark.tag(CaseLabel.L1)
def test_wait_for_loading_complete(self):
"""
target: test wait for loading collection
method: insert 10000 entities and wait for loading complete
expected: after loading complete, loaded entities is 10000
"""
nb = 6000
collection_w = self.init_collection_wrap()
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
assert collection_w.num_entities == nb
collection_w.load(_async=True)
self.utility_wrap.wait_for_loading_complete(collection_w.name)
res, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res[num_loaded_entities] == nb
@pytest.mark.tags(CaseLabel.L1)
def test_calc_distance_default(self):
"""
@ -706,7 +1000,7 @@ class TestUtilityBase(TestcaseBase):
vectors_l = vectors[:middle]
vectors_r = []
for i in range(middle):
vectors_r.append(vectors[middle+i])
vectors_r.append(vectors[middle + i])
op_l = {"ids": insert_ids[:middle], "collection": collection_w.name,
"field": default_field_name}
op_r = {"ids": insert_ids[middle:], "collection": collection_w.name,
@ -902,6 +1196,7 @@ class TestUtilityBase(TestcaseBase):
"metric": metric,
"sqrt": sqrt})
class TestUtilityAdvanced(TestcaseBase):
""" Test case of index interface """