Add test cases of connection interface (#5732)

* [skip ci] Add test cases of connection interface

Signed-off-by: wangting0128 <ting.wang@zilliz.com>

* [skip ci] Update test code

Signed-off-by: wangting0128 <ting.wang@zilliz.com>

* [skip ci] Update test codes

Signed-off-by: wangting0128 <ting.wang@zilliz.com>
pull/5728/head^2
紫晴 2021-06-10 19:19:49 +08:00 committed by GitHub
parent 9f4bb832b7
commit faf5752096
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 740 additions and 229 deletions

View File

@ -1,5 +1,6 @@
import pytest
import sys
from pymilvus_orm.default_config import DefaultConfig
sys.path.append("..")
from base.connections_wrapper import ApiConnectionsWrapper
@ -72,6 +73,10 @@ class Base:
res = self.connection_wrap.list_connections()
for i in res[0]:
self.connection_wrap.remove_connection(i[0])
# because the connection is in singleton mode, it needs to be restored to the original state after teardown
self.connection_wrap.add_connection(default={"host": DefaultConfig.DEFAULT_HOST,
"port": DefaultConfig.DEFAULT_PORT})
except Exception as e:
pass

View File

@ -12,44 +12,44 @@ class ApiConnectionsWrapper:
def __init__(self):
self.connection = Connections()
def add_connection(self, check_res=None, check_params=None, **kwargs):
def add_connection(self, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.connection.add_connection], **kwargs)
check_result = ResponseChecker(res, func_name, check_res, check_params, check, **kwargs).run()
return res, check_result
response, is_succ = api_request([self.connection.add_connection], **kwargs)
check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ, **kwargs).run()
return response, check_result
def disconnect(self, alias, check_res=None, check_params=None):
def disconnect(self, alias, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.connection.disconnect, alias])
check_result = ResponseChecker(res, func_name, check_res, check_params, check, alias=alias).run()
return res, check_result
response, is_succ = api_request([self.connection.disconnect, alias])
check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ, alias=alias).run()
return response, check_result
def remove_connection(self, alias, check_res=None, check_params=None):
def remove_connection(self, alias, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.connection.remove_connection, alias])
check_result = ResponseChecker(res, func_name, check_res, check_params, check, alias=alias).run()
return res, check_result
response, is_succ = api_request([self.connection.remove_connection, alias])
check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ, alias=alias).run()
return response, check_result
def connect(self, alias=DefaultConfig.DEFAULT_USING, check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, succ = api_request([self.connection.connect, alias], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, succ, alias=alias, **kwargs).run()
return res, check_result
response, succ = api_request([self.connection.connect, alias], **kwargs)
check_result = ResponseChecker(response, func_name, check_task, check_items, succ, alias=alias, **kwargs).run()
return response, check_result
def get_connection(self, alias=DefaultConfig.DEFAULT_USING, check_res=None, check_params=None):
def get_connection(self, alias=DefaultConfig.DEFAULT_USING, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.connection.get_connection, alias])
check_result = ResponseChecker(res, func_name, check_res, check_params, check, alias=alias).run()
return res, check_result
response, is_succ = api_request([self.connection.get_connection, alias])
check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ, alias=alias).run()
return response, check_result
def list_connections(self, check_res=None, check_params=None):
def list_connections(self, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.connection.list_connections])
check_result = ResponseChecker(res, func_name, check_res, check_params, check).run()
return res, check_result
response, is_succ = api_request([self.connection.list_connections])
check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ).run()
return response, check_result
def get_connection_addr(self, alias, check_res=None, check_params=None):
def get_connection_addr(self, alias, check_task=None, check_items=None):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.connection.get_connection_addr, alias])
check_result = ResponseChecker(res, func_name, check_res, check_params, check, alias=alias).run()
return res, check_result
response, is_succ = api_request([self.connection.get_connection_addr, alias])
check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ, alias=alias).run()
return response, check_result

View File

