milvus/tests/python_client/testcases/test_connection.py

906 lines
43 KiB
Python
Raw Normal View History

import pytest
import concurrent.futures
from pymilvus import DefaultConfig, Milvus
from base.client_base import TestcaseBase
from utils.utils import get_milvus, gen_invalid_ips, gen_invalid_ints, gen_invalid_uris
import common.common_type as ct
import common.common_func as cf
from common.code_mapping import ConnectionErrorMessage as cem
CONNECT_TIMEOUT = 12
class TestConnectionParams(TestcaseBase):
"""
Test case of connections interface
The author Ting.Wang
"""
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("data", ct.get_dict_without_host_port)
def test_connection_add_connection_kwargs_param_check(self, data):
"""
target: test **kwargs of add_connection
method: passing wrong parameters of **kwargs
expected: assert response is error
"""
# check param of **kwargs
self.connection_wrap.add_connection(_kwargs=data, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: cem.NoHostPort})
# get addr of default alias
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': 'localhost', 'port': '19530'}})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_connect_kwargs_param_check(self):
"""
target: test **kwargs of connect
method: passing wrong parameters of **kwargs
expected: assert response is error
"""
# get addr of default alias
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING)
# No check for **kwargs
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, _kwargs=[1, 2],
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: cem.NoHostPort})
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("alias", ct.get_not_string)
def test_connection_connect_alias_param_check(self, alias):
"""
target: test connect passes wrong params of alias
method: connect passes wrong params of alias
expected: assert response is error
"""
# check for alias
self.connection_wrap.connect(alias=alias, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: cem.AliasType % type(alias)})
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("alias", ct.get_not_string)
def test_connection_get_alias_param_check(self, alias):
"""
target: test get connection passes wrong params of alias
method: get connection passes wrong params of alias
expected: assert response is error
"""
# check for alias
self.connection_wrap.get_connection(alias=alias, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: cem.AliasType % type(alias)})
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("alias", ct.get_not_string)
def test_connection_get_addr_alias_param_check(self, alias):
"""
target: test get connection addr passes wrong params of alias
method: get connection addr passes wrong params of alias
expected: assert response is error
"""
# check for alias
self.connection_wrap.get_connection_addr(alias=alias, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: cem.AliasType % type(alias)})
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("alias", ct.get_not_string)
def test_connection_remove_alias_param_check(self, alias):
"""
target: test remove connection passes wrong params of alias
method: remove connection passes wrong params of alias
expected: assert response is error
"""
# check for alias
self._connect()
self.connection_wrap.remove_connection(alias=alias, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: cem.AliasType % type(alias)})
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("alias", ct.get_not_string)
def test_connection_disconnect_alias_param_check(self, alias):
"""
target: test disconnect passes wrong params of alias
method: disconnect passes wrong params of alias
expected: assert response is error
"""
# check for alias
self._connect()
self.connection_wrap.disconnect(alias=alias, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: cem.AliasType % type(alias)})
class TestConnectionOperation(TestcaseBase):
"""
Test case of connections interface
The author Ting.Wang
"""
@pytest.mark.tags(ct.CaseLabel.L1)
@pytest.mark.parametrize("data, err_msg", [(ct.get_wrong_format_dict[0], cem.PortType),
(ct.get_wrong_format_dict[1], cem.HostType)])
def test_connection_add_wrong_format(self, data, err_msg):
"""
target: test add_connection, regardless of whether the connection exists
method: add existing and non-existing configurations at the same time
expected: list_connections include the configured connections
"""
# add connections
self.connection_wrap.add_connection(alias1={"host": "localhost", "port": "1"},
alias2={"port": "-1", "host": "hostlocal"},
testing=data,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: err_msg})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None),
('alias1', None), ('alias2', None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': 'localhost', 'port': '19530'}})
self.connection_wrap.get_connection_addr(alias="alias1", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "localhost", "port": "1"}})
self.connection_wrap.get_connection_addr(alias="alias2", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "hostlocal", "port": "-1"}})
self.connection_wrap.get_connection_addr(alias="testing", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_add_more(self):
"""
target: test add_connection passes in multiple parameters
method: add two params of add_connection
expected: added to the connection list successfully
"""
# add connections
self.connection_wrap.add_connection(alias1={"host": "localhost", "port": "1"},
alias2={"host": "192.168.1.1", "port": "123"})
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.value_content: None})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None),
('alias1', None), ('alias2', None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': 'localhost', 'port': '19530'}})
self.connection_wrap.get_connection_addr(alias="alias1", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "localhost", "port": "1"}})
self.connection_wrap.get_connection_addr(alias="alias2", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "192.168.1.1", "port": "123"}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_add_single_more(self):
"""
target: test add connections separately
method: add_connection twice
expected: added to the connection list successfully
"""
# add connections
self.connection_wrap.add_connection(alias1={"host": "localhost", "port": "1"})
self.connection_wrap.add_connection(alias2={"host": "192.168.1.1", "port": "123"})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None),
('alias1', None), ('alias2', None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': 'localhost', 'port': '19530'}})
self.connection_wrap.get_connection_addr(alias="alias1", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "localhost", "port": "1"}})
self.connection_wrap.get_connection_addr(alias="alias2", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "192.168.1.1", "port": "123"}})
@pytest.mark.tags(ct.CaseLabel.L0)
def test_connection_add_default(self):
"""
target: add_connection passes default params successfully
method: add_connection passes default params
expected: response of add_connection is normal
"""
# add connections
self.connection_wrap.add_connection(default={'host': 'localhost', 'port': '19530'})
self.connection_wrap.add_connection(default={'port': '19530', 'host': 'localhost'})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': 'localhost', 'port': '19530'}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_add_cover_default(self):
"""
target: add a connection to override the default connection
method: add_connection passes alias of default and different configure
expected: the configuration was successfully overwritten
"""
# get all addr of default alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': 'localhost', 'port': '19530'}})
# add connections
self.connection_wrap.add_connection(default={'host': '192.168.1.1', 'port': '12345'})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': '192.168.1.1',
'port': '12345'}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_get_addr_not_exist(self):
"""
target: get addr of alias that is not exist and return {}
method: get_connection_addr passes alias that is not exist
expected: response of get_connection_addr is None
"""
# get an addr that not exist and return {}
self.connection_wrap.get_connection_addr(alias=ct.Not_Exist, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {}})
@pytest.mark.skip("The maximum number of add_connection is not set")
@pytest.mark.tags(ct.CaseLabel.L2)
def test_connection_add_max(self):
"""
The maximum number of add_connection is not set
"""
pass
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_add_after_connect(self, host, port):
"""
target: add_connection with different params after normal connect
method: then add_connection with different params
expected: add_connection failed
"""
# create connection that param of alias is not exist
self.connection_wrap.connect(alias="test_alias_name", host=host, port=port, check_task=ct.CheckTasks.ccr)
# add connection with diff params after that alias has been created
err_msg = cem.ConnDiffConf % "test_alias_name"
self.connection_wrap.add_connection(test_alias_name={"host": "localhost", "port": "1"},
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: err_msg})
# add connection with the same params
self.connection_wrap.add_connection(test_alias_name={"host": host, "port": port})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_add_after_default_connect(self, host, port):
"""
target: add_connection with different params after default alias connected
method: 1. connect with default alias
2. add_connection with the same alias but different params
expected: add_connection failed
"""
# create connection that param of alias is default
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, host=host, port=port,
check_task=ct.CheckTasks.ccr)
# add connection after that alias has been created
err_msg = cem.ConnDiffConf % DefaultConfig.DEFAULT_USING
self.connection_wrap.add_connection(default={"host": "localhost", "port": "1"},
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: err_msg})
# add connection with the same params
self.connection_wrap.add_connection(test_alias_name={"host": host, "port": port})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_add_after_disconnect(self, host, port):
"""
target: update connection params after connection disconnected
method: 1. connect and disconnect a connection
2. re-add connection by the same alias with different connection params
expected: re-add_connection successfully with new params
"""
# add a new connection and connect
self.connection_wrap.connect(alias="test_alias_name", host=host, port=port, check_task=ct.CheckTasks.ccr)
# disconnect the connection
self.connection_wrap.disconnect(alias="test_alias_name")
# get the connection address after it disconnected
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": host, "port": port}})
# re-add connection by the same alias with different connection params
self.connection_wrap.add_connection(test_alias_name={"host": "localhost", "port": "1"})
# re-get the connection address
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "localhost", "port": "1"}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_add_after_remove(self, host, port):
"""
target: add_connection after normal connectremove_connection
method: 1. connect and remove_connection
2. add connection by the same alias with different params
expected: add_connection by the same alias with different params successfully
"""
# create connection that param of alias is not exist
self.connection_wrap.connect(alias="test_alias_name", host=host, port=port, check_task=ct.CheckTasks.ccr)
# disconnect alias is exist
self.connection_wrap.remove_connection(alias="test_alias_name")
# get an addr that is not exist
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {}})
# add connection after that alias has been disconnected
self.connection_wrap.add_connection(test_alias_name={"host": "localhost", "port": "1"})
# get an addr that is exist
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "localhost", "port": "1"}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_connect_alias_not_exist(self):
"""
target: connect with a non existing alias and raise error
method: connect with a non existing alias
expected: response of connect is error
"""
# create connection that param of alias is not exist
err_msg = cem.ConnLackConf % ct.Not_Exist
self.connection_wrap.connect(alias=ct.Not_Exist, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: err_msg})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': "localhost", 'port': "19530"}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_connect_default_alias_invalid(self, port):
"""
target: connect with non existing params
method: 1. add connection with non existing params
2. try to connect
expected: raise an exception
"""
# add invalid default connection
self.connection_wrap.add_connection(default={'host': "host", 'port': port})
# using default alias to create connection, the connection does not exist
err_msg = cem.FailConnect % ("host", str(port))
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: -1, ct.err_msg: err_msg})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': "host", 'port': port}})
@pytest.mark.tags(ct.CaseLabel.L0)
def test_connection_connect_default_alias_effective(self, host, port):
"""
target: verify connections by default alias
method: 1. add connection with default alias
2. connect with default alias
3. list connections and get connection address
expected: 1. add connection, connect, list and get connection address successfully
"""
# add a valid default connection
self.connection_wrap.add_connection(default={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING,
ct.Connect_Object_Name)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': host, 'port': port}})
@pytest.mark.tags(ct.CaseLabel.L1)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING])
def test_connection_connect_repeat(self, host, port, connect_name):
"""
target: connect twice and return the same object
method: connect twice
expected: return the same object of connect
"""
# add a valid default connection
self.connection_wrap.add_connection(default={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, check_task=ct.CheckTasks.ccr)
# get the object of alias
res_obj1 = self.connection_wrap.get_connection(alias=connect_name, check_task=ct.CheckTasks.ccr,
check_items={ct.value_content: ct.Connect_Object_Name})[0]
# connect twice with the same params
self.connection_wrap.connect(alias=connect_name, host=host, port=port, check_task=ct.CheckTasks.ccr)
# get the object of alias
res_obj2 = self.connection_wrap.get_connection(alias=connect_name, check_task=ct.CheckTasks.ccr,
check_items={ct.value_content: ct.Connect_Object_Name})[0]
# check the response of the same alias is equal
assert res_obj1 == res_obj2
# connect twice with the different params
err_msg = cem.ConnDiffConf % "default"
self.connection_wrap.connect(alias=connect_name, host="host", port=port,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: err_msg})
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_nme"])
def test_connection_connect_params(self, host, port, connect_name):
"""
target: connect directly via parameters and return the object of connect successfully
method: connect directly via parameters
expected: response of connect is Milvus object
"""
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, host=host, port=port, check_task=ct.CheckTasks.ccr)
# get the object of alias
self.connection_wrap.get_connection(alias=connect_name, check_task=ct.CheckTasks.ccr,
check_items={ct.value_content: ct.Connect_Object_Name})
# list all connections and check the response
list_content = [(connect_name, ct.Connect_Object_Name)] if connect_name is DefaultConfig.DEFAULT_USING else \
[(DefaultConfig.DEFAULT_USING, None), (connect_name, ct.Connect_Object_Name)]
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: list_content})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=connect_name, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': host, 'port': port}})
@pytest.mark.tags(ct.CaseLabel.L1)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_nme"])
def test_connection_connect_wrong_params(self, host, port, connect_name):
"""
target: connect directly via invalid parameters and raise error
method: connect directly via invalid parameters
expected: raise exception with error msg
"""
# created connection with wrong connect name
self.connection_wrap.connect(alias=connect_name, ip=host, port=port, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0,
ct.err_msg: cem.NoHostPort})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
dict_content = {'host': DefaultConfig.DEFAULT_HOST,
'port': DefaultConfig.DEFAULT_PORT} if connect_name == DefaultConfig.DEFAULT_USING else {}
self.connection_wrap.get_connection_addr(alias=connect_name, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: dict_content})
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, ct.Not_Exist])
def test_connection_disconnect_not_exist(self, connect_name):
"""
target: disconnect with non existing alias
method: disconnect with non existing alias
expected: check connection list is normal
"""
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# disconnect alias is not exist
self.connection_wrap.disconnect(alias=connect_name)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING,
check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {"host": "localhost", "port": "19530"}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_disconnect_after_default_connect(self, host, port):
"""
target: check results after disconnect with default alias
method: 1. connect with default alias
2. get connection
3. disconnect with default alias
4. get connection
5. disconnect again
6. list connections and get connection address
expected: the connection was successfully terminated
"""
# add a valid default connection
self.connection_wrap.add_connection(default={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr)
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.value_content: ct.Connect_Object_Name})
# disconnect alias is exist
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.value_content: None})
# disconnect twice
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': host, 'port': port}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_disconnect_after_connect(self, host, port):
"""
target: disconnect with customized alias and check results
method: 1. connect with customized alias
2. disconnect with the alias
3. list connections and get connection address
expected: the connection was successfully terminated
"""
test_alias_name = "test_alias_name"
# add a valid default connection
self.connection_wrap.add_connection(test_alias_name={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=test_alias_name, host=host, port=port, check_task=ct.CheckTasks.ccr)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None),
(
test_alias_name, ct.Connect_Object_Name)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=test_alias_name, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': host, 'port': port}})
# disconnect alias is exist
self.connection_wrap.disconnect(alias=test_alias_name)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None),
(test_alias_name, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=test_alias_name, check_task=ct.CheckTasks.ccr,
check_items={ct.dict_content: {'host': host, 'port': port}})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_remove_connection_not_exist(self):
"""
target: remove connection that is not exist and check result
method: remove connection that is not exist
expected: connection list is normal
"""
# remove the connection that is not exist
self.connection_wrap.remove_connection(alias=ct.Not_Exist)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr,
check_items={ct.list_content: [(DefaultConfig.DEFAULT_USING, None)]})
@pytest.mark.tags(ct.CaseLabel.L1)
def test_connection_remove_default_alias(self):
"""
target: remove default alias connect and check result
method: remove default alias connect
expected: list connection and return {}
"""
# remove the connection that is not exist
self.connection_wrap.remove_connection(alias=DefaultConfig.DEFAULT_USING)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr, check_items={ct.list_content: []})
@pytest.mark.tags(ct.CaseLabel.L1)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_name"])
def test_connection_remove_after_connect(self, host, port, connect_name):
"""
target: remove connection after connect and check result
method: remove connection after connect
expected: addr is None, response of list_connection still included that configure
"""
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, host=host, port=port, check_task=ct.CheckTasks.ccr)
# remove the connection that is not exist
self.connection_wrap.remove_connection(alias=connect_name)
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING, check_task=ct.CheckTasks.ccr,
check_items={ct.value_content: None})
# list all connections and check the response
list_content = [] if connect_name == DefaultConfig.DEFAULT_USING else [(DefaultConfig.DEFAULT_USING, None)]
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr, check_items={ct.list_content: list_content})
@pytest.mark.tags(ct.CaseLabel.L1)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_name"])
def test_connection_remove_after_disconnect(self, host, port, connect_name):
"""
target: remove connection after disconnect and check result
method: remove connection after disconnect
expected: response of list_connection not included that configure
"""
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, host=host, port=port, check_task=ct.CheckTasks.ccr)
# disconnect alias is exist
self.connection_wrap.disconnect(alias=connect_name)
# remove connection
self.connection_wrap.remove_connection(alias=connect_name)
# remove twice connection
self.connection_wrap.remove_connection(alias=connect_name)
# list all connections and check the response
list_content = [] if connect_name == DefaultConfig.DEFAULT_USING else [(DefaultConfig.DEFAULT_USING, None)]
self.connection_wrap.list_connections(check_task=ct.CheckTasks.ccr, check_items={ct.list_content: list_content})
@pytest.mark.tags(ct.CaseLabel.L1)
# @pytest.mark.parametrize("collection_name, schema", [(cf.gen_unique_str('connection_test_'),
# cf.gen_default_collection_schema())])
def test_connection_init_collection_invalid_connection(self):
"""
target: create collection with invalid connection
method: init collection with invalid connection
expected: check result
"""
# init collection failed
collection_name = cf.gen_unique_str('connection_test_')
schema = cf.gen_default_collection_schema()
self.collection_wrap.init_collection(name=collection_name, schema=schema, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0,
ct.err_msg: cem.ConnectFirst},
_using=ct.Not_Exist)
@pytest.mark.tags(ct.CaseLabel.L1)
# @pytest.mark.parametrize("collection_name, schema", [(cf.gen_unique_str('connection_test_'),
# cf.gen_default_collection_schema())])
def test_connection_init_collection_connection(self, host, port):
"""
target: create collection then disconnection
method: connection, init collection, then disconnection
expected: check result
"""
# successfully created default connection
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, host=host, port=port,
check_task=ct.CheckTasks.ccr)
# init collection successfully
collection_name = cf.gen_unique_str('connection_test_')
schema = cf.gen_default_collection_schema()
self.collection_wrap.init_collection(name=collection_name, schema=schema, _using=DefaultConfig.DEFAULT_USING)
# remove connection
self.connection_wrap.remove_connection(alias=DefaultConfig.DEFAULT_USING)
# drop collection failed
self.collection_wrap.drop(check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: "should create connect first"})
# successfully created default connection
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, host=host, port=port,
check_task=ct.CheckTasks.ccr)
# drop collection success
self.collection_wrap.drop()
class TestConnect:
def local_ip(self, args):
"""
check if ip is localhost or not
"""
if not args["ip"] or args["ip"] == 'localhost' or args["ip"] == "127.0.0.1":
return True
else:
return False
@pytest.mark.tags(ct.CaseLabel.L2)
def test_close_repeatedly(self, dis_connect, args):
"""
target: test disconnect repeatedly
method: disconnect a connected client, disconnect again
expected: raise an error after disconnected
"""
with pytest.raises(Exception) as e:
dis_connect.close()
@pytest.mark.tags(ct.CaseLabel.L2)
def test_connect_uri(self, args):
"""
target: test connect with correct uri
method: uri format and value are both correct
expected: connected is True
"""
uri_value = "tcp://%s:%s" % (args["ip"], args["port"])
milvus = get_milvus(args["ip"], args["port"], uri=uri_value, handler=args["handler"])
@pytest.mark.tags(ct.CaseLabel.L2)
def test_connect_uri_null(self, args):
"""
target: test connect with null uri
method: uri set null
expected: connected is True
"""
uri_value = ""
if self.local_ip(args):
milvus = get_milvus(None, None, uri=uri_value, handler=args["handler"])
else:
with pytest.raises(Exception) as e:
milvus = get_milvus(None, None, uri=uri_value, handler=args["handler"])
@pytest.mark.tags(ct.CaseLabel.L2)
def test_connect_with_multiprocess(self, args):
"""
target: test uri connect with multiprocess
method: set correct uri, test with multiprocessing connecting
expected: all connection is connected
"""
def connect():
milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
assert milvus
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
future_results = {executor.submit(
connect): i for i in range(100)}
for future in concurrent.futures.as_completed(future_results):
future.result()
@pytest.mark.tags(ct.CaseLabel.L2)
def test_connect_repeatedly(self, args):
"""
target: test connect repeatedly
method: connect again
expected: status.code is 0, and status.message shows have connected already
"""
uri_value = "tcp://%s:%s" % (args["ip"], args["port"])
milvus = Milvus(uri=uri_value, handler=args["handler"])
milvus = Milvus(uri=uri_value, handler=args["handler"])
class TestConnectIPInvalid(object):
"""
Test connect server with invalid ip
"""
@pytest.fixture(
scope="function",
params=gen_invalid_ips()
)
def get_invalid_ip(self, request):
yield request.param
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.timeout(CONNECT_TIMEOUT)
def test_connect_with_invalid_ip(self, args, get_invalid_ip):
ip = get_invalid_ip
with pytest.raises(Exception) as e:
milvus = get_milvus(ip, args["port"], args["handler"])
class TestConnectPortInvalid(object):
"""
Test connect server with invalid ip
"""
@pytest.fixture(
scope="function",
params=gen_invalid_ints()
)
def get_invalid_port(self, request):
yield request.param
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.timeout(CONNECT_TIMEOUT)
def test_connect_with_invalid_port(self, args, get_invalid_port):
"""
target: test ip:port connect with invalid port value
method: set port in gen_invalid_ports
expected: connected is False
"""
port = get_invalid_port
with pytest.raises(Exception) as e:
milvus = get_milvus(args["ip"], port, args["handler"])
class TestConnectURIInvalid(object):
"""
Test connect server with invalid uri
"""
@pytest.fixture(
scope="function",
params=gen_invalid_uris()
)
def get_invalid_uri(self, request):
yield request.param
@pytest.mark.tags(ct.CaseLabel.L2)
@pytest.mark.timeout(CONNECT_TIMEOUT)
def test_connect_with_invalid_uri(self, get_invalid_uri, args):
"""
target: test uri connect with invalid uri value
method: set port in gen_invalid_uris
expected: connected is False
"""
uri_value = get_invalid_uri
with pytest.raises(Exception) as e:
milvus = get_milvus(uri=uri_value, handler=args["handler"])