mirror of https://github.com/milvus-io/milvus.git
Add more resource group tests (#22263)
Signed-off-by: yanliang567 <yanliang.qiao@zilliz.com>pull/22295/head
parent
42e60f591e
commit
27296a023a
|
@ -68,14 +68,16 @@ class Base:
|
|||
collection_object.drop(check_task=ct.CheckTasks.check_nothing)
|
||||
|
||||
""" Clean up the rgs before disconnect """
|
||||
rgs_list = self.utility_wrap.list_resource_groups()[0]
|
||||
for rg_name in self.resource_group_list:
|
||||
rg = self.utility_wrap.describe_resource_group(name=rg_name, check_task=ct.CheckTasks.check_nothing)[0]
|
||||
if isinstance(rg, ResourceGroupInfo):
|
||||
if rg.num_available_node > 0:
|
||||
self.utility_wrap.transfer_node(source=rg_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=rg.num_available_node)
|
||||
self.utility_wrap.drop_resource_group(rg_name, check_task=ct.CheckTasks.check_nothing)
|
||||
if rg_name is not None and rg_name in rgs_list:
|
||||
rg = self.utility_wrap.describe_resource_group(name=rg_name, check_task=ct.CheckTasks.check_nothing)[0]
|
||||
if isinstance(rg, ResourceGroupInfo):
|
||||
if rg.num_available_node > 0:
|
||||
self.utility_wrap.transfer_node(source=rg_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=rg.num_available_node)
|
||||
self.utility_wrap.drop_resource_group(rg_name, check_task=ct.CheckTasks.check_nothing)
|
||||
|
||||
except Exception as e:
|
||||
log.debug(str(e))
|
||||
|
|
|
@ -5,8 +5,9 @@ from common import common_func as cf
|
|||
from common import common_type as ct
|
||||
from common.common_type import CaseLabel, CheckTasks
|
||||
from utils.util_pymilvus import *
|
||||
from utils.util_log import test_log as log
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="debugging")
|
||||
class TestResourceGroupParams(TestcaseBase):
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_rg_default(self):
|
||||
|
@ -216,19 +217,33 @@ class TestResourceGroupParams(TestcaseBase):
|
|||
4. list rgs
|
||||
verify: create successfully at a max number and fail when create one more
|
||||
"""
|
||||
pass
|
||||
from pymilvus.exceptions import MilvusException
|
||||
max_rg_num = 1024 # including default rg
|
||||
try:
|
||||
for i in range(max_rg_num):
|
||||
rg_name = cf.gen_unique_str(f'rg_{i}')
|
||||
self.init_resource_group(rg_name, check_task=CheckTasks.check_nothing)
|
||||
except MilvusException:
|
||||
pass
|
||||
|
||||
rgs = self.utility_wrap.list_resource_groups()[0]
|
||||
assert len(rgs) == max_rg_num
|
||||
error = {ct.err_code: 999, ct.err_msg: 'failed to create resource group, err=resource group num reach limit'}
|
||||
self.init_resource_group(name=cf.gen_unique_str('rg'),
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_drop_rg_non_existing(self):
|
||||
"""
|
||||
method: drop a rg with a non existing name
|
||||
method: drop a rg with a non-existing name
|
||||
verify: drop successfully
|
||||
"""
|
||||
self._connect()
|
||||
rgs_count = len(self.utility_wrap.list_collections()[0])
|
||||
rgs_count = len(self.utility_wrap.list_resource_groups()[0])
|
||||
rg_name = 'non_existing'
|
||||
self.utility_wrap.drop_resource_group(name=rg_name)
|
||||
assert rgs_count == len(self.utility_wrap.list_collections()[0])
|
||||
assert rgs_count == len(self.utility_wrap.list_resource_groups()[0])
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
@pytest.mark.parametrize("rg_name", ["", None])
|
||||
|
@ -285,10 +300,9 @@ class TestResourceGroupParams(TestcaseBase):
|
|||
"""
|
||||
self._connect()
|
||||
rgs_count = len(self.utility_wrap.list_resource_groups()[0])
|
||||
default_rg_init_cap = 1000000
|
||||
default_rg_init_available_node = 1
|
||||
default_rg_info = {"name": ct.default_resource_group_name,
|
||||
"capacity": default_rg_init_cap,
|
||||
"capacity": ct.default_resource_group_capacity,
|
||||
"num_available_node": default_rg_init_available_node,
|
||||
"num_loaded_replica": {},
|
||||
"num_outgoing_node": {},
|
||||
|
@ -329,7 +343,7 @@ class TestResourceGroupParams(TestcaseBase):
|
|||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_describe_rg_non_existing(self):
|
||||
"""
|
||||
method: describe an non existing rg
|
||||
method: describe a non-existing rg
|
||||
verify: fail with error msg
|
||||
"""
|
||||
self._connect()
|
||||
|
@ -347,10 +361,9 @@ class TestResourceGroupParams(TestcaseBase):
|
|||
num_outgoing_node and num_incoming_node
|
||||
"""
|
||||
self._connect()
|
||||
default_rg_init_cap = 1000000
|
||||
default_rg_init_available_node = 1
|
||||
default_rg_info = {"name": ct.default_resource_group_name,
|
||||
"capacity": default_rg_init_cap,
|
||||
"capacity": ct.default_resource_group_capacity,
|
||||
"num_available_node": default_rg_init_available_node,
|
||||
"num_loaded_replica": {},
|
||||
"num_outgoing_node": {},
|
||||
|
@ -360,7 +373,8 @@ class TestResourceGroupParams(TestcaseBase):
|
|||
check_task=ct.CheckTasks.check_rg_property,
|
||||
check_items=default_rg_info)
|
||||
|
||||
@pytest.mark.skip(reason="will cause concurrent problems")
|
||||
|
||||
@pytest.mark.skip(reason="debugging")
|
||||
class TestTransferNode(TestcaseBase):
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_transfer_node_default(self):
|
||||
|
@ -397,7 +411,7 @@ class TestTransferNode(TestcaseBase):
|
|||
check_task=CheckTasks.check_rg_property,
|
||||
check_items=rg1_info)
|
||||
|
||||
# transfer a querynode to rgB
|
||||
# transfer a query node to rgB
|
||||
self.utility_wrap.transfer_node(source=rg1_name, target=rg2_name, num_node=1)
|
||||
rg2_info = {"name": rg2_name,
|
||||
"capacity": rg_init_cap + 1,
|
||||
|
@ -428,9 +442,6 @@ class TestTransferNode(TestcaseBase):
|
|||
check_items=error
|
||||
)
|
||||
self.utility_wrap.drop_resource_group(name=rg1_name)
|
||||
# clean
|
||||
self.utility_wrap.transfer_node(source=rg2_name, target=ct.default_resource_group_name, num_node=1)
|
||||
self.utility_wrap.drop_resource_group(name=rg2_name)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("with_growing", [True, False])
|
||||
|
@ -450,7 +461,8 @@ class TestTransferNode(TestcaseBase):
|
|||
dim = 128
|
||||
collection_w, _, _, insert_ids, _ = self.init_collection_general(insert_data=True, dim=dim)
|
||||
if with_growing:
|
||||
collection_w.insert(cf.gen_default_list_data(nb=100))
|
||||
res, _ = collection_w.insert(cf.gen_default_list_data(nb=100))
|
||||
insert_ids.extend(res.primary_keys)
|
||||
nq = 5
|
||||
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
|
||||
# verify search succ
|
||||
|
@ -482,7 +494,9 @@ class TestTransferNode(TestcaseBase):
|
|||
target=rg_name,
|
||||
num_node=1)
|
||||
if with_growing:
|
||||
collection_w.insert(cf.gen_default_list_data(nb=100))
|
||||
res, _ = collection_w.insert(cf.gen_default_list_data(nb=100))
|
||||
insert_ids.extend(res.primary_keys)
|
||||
|
||||
# verify rg state
|
||||
rg_info = {"name": rg_name,
|
||||
"capacity": 1,
|
||||
|
@ -521,7 +535,9 @@ class TestTransferNode(TestcaseBase):
|
|||
target=ct.default_resource_group_name,
|
||||
num_node=1)
|
||||
if with_growing:
|
||||
collection_w.insert(cf.gen_default_list_data(nb=100))
|
||||
res, _ = collection_w.insert(cf.gen_default_list_data(nb=100))
|
||||
insert_ids.extend(res.primary_keys)
|
||||
|
||||
# verify search keeps succ
|
||||
collection_w.search(vectors[:nq],
|
||||
ct.default_float_vec_field_name,
|
||||
|
@ -553,11 +569,56 @@ class TestTransferNode(TestcaseBase):
|
|||
self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
|
||||
check_task=CheckTasks.check_rg_property,
|
||||
check_items=default_rg_info)
|
||||
# clean
|
||||
collection_w.release()
|
||||
collection_w.drop()
|
||||
self.utility_wrap.drop_resource_group(name=rg_name)
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="issue #22224")
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("num_node", [0, 99, -1, 0.5, True, "str"])
|
||||
def test_transfer_node_with_wrong_num_node(self, num_node):
|
||||
"""
|
||||
Method:
|
||||
1. create rgA
|
||||
2. transfer invalid number of nodes from default to rgA
|
||||
verify fail with error
|
||||
"""
|
||||
# create rgA
|
||||
rg_name = cf.gen_unique_str('rg')
|
||||
self.init_resource_group(rg_name)
|
||||
|
||||
# transfer replicas
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: 'failed to transfer node between resource group, err=nodes not enough'}
|
||||
self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
|
||||
target=rg_name,
|
||||
num_node=num_node,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
||||
@pytest.mark.skip(reason="issue #22224")
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("num_replica", [0, 99, -1, 0.5, True, "str"])
|
||||
def test_transfer_replica_with_wrong_num_replica(self, num_replica):
|
||||
"""
|
||||
Method:
|
||||
1. prepare a collection with 1 replica loaded
|
||||
2. prepare rgA
|
||||
2. transfer invalid number of replicas from default to rgA
|
||||
verify fail with error
|
||||
"""
|
||||
# prepare a collection with 1 replica loaded
|
||||
collection_w, _, _, _, _ = self.init_collection_general(insert_data=True, nb=200)
|
||||
# create rgA
|
||||
rg_name = cf.gen_unique_str('rg')
|
||||
self.init_resource_group(rg_name)
|
||||
|
||||
# transfer replicas
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: 'failed to transfer replica between resource group, err=NotHealthy'}
|
||||
self.utility_wrap.transfer_replica(source=ct.default_resource_group_name,
|
||||
target=rg_name,
|
||||
collection_name=collection_w.name,
|
||||
num_replica=num_replica,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_load_collection_with_no_available_node(self):
|
||||
|
@ -575,8 +636,8 @@ class TestTransferNode(TestcaseBase):
|
|||
7. transfer query node from rgA to default rg
|
||||
verify search keeps succ
|
||||
verify rgA and default rg state
|
||||
8. load the collection with default rg
|
||||
verify fail for already loaded in other rg
|
||||
8. load the collection without rg specified
|
||||
verify load succ
|
||||
"""
|
||||
self._connect()
|
||||
dim = ct.default_dim
|
||||
|
@ -669,9 +730,7 @@ class TestTransferNode(TestcaseBase):
|
|||
check_items=default_rg_info)
|
||||
|
||||
# 8. load the collection with default rg
|
||||
# error = {ct.err_code: 5,
|
||||
# ct.err_msg: 'failed to load, err=already loaded in the other rg'}
|
||||
# collection_w.load(_resource_groups=[rg_name], check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.load()
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("replicas", [1, 3])
|
||||
|
@ -681,8 +740,8 @@ class TestTransferNode(TestcaseBase):
|
|||
1. create a collection and insert data
|
||||
2. release if loaded
|
||||
3. create rgA and rgB
|
||||
4. load collection 1,2,3 with rgA and rgB
|
||||
5. load collection 1,2,3 with default rg and rgA
|
||||
4. load collection with different replicas into rgA and rgB
|
||||
5. load collection with different replicas into default rg and rgA
|
||||
"""
|
||||
self._connect()
|
||||
# 1. create a collectionA and insert data
|
||||
|
@ -716,8 +775,8 @@ class TestTransferNode(TestcaseBase):
|
|||
"""
|
||||
collection_w = self.init_collection_wrap()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
|
||||
error = {ct.err_code: 1,
|
||||
ct.err_msg: "failed to load collection, err=failed to spawn replica for collection[resource group doesn't exist]"}
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: "failed to load collection, err=failed to spawn replica for collection"}
|
||||
collection_w.load(_resource_groups=rg_names,
|
||||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
|
@ -736,8 +795,8 @@ class TestTransferNode(TestcaseBase):
|
|||
7. transfer query node from rgA to default rg
|
||||
verify search keeps succ
|
||||
verify rgA and default rg state
|
||||
8. load the partition with default rg
|
||||
verify fail for already loaded in other rg
|
||||
8. load the partition without rg specified
|
||||
verify load succ
|
||||
"""
|
||||
self._connect()
|
||||
dim = ct.default_dim
|
||||
|
@ -822,10 +881,8 @@ class TestTransferNode(TestcaseBase):
|
|||
check_task=CheckTasks.check_rg_property,
|
||||
check_items=default_rg_info)
|
||||
|
||||
# 8. load the collection with default rg
|
||||
# error = {ct.err_code: 999,
|
||||
# ct.err_msg: 'failed to load, err=already loaded in the other rg'}
|
||||
# partition_w.load(check_task=CheckTasks.err_res, check_items=error)
|
||||
# 8. load the collection without rg specified
|
||||
partition_w.load()
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("replicas", [1, 3])
|
||||
|
@ -834,8 +891,8 @@ class TestTransferNode(TestcaseBase):
|
|||
Method:
|
||||
1. create a partition and insert data
|
||||
3. create rgA and rgB
|
||||
4. load collection 1,2,3 with rgA and rgB
|
||||
5. load collection 1,2,3 with default rg and rgA
|
||||
4. load partition with different replicas into rgA and rgB
|
||||
5. load partition with different replicas into default rg and rgA
|
||||
"""
|
||||
self._connect()
|
||||
dim = ct.default_dim
|
||||
|
@ -864,7 +921,7 @@ class TestTransferNode(TestcaseBase):
|
|||
check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("rg_names", [[""], ["non_existing"], "不合法"])
|
||||
@pytest.mark.parametrize("rg_names", [[""], ["non_existing"], "不合法", [ct.default_resource_group_name, None]])
|
||||
def test_load_partition_with_empty_rg_name(self, rg_names):
|
||||
"""
|
||||
Method:
|
||||
|
@ -872,10 +929,9 @@ class TestTransferNode(TestcaseBase):
|
|||
2. load with empty rg name
|
||||
"""
|
||||
self._connect()
|
||||
dim = ct.default_dim
|
||||
# 1. create a partition
|
||||
collection_w = self.init_collection_wrap()
|
||||
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
|
||||
partition_name = cf.gen_unique_str('par')
|
||||
partition_w = self.init_partition_wrap(collection_w, partition_name)
|
||||
|
||||
|
@ -888,6 +944,20 @@ class TestTransferNode(TestcaseBase):
|
|||
@pytest.mark.skip(reason="need 8 query nodes for this test")
|
||||
# run the multi node tests with 8 query nodes
|
||||
class TestResourceGroupMultiNodes(TestcaseBase):
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("with_growing", [True, False])
|
||||
def test_load_with_multi_collections_one_rg(self, with_growing):
|
||||
"""
|
||||
Method:
|
||||
1. prepare collectionA and collectionB
|
||||
2. prepare one rg with 4 nodes
|
||||
3. load collectionA with 1 replica into the rg
|
||||
4. load collectionB with 3 replica into the rg
|
||||
verify the rg state
|
||||
verify the replicas state for both collections
|
||||
"""
|
||||
pass
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("with_growing", [True, False])
|
||||
def test_load_with_replicas_and_nodes_num(self, with_growing):
|
||||
|
@ -913,7 +983,7 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i*nb))
|
||||
collection_w.flush()
|
||||
insert_ids.extend(res.primary_keys)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
|
||||
|
||||
# make growing
|
||||
if with_growing:
|
||||
|
@ -942,7 +1012,7 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
# load 5 replicas in rgA and verify error msg
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'}
|
||||
collection_w.load(replica_number=5,
|
||||
collection_w.load(replica_number=num_nodes_to_rg+1,
|
||||
_resource_groups=[rg_name],
|
||||
check_task=ct.CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
@ -1053,11 +1123,6 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
"ids": insert_ids.copy(),
|
||||
"limit": ct.default_limit}
|
||||
)
|
||||
collection_w.release()
|
||||
collection_w.drop()
|
||||
self.utility_wrap.transfer_node(source=rg_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=num_nodes_to_rg)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("with_growing", [True, False])
|
||||
|
@ -1116,8 +1181,6 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
|
||||
# load 1 replica in rgA and rgB
|
||||
replica_number = 1
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'}
|
||||
collection_w.load(replica_number=replica_number,
|
||||
_resource_groups=[rgA_name, rgB_name],
|
||||
check_task=CheckTasks.err_res,
|
||||
|
@ -1138,6 +1201,10 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
else:
|
||||
assert len(rep.group_nodes) == num_nodes_rgB # one replica for each rg
|
||||
|
||||
# verify get segment info return not empty
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_name=collection_w.name)[0]
|
||||
assert len(segments_info) > 0
|
||||
|
||||
# make growing
|
||||
if with_growing:
|
||||
res, _ = collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=8 * nb))
|
||||
|
@ -1166,15 +1233,8 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
"ids": insert_ids.copy(),
|
||||
"limit": ct.default_limit}
|
||||
)
|
||||
collection_w.release()
|
||||
collection_w.drop()
|
||||
self.utility_wrap.transfer_node(source=rgA_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=num_nodes_rgA)
|
||||
self.utility_wrap.transfer_node(source=rgB_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=num_nodes_rgB)
|
||||
|
||||
@pytest.mark.xfail(reason='issue #22217')
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("with_growing", [True, False])
|
||||
def test_transfer_replica_into_same_rg(self, with_growing):
|
||||
|
@ -1182,12 +1242,16 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
Method:
|
||||
1. prepare a collection with multi segments
|
||||
2. create rgA and rgB
|
||||
3. transfer 1 nodes to rgA and 2 nodes to rgB
|
||||
3. transfer 1 node to rgA and 2 nodes to rgB
|
||||
4. load 2 replicas in rgA and rgB
|
||||
5. transfer 1 replica from rgB to rgA
|
||||
verify fail ?
|
||||
verify fail with error
|
||||
6. transfer 1 replica from rgA to rgB
|
||||
verify succ?
|
||||
verify fail with error
|
||||
7. release the collection
|
||||
8. reload the collection with 2 replicas in default rg and rgA
|
||||
9. transfer 1 replica from rgA to default rg
|
||||
verify fail with error
|
||||
"""
|
||||
self._connect()
|
||||
rgA_name = cf.gen_unique_str('rgA')
|
||||
|
@ -1235,23 +1299,38 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
|
||||
# transfer 1 replica from rgB to rgA
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: 'dynamically increase replica num is unsupported'}
|
||||
ct.err_msg: f'found [1] replicas of same collection in target resource group[{rgA_name}], dynamically increase replica num is unsupported'}
|
||||
self.utility_wrap.transfer_replica(source=rgB_name, target=rgA_name,
|
||||
collection_name=collection_w.name, num_replica=1,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: 'dynamically increase replica num is unsupported'}
|
||||
# transfer 1 replica from rgA to rgB
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: f'found [1] replicas of same collection in target resource group[{rgB_name}], dynamically increase replica num is unsupported'}
|
||||
self.utility_wrap.transfer_replica(source=rgA_name, target=rgB_name,
|
||||
collection_name=collection_w.name, num_replica=1,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
||||
# release the collection
|
||||
collection_w.release()
|
||||
# reload the collection into default rg and rgA
|
||||
# TODO: not able to load replicas into default rg with customized rgs
|
||||
collection_w.load(replica_number=2, _resource_groups=[rgA_name, ct.default_resource_group_name])
|
||||
|
||||
# make growing
|
||||
if with_growing:
|
||||
res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=7 * nb))
|
||||
insert_ids.extend(res.primary_keys)
|
||||
|
||||
# transfer 1 replica from rgA to default rg
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: f'found [1] replicas of same collection in target resource group[{ct.default_resource_group_name}], dynamically increase replica num is unsupported'}
|
||||
self.utility_wrap.transfer_replica(source=rgA_name, target=ct.default_resource_group_name,
|
||||
collection_name=collection_w.name, num_replica=1,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
||||
# verify search succ
|
||||
nq = 5
|
||||
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
|
||||
|
@ -1264,18 +1343,12 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
"ids": insert_ids.copy(),
|
||||
"limit": ct.default_limit}
|
||||
)
|
||||
self.utility_wrap.transfer_node(source=rgA_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=1)
|
||||
self.utility_wrap.transfer_node(source=rgB_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=2)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_transfer_replica_not_enough_replicas_to_transfer(self):
|
||||
"""
|
||||
Method:
|
||||
1. prepare a collection with multi segments
|
||||
1. prepare a collection with 1 replica loaded
|
||||
2. create rgA
|
||||
3. transfer 3 nodes to rgA
|
||||
4. transfer 2 replicas to rgA
|
||||
|
@ -1285,7 +1358,6 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
dim = ct.default_dim
|
||||
# create a collectionA and insert data
|
||||
collection_w, _, _, insert_ids, _ = self.init_collection_general(insert_data=True, dim=dim)
|
||||
|
||||
# create a rgA
|
||||
rg_name = cf.gen_unique_str('rg')
|
||||
self.init_resource_group(rg_name)
|
||||
|
@ -1296,16 +1368,11 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
|
||||
# transfer 2 replicas to rgA
|
||||
error = {ct.err_code: 5,
|
||||
ct.err_msg: 'only found [1] replicas in source resource group[__default_resource_group]'}
|
||||
ct.err_msg: f'only found [1] replicas in source resource group[{ct.default_resource_group_name}]'}
|
||||
self.utility_wrap.transfer_replica(source=ct.default_resource_group_name, target=rg_name,
|
||||
collection_name=collection_w.name, num_replica=2,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
collection_w.release()
|
||||
collection_w.drop()
|
||||
self.utility_wrap.transfer_node(source=rg_name,
|
||||
target=ct.default_resource_group_name,
|
||||
num_node=3)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_transfer_replica_non_existing_rg(self):
|
||||
|
@ -1322,7 +1389,7 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
# transfer replica to a non_existing rg
|
||||
rg_name = "non_existing"
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: "the target resource group[non_existing] doesn't exist, err=resource group doesn't exist"}
|
||||
ct.err_msg: f"the target resource group[{rg_name}] doesn't exist, err=resource group doesn't exist"}
|
||||
self.utility_wrap.transfer_replica(source=ct.default_resource_group_name,
|
||||
target=rg_name,
|
||||
collection_name=collection_w.name,
|
||||
|
@ -1332,10 +1399,97 @@ class TestResourceGroupMultiNodes(TestcaseBase):
|
|||
|
||||
# transfer replica from a non_existing rg
|
||||
error = {ct.err_code: 999,
|
||||
ct.err_msg: "the source resource group[non_existing] doesn't exist, err=resource group doesn't exist"}
|
||||
ct.err_msg: f"the source resource group[{rg_name}] doesn't exist, err=resource group doesn't exist"}
|
||||
self.utility_wrap.transfer_replica(source=rg_name,
|
||||
target=ct.default_resource_group_name,
|
||||
collection_name=collection_w.name,
|
||||
num_replica=1,
|
||||
check_task=CheckTasks.err_res,
|
||||
check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
@pytest.mark.parametrize("with_growing", [True, False])
|
||||
def test_transfer_node_(self, with_growing):
|
||||
"""
|
||||
Method:
|
||||
1. prepare a collection
|
||||
2. prepare rgA and rgB, both with 1 node
|
||||
3. load 2 replicas into rgA and rgB
|
||||
verify search succ, replica info, rg state
|
||||
4. transfer the node from rgA to rgB
|
||||
verify search succ, replica info, rg state
|
||||
"""
|
||||
dim = 128
|
||||
collection_w = self.init_collection_wrap(shards_num=4)
|
||||
insert_ids = []
|
||||
nb = 500
|
||||
for i in range(5):
|
||||
res, _ = collection_w.insert(cf.gen_default_list_data(nb=nb, dim=dim, start=i * nb))
|
||||
collection_w.flush()
|
||||
insert_ids.extend(res.primary_keys)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, ct.default_index)
|
||||
|
||||
rgA_name = cf.gen_unique_str('rgA')
|
||||
self.init_resource_group(rgA_name)
|
||||
self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
|
||||
target=rgA_name,
|
||||
num_node=1)
|
||||
rgB_name = cf.gen_unique_str('rgB')
|
||||
self.init_resource_group(rgB_name)
|
||||
self.utility_wrap.transfer_node(source=ct.default_resource_group_name,
|
||||
target=rgB_name,
|
||||
num_node=1)
|
||||
# make growing
|
||||
if with_growing:
|
||||
res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=6 * nb))
|
||||
insert_ids.extend(res.primary_keys)
|
||||
|
||||
# load 2 replicas into rgA and rgB
|
||||
collection_w.load(replica_number=2, _resource_groups=[rgA_name, rgB_name])
|
||||
|
||||
replicas = collection_w.get_replicas()
|
||||
assert len(replicas[0].groups) == 2
|
||||
for rep in replicas[0].groups:
|
||||
assert rep.resource_group in [rgA_name, rgB_name]
|
||||
assert rep.num_outbound_node == {}
|
||||
assert len(rep.group_nodes) == 1 # one replica for each node
|
||||
|
||||
# TODO: verify rg state
|
||||
|
||||
nq = 5
|
||||
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
|
||||
# verify search succ
|
||||
collection_w.search(vectors[:nq],
|
||||
ct.default_float_vec_field_name,
|
||||
ct.default_search_params,
|
||||
ct.default_limit,
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": nq,
|
||||
"ids": insert_ids.copy(),
|
||||
"limit": ct.default_limit}
|
||||
)
|
||||
|
||||
# transfer node from rgA to rgB
|
||||
self.utility_wrap.transfer_node(source=rgA_name, target=rgB_name, num_node=1)
|
||||
|
||||
# TODO: verify rg state
|
||||
# TODO: verify replica state
|
||||
# replicas = collection_w.get_replicas()
|
||||
# assert len(replicas[0].groups) == 2
|
||||
# for rep in replicas[0].groups:
|
||||
# assert rep.resource_group in [rgA_name, rgB_name]
|
||||
# assert rep.num_outbound_node == {}
|
||||
# assert len(rep.group_nodes) == 1 # one replica for each node
|
||||
|
||||
nq = 5
|
||||
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
|
||||
# verify search succ
|
||||
collection_w.search(vectors[:nq],
|
||||
ct.default_float_vec_field_name,
|
||||
ct.default_search_params,
|
||||
ct.default_limit,
|
||||
check_task=CheckTasks.check_search_results,
|
||||
check_items={"nq": nq,
|
||||
"ids": insert_ids.copy(),
|
||||
"limit": ct.default_limit}
|
||||
)
|
Loading…
Reference in New Issue