@ -1,9 +1,10 @@
from utils.util_log import test_log as log
from common import common_type as ct
from common.common_type import CheckTasks
from common.common_type import CheckTasks, Connect_Object_Name
# from common.code_mapping import ErrorCode, ErrorMessage
from pymilvus_orm import Collection, Partition
from utils.api_request import Error
import check.param_check as pc
class ResponseChecker:
@ -30,8 +31,8 @@ class ResponseChecker:
elif self.check_task == CheckTasks.err_res:
result = self.assert_exception(self.response, self.succ, self.check_items)
elif self.check_task == CheckTasks.check_list_count and self.check_items is not None:
result = self.check_list_count(self.response, self.func_name, self.check_items)
elif self.check_task == CheckTasks.check_connection_result:
result = self.check_value_equal(self.response, self.func_name, self.check_items)
elif self.check_task == CheckTasks.check_collection_property:
result = self.check_collection_property(self.response, self.func_name, self.check_items)
@ -64,18 +65,35 @@ class ResponseChecker:
return True
@staticmethod
def check_list_count(res, func_name, params):
if not isinstance(res, list):
log.error("[CheckFunc] Response of API is not a list: %s" % str(res))
assert False
def check_value_equal(res, func_name, params):
""" check response of connection interface that result is normal """
if func_name == "list_connections":
list_count = params.get("list_count", None)
if not str(list_count).isdigit():
log.error("[CheckFunc] Check param of list_count is not a number: %s" % str(list_count))
if not isinstance(res, list):
log.error("[CheckFunc] Response of list_connections is not a list: %s" % str(res))
assert False
assert len(res) == int(list_count)
list_content = params.get("list_content", None)
if not isinstance(list_content, list):
log.error("[CheckFunc] Check param of list_content is not a list: %s" % str(list_content))
assert False
new_res = pc.get_connect_object_name(res)
assert pc.list_equal_check(new_res, list_content)
if func_name == "get_connection_addr":
dict_content = params.get("dict_content", None)
assert pc.dict_equal_check(res, dict_content)
if func_name == "connect":
class_obj = Connect_Object_Name
res_obj = type(res).__name__
assert res_obj == class_obj
if func_name == "get_connection":
value_content = params.get("value_content", None)
res_obj = type(res).__name__ if res is not None else None
assert res_obj == value_content
return True

View File

@ -1,5 +1,6 @@
import pytest
import sys
import operator
sys.path.append("..")
from utils.util_log import test_log as log
@ -40,5 +41,78 @@ def exist_check(param, _list):
return True
else:
log.error("[EXIST_CHECK] Param(%s) is not in (%s)" % (param, _list))
log.error("[EXIST_CHECK] Param(%s) is not in (%s)." % (param, _list))
return False
def dict_equal_check(dict1, dict2):
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
log.error("[DICT_EQUAL_CHECK] Type of dict(%s) or dict(%s) is not a dict." % (str(dict1), str(dict2)))
return False
return operator.eq(dict1, dict2)
def list_de_duplication(_list):
if not isinstance(_list, list):
log.error("[LIST_DE_DUPLICATION] Type of list(%s) is not a list." % str(_list))
return _list
# de-duplication of _list
result = list(set(_list))
# Keep the order of the elements unchanged
result.sort(key=_list.index)
log.debug("[LIST_DE_DUPLICATION] %s after removing the duplicate elements, the list becomes %s" % (str(_list), str(result)))
return result
def list_equal_check(param1, param2):
check_result = True
if len(param1) == len(param1):
_list1 = list_de_duplication(param1)
_list2 = list_de_duplication(param2)
if len(_list1) == len(_list2):
for i in _list1:
if i not in _list2:
check_result = False
break
else:
check_result = False
else:
check_result = False
if check_result is False:
log.error("[LIST_EQUAL_CHECK] List(%s) and list(%s) are not equal." % (str(param1), str(param2)))
return check_result
def get_connect_object_name(_list):
""" get the name of the objects that returned by the connection """
if not isinstance(_list, list):
log.error("[GET_CONNECT_OBJECT_NAME] Type of list(%s) is not a list." % str(_list))
return _list
new_list = []
for i in _list:
if not isinstance(i, tuple):
log.error("[GET_CONNECT_OBJECT_NAME] The element:%s of the list is not tuple, please check manually."
% str(i))
return _list
if len(i) != 2:
log.error("[GET_CONNECT_OBJECT_NAME] The length of the tuple:%s is not equal to 2, please check manually."
% str(i))
return _list
if i[1] is not None:
_obj_name = type(i[1]).__name__
new_list.append((i[0], _obj_name))
else:
new_list.append(i)
log.debug("[GET_CONNECT_OBJECT_NAME] list:%s is reset to list:%s" % (str(_list), str(new_list)))
return new_list

View File

@ -35,6 +35,8 @@ float_vec_field_desc = "float vector type field"
binary_vec_field_desc = "binary vector type field"
Not_Exist = "Not_Exist"
Connect_Object_Name = "Milvus"
err_code = "err_code"
err_msg = "err_msg"
@ -77,7 +79,7 @@ class CheckTasks:
""" The name of the method used to check the result """
false = False
err_res = "error_response"
check_list_count = "check_list_count"
check_connection_result = "check_connection_result"
check_collection_property = "check_collection_property"
check_partition_property = "check_partition_property"
check_search_results = "check_search_results"

