mirror of https://github.com/milvus-io/milvus.git
Add negtive tescases for load balance (#16460)
Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>pull/16414/head
parent
d5a0e8b75f
commit
88d11b958b
|
@ -1493,3 +1493,114 @@ class TestUtilityAdvanced(TestcaseBase):
|
|||
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
|
||||
# assert sealed_segment_ids is subset of des_sealed_segment_ids
|
||||
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_load_balance_with_src_node_not_exist(self):
|
||||
"""
|
||||
target: test load balance of collection
|
||||
method: init a collection and load balance with src_node not exist
|
||||
expected: raise exception
|
||||
"""
|
||||
# init a collection
|
||||
c_name = cf.gen_unique_str(prefix)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
ms = MilvusSys()
|
||||
nb = 3000
|
||||
df = cf.gen_default_dataframe_data(nb)
|
||||
collection_w.insert(df)
|
||||
# get sealed segments
|
||||
collection_w.num_entities
|
||||
# get growing segments
|
||||
collection_w.insert(df)
|
||||
collection_w.load()
|
||||
# prepare load balance params
|
||||
res, _ = self.utility_wrap.get_query_segment_info(c_name)
|
||||
segment_distribution = cf.get_segment_distribution(res)
|
||||
all_querynodes = [node["identifier"] for node in ms.query_nodes]
|
||||
all_querynodes = sorted(all_querynodes,
|
||||
key=lambda x: len(segment_distribution[x]["sealed"])
|
||||
if x in segment_distribution else 0, reverse=True)
|
||||
# set src_node_id as the id of indexnode's id, which is not exist for querynode
|
||||
invalid_src_node_id = [node["identifier"] for node in ms.index_nodes][0]
|
||||
src_node_id = all_querynodes[0]
|
||||
dst_node_ids = all_querynodes[1:]
|
||||
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
|
||||
# load balance
|
||||
self.utility_wrap.load_balance(invalid_src_node_id, dst_node_ids, sealed_segment_ids,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items={ct.err_code: 1, ct.err_msg: "is not exist to balance"})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_load_balance_with_all_dst_node_not_exist(self):
|
||||
"""
|
||||
target: test load balance of collection
|
||||
method: init a collection and load balance with all dst_node not exist
|
||||
expected: raise exception
|
||||
"""
|
||||
# init a collection
|
||||
c_name = cf.gen_unique_str(prefix)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
ms = MilvusSys()
|
||||
nb = 3000
|
||||
df = cf.gen_default_dataframe_data(nb)
|
||||
collection_w.insert(df)
|
||||
# get sealed segments
|
||||
collection_w.num_entities
|
||||
# get growing segments
|
||||
collection_w.insert(df)
|
||||
collection_w.load()
|
||||
# prepare load balance params
|
||||
res, _ = self.utility_wrap.get_query_segment_info(c_name)
|
||||
segment_distribution = cf.get_segment_distribution(res)
|
||||
all_querynodes = [node["identifier"] for node in ms.query_nodes]
|
||||
all_querynodes = sorted(all_querynodes,
|
||||
key=lambda x: len(segment_distribution[x]["sealed"])
|
||||
if x in segment_distribution else 0, reverse=True)
|
||||
src_node_id = all_querynodes[0]
|
||||
# add indexnode's id, which is not exist for querynode, to dst_node_ids
|
||||
dst_node_ids = [node["identifier"] for node in ms.index_nodes]
|
||||
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
|
||||
# load balance
|
||||
self.utility_wrap.load_balance(src_node_id, dst_node_ids, sealed_segment_ids,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items={ct.err_code: 1, ct.err_msg: "no available queryNode to allocate"})
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_load_balance_with_one_sealed_segment_id_not_exist(self):
|
||||
"""
|
||||
target: test load balance of collection
|
||||
method: init a collection and load balance with one of sealed segment ids not exist
|
||||
expected: raise exception
|
||||
"""
|
||||
# init a collection
|
||||
c_name = cf.gen_unique_str(prefix)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
ms = MilvusSys()
|
||||
nb = 3000
|
||||
df = cf.gen_default_dataframe_data(nb)
|
||||
collection_w.insert(df)
|
||||
# get sealed segments
|
||||
collection_w.num_entities
|
||||
# get growing segments
|
||||
collection_w.insert(df)
|
||||
collection_w.load()
|
||||
# prepare load balance params
|
||||
res, _ = self.utility_wrap.get_query_segment_info(c_name)
|
||||
segment_distribution = cf.get_segment_distribution(res)
|
||||
all_querynodes = [node["identifier"] for node in ms.query_nodes]
|
||||
all_querynodes = sorted(all_querynodes,
|
||||
key=lambda x: len(segment_distribution[x]["sealed"])
|
||||
if x in segment_distribution else 0, reverse=True)
|
||||
src_node_id = all_querynodes[0]
|
||||
dst_node_ids = all_querynodes[1:]
|
||||
dst_node_ids.append([node["identifier"] for node in ms.index_nodes][0])
|
||||
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
|
||||
# add a segment id which is not exist or a growing segment
|
||||
if len(segment_distribution[src_node_id]["growing"]) > 0:
|
||||
sealed_segment_ids.append(segment_distribution[src_node_id]["growing"][0])
|
||||
else:
|
||||
sealed_segment_ids.append(max(segment_distribution[src_node_id]["sealed"]) + 1)
|
||||
# load balance
|
||||
self.utility_wrap.load_balance(src_node_id, dst_node_ids, sealed_segment_ids,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items={ct.err_code: 1, ct.err_msg: "is not exist"})
|
||||
|
|
Loading…
Reference in New Issue