View File

@ -11,6 +11,7 @@ def pytest_addoption(parser):
parser.addoption("--tag", action="store", default="all", help="only run tests matching the tag.")
parser.addoption('--dry_run', action='store_true', default=False, help="")
parser.addoption('--partition_name', action='store', default="partition_name", help="name of partition")
parser.addoption('--connect_name', action='store', default="connect_name", help="name of connect")
parser.addoption('--descriptions', action='store', default="partition_des", help="descriptions of partition")
parser.addoption('--collection_name', action='store', default="collection_name", help="name of collection")
parser.addoption('--search_vectors', action='store', default="search_vectors", help="vectors of search")
@ -59,6 +60,11 @@ def dry_run(request):
return request.config.getoption("--dry_run")
@pytest.fixture
def connect_name(request):
return request.config.getoption("--connect_name")
@pytest.fixture
def partition_name(request):
return request.config.getoption("--partition_name")

View File

@ -5,6 +5,7 @@ from pymilvus_orm.default_config import DefaultConfig
from base.client_base import TestcaseBase
from utils.util_log import test_log as log
from common.common_type import *
from check.param_check import *
class TestConnectionParams(TestcaseBase):
@ -13,80 +14,114 @@ class TestConnectionParams(TestcaseBase):
The author Ting.Wang
"""
@pytest.mark.skip("No check for **kwargs")
@pytest.mark.xfail(reason="Issue #5684")
@pytest.mark.tags(CaseLabel.L3)
def test_connection_kwargs_param_check(self):
def test_connection_add_connection_kwargs_param_check(self):
"""
target: test **kwargs of connection
target: test **kwargs of add_connection
method: passing wrong parameters of **kwargs
expected: assert response is error
"""
# No check for **kwargs
res = self.connection_wrap.add_connection(_kwargs=[1, 2])
log.info(res[0])
self.connection_wrap.add_connection(_kwargs=[1, 2])
res = self.connection_wrap.get_connection_addr(alias='default')
assert res[0] == {}
# get addr of default alias
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
@pytest.mark.xfail(reason="Issue #5723")
@pytest.mark.tags(CaseLabel.L3)
def test_connection_connect_kwargs_param_check(self):
"""
target: test **kwargs of connect
method: passing wrong parameters of **kwargs
expected: assert response is error
"""
# get addr of default alias
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING)
# No check for **kwargs
res = self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, _kwargs=[1, 2])
log.info(res[0].args[0])
assert res[0].args[0] == "Fail connecting to server on localhost:19530. Timeout"
assert "Fail connecting to server" in res[0].args[0]
@pytest.mark.skip("No check for alias")
@pytest.mark.xfail(reason="Feature #5725")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("alias", get_invalid_strs)
def test_connection_create_alias_param_check(self, alias):
def test_connection_connect_alias_param_check(self, alias):
"""
target: test create connection with wrong params of alias
method: create connection with wrong params of alias
target: test connect passes wrong params of alias
method: connect passes wrong params of alias
expected: assert response is error
"""
# No check for alias
res = self.connection_wrap.connect(alias=alias)
log.info(res[0])
@pytest.mark.skip("No check for alias")
@pytest.mark.xfail(reason="Feature #5725")
@pytest.mark.parametrize("alias", get_invalid_strs)
@pytest.mark.tags(CaseLabel.L3)
def test_connection_get_alias_param_check(self, alias):
"""
target: test get connection with wrong params of alias
method: get connection with wrong params of alias
target: test get connection passes wrong params of alias
method: get connection passes wrong params of alias
expected: assert response is error
"""
# not check for alias
res = self.connection_wrap.get_connection(alias=alias)
log.info(res[0])
@pytest.mark.skip("No check for alias")
@pytest.mark.xfail(reason="Feature #5725")
@pytest.mark.parametrize("alias", get_invalid_strs)
@pytest.mark.tags(CaseLabel.L3)
def test_connection_get_addr_alias_param_check(self, alias):
"""
target: test get connection addr with wrong params of alias
method: get connection addr with wrong params of alias
target: test get connection addr passes wrong params of alias
method: get connection addr passes wrong params of alias
expected: assert response is error
"""
# not check for alias
res = self.connection_wrap.get_connection_addr(alias=alias)
log.info(res[0])
@pytest.mark.skip("No check for alias")
@pytest.mark.xfail(reason="Feature #5725")
@pytest.mark.parametrize("alias", get_invalid_strs)
@pytest.mark.tags(CaseLabel.L3)
def test_connection_remove_alias_param_check(self, alias):
"""
target: test remove connection with wrong params of alias
method: remove connection with wrong params of alias
target: test remove connection passes wrong params of alias
method: remove connection passes wrong params of alias
expected: assert response is error
"""
# not check for alias
self._connect()
res = self.connection_wrap.remove_connection(alias=alias)
log.info(res[0])
@pytest.mark.xfail(reason="Feature #5725")
@pytest.mark.parametrize("alias", get_invalid_strs)
@pytest.mark.tags(CaseLabel.L3)
def test_connection_disconnect_alias_param_check(self, alias):
"""
target: test disconnect passes wrong params of alias
method: disconnect passes wrong params of alias
expected: assert response is error
"""
# not check for alias
self._connect()
res = self.connection_wrap.disconnect(alias=alias)
log.info(res[0])
class TestConnectionOperation(TestcaseBase):
"""
@ -94,202 +129,574 @@ class TestConnectionOperation(TestcaseBase):
The author Ting.Wang
"""
@pytest.mark.xfail(reason="#5684")
@pytest.mark.tags(CaseLabel.L3)
def test_connection_configure_repeat(self, host, port):
def test_connection_add_wrong_format(self):
"""
target: test connection configure four times
method: connection configure twice with the same params
expected: assert the configuration is successful
target: test add_connection, regardless of whether the connection exists
method: add existing and non-existing configurations at the same time
expected: list_connections include the configured connections
"""
self.connection_wrap.add_connection(default={"host": host, "port": port}, dev={"host": host, "port": port})
assert self.connection_wrap.list_connections()[0] == [('default', None), ('dev', None)]
assert self.connection_wrap.get_connection_addr(alias='default')[0] == {"host": host, "port": port}
self.connection_wrap.add_connection(default={"host": host, "port": port}, dev={"host": host, "port": port})
assert self.connection_wrap.list_connections()[0] == [('default', None), ('dev', None)]
# add connections
self.connection_wrap.add_connection(alias1={"host": "localhost", "port": "1"},
alias2={"port": "-1", "host": "hostlocal"},
testing={"": ""})
self.connection_wrap.add_connection(default1={"host": host, "port": port})
assert self.connection_wrap.list_connections()[0] == [('default', None), ('dev', None), ('default1', None)]
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None),
('alias1', None), ('alias2', None),
('testing', None)]})
self.connection_wrap.add_connection()
assert self.connection_wrap.list_connections()[0] == [('default', None), ('dev', None), ('default1', None)]
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': 'localhost', 'port': '19530'}})
self.connection_wrap.get_connection_addr(alias="alias1", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "1"}})
self.connection_wrap.get_connection_addr(alias="alias2", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "hostlocal", "port": "-1"}})
self.connection_wrap.get_connection_addr(alias="testing", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"": ""}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_more(self):
"""
target: test add_connection passes in multiple parameters
method: add two params of add_connection
expected: added to the connection list successfully
"""
# add connections
self.connection_wrap.add_connection(alias1={"host": "localhost", "port": "1"},
alias2={"host": "192.168.1.1", "port": "123"})
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"value_content": None})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None),
('alias1', None), ('alias2', None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': 'localhost', 'port': '19530'}})
self.connection_wrap.get_connection_addr(alias="alias1", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "1"}})
self.connection_wrap.get_connection_addr(alias="alias2", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "192.168.1.1", "port": "123"}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_single_more(self):
"""
target: test add connections separately
method: add_connection twice
expected: added to the connection list successfully
"""
# add connections
self.connection_wrap.add_connection(alias1={"host": "localhost", "port": "1"})
self.connection_wrap.add_connection(alias2={"host": "192.168.1.1", "port": "123"})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None),
('alias1', None), ('alias2', None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': 'localhost', 'port': '19530'}})
self.connection_wrap.get_connection_addr(alias="alias1", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "1"}})
self.connection_wrap.get_connection_addr(alias="alias2", check_task=CheckTasks.check_connection_result,
check_items={
"dict_content": {"host": "192.168.1.1", "port": "123"}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_default(self):
"""
target: add_connection passes default params successfully
method: add_connection passes default params
expected: response of add_connection is normal
"""
# add connections
self.connection_wrap.add_connection(default={'host': 'localhost', 'port': '19530'})
self.connection_wrap.add_connection(default={'port': '19530', 'host': 'localhost'})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': 'localhost', 'port': '19530'}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_cover_default(self):
"""
target: add a connection to override the default connection
method: add_connection passes alias of default and different configure
expected: the configuration was successfully overwritten
"""
# get all addr of default alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': 'localhost', 'port': '19530'}})
# add connections
self.connection_wrap.add_connection(default={'host': '192.168.1.1', 'port': '12345'})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': '192.168.1.1', 'port': '12345'}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_get_addr_not_exist(self):
"""
target: get addr of alias that is not exist and return {}
method: get_connection_addr passes alias that is not exist
expected: response of get_connection_addr is None
"""
# get an addr that not exist and return {}
self.connection_wrap.get_connection_addr(alias=Not_Exist, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {}})
@pytest.mark.skip("The maximum number of add_connection is not set")
def test_connection_add_max(self):
"""
The maximum number of add_connection is not set
"""
pass
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_after_connect(self, host, port):
"""
target: add_connect passes different params after normal connect
method: normal connection then add_connect passes different params
expected: add_connect failed
"""
# create connection that param of alias is not exist
self.connection_wrap.connect(alias="test_alias_name", host=host, port=port, check_task=CheckTasks.check_connection_result)
# add connection with diff params after that alias has been created
err_msg = "alias of 'test_alias_name' already creating connections, "\
+ "but the configure is not the same as passed in."
self.connection_wrap.add_connection(test_alias_name={"host": "localhost", "port": "1"},
check_task=CheckTasks.err_res,
check_items={"err_code": -1, "err_msg": err_msg})
# add connection with the same params
self.connection_wrap.add_connection(test_alias_name={"host": host, "port": port})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_after_default_connect(self, host, port):
"""
target: add_connect passes different params after normal connect passes default alias
method: normal connection then add_connect passes different params
expected: add_connect failed
"""
# create connection that param of alias is default
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, host=host, port=port,
check_task=CheckTasks.check_connection_result)
# add connection after that alias has been created
err_msg = "alias of 'test_alias_name' already creating connections, " \
+ "but the configure is not the same as passed in."
self.connection_wrap.add_connection(default={"host": "localhost", "port": "1"},
check_task=CheckTasks.err_res,
check_items={"err_code": -1, "err_msg": err_msg})
# add connection with the same params
self.connection_wrap.add_connection(test_alias_name={"host": host, "port": port})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_after_disconnect(self, host, port):
"""
target: add_connect after normal connectdisconnect
method: normal connect, disconnect then add connect passes the same alias
expected: add_connect successfully
"""
# create connection that param of alias is not exist
self.connection_wrap.connect(alias="test_alias_name", host=host, port=port, check_task=CheckTasks.check_connection_result)
# disconnect alias is exist
self.connection_wrap.disconnect(alias="test_alias_name")
# get an addr that is exist
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": host, "port": port}})
# add connection after that alias has been disconnected
self.connection_wrap.add_connection(test_alias_name={"host": "localhost", "port": "1"})
# get an addr that is exist
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "1"}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_add_after_remove(self, host, port):
"""
target: add_connect after normal connectremove_connection
method: normal connect, remove_connection then add connect passes the same alias
expected: add_connect successfully
"""
# create connection that param of alias is not exist
self.connection_wrap.connect(alias="test_alias_name", host=host, port=port, check_task=CheckTasks.check_connection_result)
# disconnect alias is exist
self.connection_wrap.remove_connection(alias="test_alias_name")
# get an addr that is not exist
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {}})
# add connection after that alias has been disconnected
self.connection_wrap.add_connection(test_alias_name={"host": "localhost", "port": "1"})
# get an addr that is exist
self.connection_wrap.get_connection_addr(alias="test_alias_name", check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "1"}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_connect_alias_not_exist(self):
"""
target: connect passes alias that is not exist and raise error
method: connect passes alias that is not exist
expected: response of connect is error
"""
# create connection that param of alias is not exist
err_msg = "You need to pass in the configuration of the connection named '%s'" % Not_Exist
self.connection_wrap.connect(alias=Not_Exist, check_task=CheckTasks.err_res, check_items={"err_code": -1,
"err_msg": err_msg})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': "localhost", 'port': "19530"}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_connect_default_alias_invalid(self, port):
"""
target: connect passes configure is not exist and raise error
method: connect passes configure is not exist
expected: response of connect is error
"""
# add invalid default connection
self.connection_wrap.add_connection(default={'host': "host", 'port': port})
# using default alias to create connection, the connection does not exist
err_msg = "Fail connecting to server on localhost:19530. Timeout"
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.err_res,
check_items={"err_code": -1, "err_msg": err_msg})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': "host", 'port': port}})
@ pytest.mark.tags(CaseLabel.L3)
def test_connection_connect_default_alias_effective(self, host, port):
"""
target: connect passes useful configure that adds by add_connect
method: connect passes configure that add by add_connect
expected: connect successfully
"""
# add a valid default connection
self.connection_wrap.add_connection(default={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING,
Connect_Object_Name)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING])
def test_connection_connect_repeat(self, host, port, connect_name):
"""
target: connect twice and return the same object
method: connect twice
expected: return the same object of connect
"""
# add a valid default connection
self.connection_wrap.add_connection(default={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, check_task=CheckTasks.check_connection_result)
# get the object of alias
res_obj1 = self.connection_wrap.get_connection(alias=connect_name, check_task=CheckTasks.check_connection_result,
check_items={"value_content": Connect_Object_Name})[0]
# connect twice with the same params
self.connection_wrap.connect(alias=connect_name, host=host, port=port,
check_task=CheckTasks.check_connection_result)
# get the object of alias
res_obj2 = self.connection_wrap.get_connection(alias=connect_name, check_task=CheckTasks.check_connection_result,
check_items={"value_content": Connect_Object_Name})[0]
# check the response of the same alias is equal
assert res_obj1 == res_obj2
# connect twice with the different params
err_msg = "The connection named default already creating," \
+ " but passed parameters don't match the configured parameters,"
self.connection_wrap.connect(alias=connect_name, host="host", port=port,
check_task=CheckTasks.err_res, check_items={"err_code": -1, "err_msg": err_msg})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_nme"])
def test_connection_connect_params(self, host, port, connect_name):
"""
target: connect directly via parameters and return the object of connect successfully
method: connect directly via parameters
expected: response of connect is Milvus object
"""
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, host=host, port=port, check_task=CheckTasks.check_connection_result)
# get the object of alias
self.connection_wrap.get_connection(alias=connect_name, check_task=CheckTasks.check_connection_result,
check_items={"value_content": Connect_Object_Name})
# list all connections and check the response
list_content = [(connect_name, Connect_Object_Name)] if connect_name is DefaultConfig.DEFAULT_USING else\
[(DefaultConfig.DEFAULT_USING, None), (connect_name, Connect_Object_Name)]
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": list_content})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=connect_name, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
@pytest.mark.xfail(reason="5697")
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_nme"])
def test_connection_connect_wrong_params(self, host, port, connect_name):
"""
target: connect directly via wrong parameters and raise error
method: connect directly via wrong parameters
expected: response of connect is error
"""
# created connection with wrong connect name
self.connection_wrap.connect(alias=connect_name, ip=host, port=port, check_task=CheckTasks.err_res,
check_items={"err_code": -1,
"err_msg": "Param is not complete. Please invoke as follow:"})
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
dict_content = {'host': host, 'port': port} if connect_name == DefaultConfig.DEFAULT_USING else {}
self.connection_wrap.get_connection_addr(alias=connect_name, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": dict_content})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, Not_Exist])
def test_connection_disconnect_not_exist(self, connect_name):
"""
target: disconnect passes alias that is not exist
method: disconnect passes alias that is not exist
expected: check connection list is normal
"""
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# disconnect alias is not exist
self.connection_wrap.disconnect(alias=connect_name)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {"host": "localhost", "port": "19530"}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_disconnect_after_default_connect(self, host, port):
"""
target: disconnect default connect and check result
method: disconnect default connect
expected: the connection was successfully terminated
"""
# add a valid default connection
self.connection_wrap.add_connection(default={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result)
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"value_content": Connect_Object_Name})
# disconnect alias is exist
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"value_content": None})
# disconnect twice
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=DefaultConfig.DEFAULT_USING, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_disconnect_after_connect(self, host, port):
"""
target: disconnect test connect and check result
method: disconnect test connect
expected: the connection was successfully terminated
"""
test_alias_name = "test_alias_name"
# add a valid default connection
self.connection_wrap.add_connection(test_alias_name={'host': host, 'port': port})
# successfully created default connection
self.connection_wrap.connect(alias=test_alias_name, host=host, port=port,
check_task=CheckTasks.check_connection_result)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None),
(test_alias_name, Connect_Object_Name)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=test_alias_name, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
# disconnect alias is exist
self.connection_wrap.disconnect(alias=test_alias_name)
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None),
(test_alias_name, None)]})
# get all addr of alias and check the response
self.connection_wrap.get_connection_addr(alias=test_alias_name, check_task=CheckTasks.check_connection_result,
check_items={"dict_content": {'host': host, 'port': port}})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_remove_connection_not_exist(self):
"""
target: test remove connection that is not exist
method: 1remove connection that is not exist
2create connection with default alias
3remove connection that is not exist
expected: assert alias of Not_exist is not exist
target: remove connection that is not exist and check result
method: remove connection that is not exist
expected: connection list is normal
"""
res = self.connection_wrap.remove_connection(alias=Not_Exist, check_res="")
assert res[0].args[0] == "There is no connection with alias '%s'." % Not_Exist
self._connect()
# remove the connection that is not exist
self.connection_wrap.remove_connection(alias=Not_Exist)
res = self.connection_wrap.remove_connection(alias=Not_Exist, check_res="")
assert res[0].args[0] == "There is no connection with alias '%s'." % Not_Exist
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": [(DefaultConfig.DEFAULT_USING, None)]})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_remove_connection_repeat(self):
def test_connection_remove_default_alias(self):
"""
target: test remove connection twice
method: remove connection twice
expected: assert the second response is an error
target: remove default alias connect and check result
method: remove default alias connect
expected: list connection and return {}
"""
self._connect()
self.connection_wrap.remove_connection(alias='default')
# remove the connection that is not exist
self.connection_wrap.remove_connection(alias=DefaultConfig.DEFAULT_USING)
res = self.connection_wrap.remove_connection(alias='default', check_res='')
assert res[0].args[0] == "There is no connection with alias 'default'."
# list all connections and check the response
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": []})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_normal_remove_connection_repeat(self, host, port):
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_name"])
def test_connection_remove_after_connect(self, host, port, connect_name):
"""
target: test remove connection twice
method: remove connection twice
expected: assert the responses are True
target: remove connection after connect and check result
method: remove connection after connect
expected: addr is None, response of list_connection still included that configure
"""
self.connection_wrap.add_connection(default={"host": host, "port": port}, dev={"host": host, "port": port})
self.connection_wrap.connect(alias='default')
res = self.connection_wrap.get_connection_addr(alias='default')
assert res[0]["host"] == host
assert res[0]["port"] == port
self.connection_wrap.connect(alias='dev')
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, host=host, port=port,
check_task=CheckTasks.check_connection_result)
self.connection_wrap.remove_connection(alias='default')
self.connection_wrap.remove_connection(alias='dev')
# remove the connection that is not exist
self.connection_wrap.remove_connection(alias=connect_name)
# get the object of alias
self.connection_wrap.get_connection(alias=DefaultConfig.DEFAULT_USING,
check_task=CheckTasks.check_connection_result,
check_items={"value_content": None})
# list all connections and check the response
list_content = [] if connect_name == DefaultConfig.DEFAULT_USING else [(DefaultConfig.DEFAULT_USING, None)]
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": list_content})
@pytest.mark.tags(CaseLabel.L3)
def test_connection_remove_connection_100_repeat(self):
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING, "test_alias_name"])
def test_connection_remove_after_disconnect(self, host, port, connect_name):
"""
target: test delete the same connection 100 times
method: delete the same connection 100 times
expected: assert the remaining 99 delete errors
target: remove connection after disconnect and check result
method: remove connection after disconnect
expected: response of list_connection not included that configure
"""
self._connect()
self.connection_wrap.remove_connection(alias='default')
for i in range(100):
res = self.connection_wrap.remove_connection(alias='default', check_res='')
assert res[0].args[0] == "There is no connection with alias 'default'."
# successfully created default connection
self.connection_wrap.connect(alias=connect_name, host=host, port=port,
check_task=CheckTasks.check_connection_result)
@pytest.mark.tags(CaseLabel.L3)
def test_connection_configure_remove_connection(self, host, port):
"""
target: test remove configure alias
method: remove configure alias
expected: assert res is err
"""
self.connection_wrap.add_connection(default={"host": host, "port": port})
alias_name = 'default'
# disconnect alias is exist
self.connection_wrap.disconnect(alias=connect_name)
res = self.connection_wrap.remove_connection(alias=alias_name, check_res='')
assert res[0].args[0] == "There is no connection with alias '%s'." % alias_name
# remove connection
self.connection_wrap.remove_connection(alias=connect_name)
@pytest.mark.skip("error res")
@pytest.mark.tags(CaseLabel.L3)
def test_connection_create_connection_remove_configure(self, host, port):
"""
target: test create connection before remove configure
method: create connection before reset configure
expected: assert res
"""
alias_name = "default"
self.connection_wrap.connect(alias=alias_name, host=host, port=port)
self.connection_wrap.add_connection()
self.connection_wrap.get_connection(alias=alias_name)
# remove twice connection
self.connection_wrap.remove_connection(alias=connect_name)
@pytest.mark.skip("error res")
@pytest.mark.tags(CaseLabel.L3)
def test_connection_create_connection_reset_configure(self, host, port):
"""
target: test params of create connection are different with configure
method: params of create connection are different with configure
expected: assert res
"""
alias_name = "default"
self.connection_wrap.connect(alias=alias_name, host=host, port=port)
self.connection_wrap.add_connection(default={'host': host, 'port': port})
self.connection_wrap.get_connection(alias=alias_name)
@pytest.mark.skip("res needs to be confirmed")
@pytest.mark.tags(CaseLabel.L3)
def test_connection_create_connection_diff_configure(self, host, port):
"""
target: test params of create connection are different with configure
method: params of create connection are different with configure
expected: assert res
"""
# error
self.connection_wrap.add_connection(default={"host": 'host', "port": port})
res = self.connection_wrap.connect(alias="default", host=host, port=port, check_task='')
log.info(res[0])
res = self.connection_wrap.connect(alias="default", host=host, port=port, check_task='')
log.info(res[0])
@pytest.mark.tags(CaseLabel.L3)
def test_connection_create_connection_repeat(self, host, port):
"""
target: test create connection twice
method: create connection twice
expected: res is True
"""
self._connect()
self.connection_wrap.get_connection(alias='default')
self.connection_wrap.connect(alias='default', host=host, port=port)
self.connection_wrap.get_connection(alias='default')
@pytest.mark.tags(CaseLabel.L3)
def test_connection_create_connection_not_exist(self, port):
"""
target: test create connection is not exist
method: create connection with not exist link
expected: assert res is wrong
"""
self.connection_wrap.get_connection(alias='default', check_res=CheckTasks.false)
res = self.connection_wrap.connect(alias="default", host='host', port=port, check_task='')
assert res[0].args[0] == "Fail connecting to server on host:19530. Timeout"
@pytest.mark.tags(CaseLabel.L3)
def test_connection_create_remove(self, host, port):
"""
target: test create and remove connection twice
method: create and remove connection twice
expected: assert res is correct
"""
alias_name = "default"
self.connection_wrap.connect(alias=alias_name, host=host, port=port)
self.connection_wrap.get_connection(alias=alias_name)
self.connection_wrap.remove_connection(alias=alias_name)
self.connection_wrap.get_connection(alias=alias_name, check_res=CheckTasks.false)
self.connection_wrap.connect(alias=alias_name, host=host, port=port)
self.connection_wrap.get_connection(alias=alias_name)
@pytest.mark.tags(CaseLabel.L3)
def test_connection_list_configure(self, host, port):
"""
target: test list connection of configure
method: list connection of configure
expected: assert res is correct
"""
self.connection_wrap.add_connection(default={"host": host, "port": port}, dev={"host": host, "port": port})
self.connection_wrap.connect(alias="default")
assert self.connection_wrap.list_connections()[0] == ['default', 'dev']
@pytest.mark.skip("Behavior to be determined")
@pytest.mark.tags(CaseLabel.L3)
def test_connection_list_create_configure(self, host, port):
"""
target: test list connection of configure
method: list connection of configure
expected: assert res is correct
"""
self.connection_wrap.connect(alias="default1", host=host, port=port)
self.connection_wrap.add_connection(default={"host": host, "port": port}, dev={"host": host, "port": port})
log.info(self.connection_wrap.list_connections()[0])
assert self.connection_wrap.list_connections()[0] == ['default', 'dev']
# list all connections and check the response
list_content = [] if connect_name == DefaultConfig.DEFAULT_USING else [(DefaultConfig.DEFAULT_USING, None)]
self.connection_wrap.list_connections(check_task=CheckTasks.check_connection_result,
check_items={"list_content": list_content})

View File

@ -20,7 +20,7 @@ def api_request_catch():
return res, True
except Exception as e:
log.error(traceback.format_exc())
log.error("[Milvus API Exception]%s: %s" % (str(func), str(e)[0:log_row_length]))
log.error("(api_res) [Milvus API Exception]%s: %s" % (str(func), str(e)[0:log_row_length]))
return Error(e), False
return inner_wrapper
@ -37,7 +37,6 @@ def api_request(_list, **kwargs):
if len(_list) > 1:
for a in _list[1:]:
arg.append(a)
log.debug("(api_req)[%s] Parameters are arg: %s, kwargs: %s"
% (str(func), str(arg)[0:log_row_length], str(kwargs)))
log.info("(api_req)[%s] Parameters ars arg: %s, kwargs: %s" % (str(func), str(arg), str(kwargs)))
return func(*arg, **kwargs)
return False, False