test: update test cases about rbac (#29088)

issue: #29087
RBAC cases fail a lot.
1. some cases are out of date, for example, the default value of db_name
has changed from "default" to "" in some apis
2. add time sleep after the action of grant or revoke, for it costs time
to take effect

Signed-off-by: nico <cheng.yuan@zilliz.com>
pull/30456/head
nico 2024-01-30 11:33:02 +08:00 committed by GitHub
parent 7c086a4608
commit 168260cba3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 339 additions and 140 deletions

View File

@ -30,10 +30,12 @@ class ApiConnectionsWrapper:
check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ, alias=alias).run()
return response, check_result
def connect(self, alias=DefaultConfig.DEFAULT_USING, user="", password="", db_name="", check_task=None, check_items=None, **kwargs):
def connect(self, alias=DefaultConfig.DEFAULT_USING, user="", password="", db_name="default", token: str = "",
check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
response, succ = api_request([self.connection.connect, alias, user, password, db_name], **kwargs)
check_result = ResponseChecker(response, func_name, check_task, check_items, succ, alias=alias, **kwargs).run()
response, succ = api_request([self.connection.connect, alias, user, password, db_name, token], **kwargs)
check_result = ResponseChecker(response, func_name, check_task, check_items, succ, alias=alias, user=user,
password=password, db_name=db_name, token=token, **kwargs).run()
return response, check_result
def has_connection(self, alias=DefaultConfig.DEFAULT_USING, check_task=None, check_items=None):

View File

@ -459,25 +459,25 @@ class ApiUtilityWrapper:
def role_name(self):
return self.role.name
def role_grant(self, object: str, object_name: str, privilege: str, db_name: str = "default", check_task=None, check_items=None, **kwargs):
def role_grant(self, object: str, object_name: str, privilege: str, db_name: str = "", check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.role.grant, object, object_name, privilege, db_name], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def role_revoke(self, object: str, object_name: str, privilege: str, db_name: str = "default", check_task=None, check_items=None, **kwargs):
def role_revoke(self, object: str, object_name: str, privilege: str, db_name: str = "", check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.role.revoke, object, object_name, privilege, db_name], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def role_list_grant(self, object: str, object_name: str, db_name: str = "default", check_task=None, check_items=None, **kwargs):
def role_list_grant(self, object: str, object_name: str, db_name: str = "", check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.role.list_grant, object, object_name, db_name], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()
return res, check_result
def role_list_grants(self, db_name: str = "default", check_task=None, check_items=None, **kwargs):
def role_list_grants(self, db_name: str = "", check_task=None, check_items=None, **kwargs):
func_name = sys._getframe().f_code.co_name
res, check = api_request([self.role.list_grants, db_name], **kwargs)
check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run()

View File

@ -92,6 +92,10 @@ class ResponseChecker:
# Collection interface response check
result = self.check_permission_deny(self.response, self.succ)
elif self.check_task == CheckTasks.check_auth_failure:
# connection interface response check
result = self.check_auth_failure(self.response, self.succ)
elif self.check_task == CheckTasks.check_rg_property:
# describe resource group interface response check
result = self.check_rg_property(self.response, self.func_name, self.check_items)
@ -588,3 +592,13 @@ class ResponseChecker:
log.error("[CheckFunc] Response of API is not an error: %s" % str(res))
assert False
return True
@staticmethod
def check_auth_failure(res, actual=True):
assert actual is False
if isinstance(res, Error):
assert "auth" in res.message
else:
log.error("[CheckFunc] Response of API is not an error: %s" % str(res))
assert False
return True

View File

@ -270,6 +270,7 @@ class CheckTasks:
check_merge_compact = "check_merge_compact"
check_role_property = "check_role_property"
check_permission_deny = "check_permission_deny"
check_auth_failure = "check_auth_failure"
check_value_equal = "check_value_equal"
check_rg_property = "check_resource_group_property"
check_describe_collection_property = "check_describe_collection_property"

View File

@ -4,6 +4,7 @@ from pymilvus import DefaultConfig
from base.client_base import TestcaseBase
import common.common_type as ct
import common.common_func as cf
from common.common_type import CaseLabel, CheckTasks
from common.code_mapping import ConnectionErrorMessage as cem
# CONNECT_TIMEOUT = 12
@ -1006,27 +1007,23 @@ class TestConnectUserPasswordInvalid(TestcaseBase):
excepted: connected is false
"""
self.connection_wrap.connect(host=host, port=port,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 2,
ct.err_msg: "Fail connecting to server"})
check_task=CheckTasks.check_auth_failure)
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["alice3333"])
def test_connect_with_invalid_user_connection(self, host, port, user):
def test_connect_with_invalid_user_connection(self, host, port):
"""
target: test the nonexistent to connect
method: connect with the nonexistent user
excepted: connected is false
"""
self.connection_wrap.connect(host=host, port=port, user=user, password="abc123",
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 2,
ct.err_msg: "Fail connecting to server"})
user_name = cf.gen_unique_str()
password = cf.gen_str_by_length()
self.connection_wrap.connect(host=host, port=port, user=user_name, password=password,
check_task=CheckTasks.check_auth_failure)
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["anny015"])
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING])
def test_connect_with_password_invalid(self, host, port, user, connect_name):
def test_connect_with_password_invalid(self, host, port, connect_name):
"""
target: test the wrong password when connecting
method: connect with the wrong password
@ -1037,11 +1034,11 @@ class TestConnectUserPasswordInvalid(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a credential
self.utility_wrap.create_user(user=user, password="qwaszx0")
user_name = cf.gen_unique_str()
password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=password)
# 3.connect with the created user and wrong password
self.connection_wrap.disconnect(alias=connect_name)
self.connection_wrap.connect(host=host, port=port, user=user, password=ct.default_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 2,
ct.err_msg: "Fail connecting to server"})
self.connection_wrap.connect(host=host, port=port, user=user_name, password=ct.default_password,
check_task=CheckTasks.check_auth_failure)

View File

@ -3,6 +3,7 @@ import pytest
from base.client_base import TestcaseBase
from common.common_type import CheckTasks, CaseLabel
from common.common_func import param_info
from common import common_func as cf
from common import common_type as ct
from utils.util_log import test_log as log
@ -14,6 +15,11 @@ prefix = "db"
class TestDatabaseParams(TestcaseBase):
""" Test case of database """
def setup_method(self, method):
param_info.param_user = ct.default_user
param_info.param_password = ct.default_password
super().setup_method(method)
def teardown_method(self, method):
"""
teardown method: drop collection and db
@ -99,14 +105,27 @@ class TestDatabaseParams(TestcaseBase):
dbs_afrer_drop, _ = self.database_wrap.list_database()
assert db_name not in dbs_afrer_drop
def test_create_db_invalid_name(self, get_invalid_string):
@pytest.mark.parametrize("get_invalid_string", ct.get_invalid_strs[6:])
def test_create_db_invalid_name_value(self, get_invalid_string):
"""
target: test create db with invalid name
method: create db with invalid name
expected: error
"""
self._connect()
error = {ct.err_code: 1, ct.err_msg: "Invalid database name"}
error = {ct.err_code: 802, ct.err_msg: "invalid database name[database=%s]" % get_invalid_string}
self.database_wrap.create_database(db_name=get_invalid_string, check_task=CheckTasks.err_res,
check_items=error)
@pytest.mark.parametrize("get_invalid_string", ct.get_invalid_strs[:6])
def test_create_db_invalid_name_type(self, get_invalid_string):
"""
target: test create db with invalid name
method: create db with invalid name
expected: error
"""
self._connect()
error = {ct.err_code: 1, ct.err_msg: "invalid database name[database=%s]" % get_invalid_string}
self.database_wrap.create_database(db_name=get_invalid_string, check_task=CheckTasks.err_res,
check_items=error)
@ -131,7 +150,34 @@ class TestDatabaseParams(TestcaseBase):
error = {ct.err_code: 1, ct.err_msg: "database already exist: default"}
self.database_wrap.create_database(ct.default_db, check_task=CheckTasks.err_res, check_items=error)
def test_drop_db_invalid_name(self, get_invalid_string):
@pytest.mark.parametrize("get_invalid_string", ct.get_invalid_strs[6:])
def test_drop_db_invalid_name_value(self, get_invalid_string):
"""
target: test drop db with invalid name
method: drop db with invalid name
expected: exception
"""
self._connect()
# create db
db_name = cf.gen_unique_str(prefix)
self.database_wrap.create_database(db_name)
# drop db
self.database_wrap.drop_database(db_name=get_invalid_string, check_task=CheckTasks.err_res,
check_items={ct.err_code: 802, ct.err_msg: "invalid database name"})
# created db is exist
self.database_wrap.create_database(db_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535,
ct.err_msg: "database already exist: %s" % db_name})
self.database_wrap.drop_database(db_name)
dbs, _ = self.database_wrap.list_database()
assert db_name not in dbs
@pytest.mark.parametrize("get_invalid_string", ct.get_invalid_strs[:6])
def test_drop_db_invalid_name_type(self, get_invalid_string):
"""
target: test drop db with invalid name
method: drop db with invalid name
@ -149,7 +195,8 @@ class TestDatabaseParams(TestcaseBase):
# created db is exist
self.database_wrap.create_database(db_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "db existed"})
check_items={ct.err_code: 65535,
ct.err_msg: "database already exist: %s" % db_name})
self.database_wrap.drop_database(db_name)
dbs, _ = self.database_wrap.list_database()
@ -213,13 +260,21 @@ class TestDatabaseParams(TestcaseBase):
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
# using db with invalid name
error = {ct.err_code: 800, ct.err_msg: "database not found[database=%s]" % invalid_db_name}
if invalid_db_name == "中文":
error = {ct.err_code: 1, ct.err_msg: "<metadata was invalid: [('dbname', '中文')"}
self.database_wrap.using_database(db_name=invalid_db_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "database not exist"})
check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
class TestDatabaseOperation(TestcaseBase):
def setup_method(self, method):
param_info.param_user = ct.default_user
param_info.param_password = ct.default_password
super().setup_method(method)
def teardown_method(self, method):
"""
teardown method: drop collection and db
@ -274,8 +329,8 @@ class TestDatabaseOperation(TestcaseBase):
self.database_wrap.create_database(cf.gen_unique_str(prefix))
# there are ct.max_database_num-1 dbs (default is not included)
error = {ct.err_code: 1,
ct.err_msg: f"database number ({ct.max_database_num + 1}) exceeds max configuration ({ct.max_database_num})"}
error = {ct.err_code: 801,
ct.err_msg: f"exceeded the limit number of database[limit={ct.max_database_num}]"}
self.database_wrap.create_database(cf.gen_unique_str(prefix), check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.skip(reason="https://github.com/milvus-io/milvus/issues/24182")
@ -448,7 +503,9 @@ class TestDatabaseOperation(TestcaseBase):
# drop db
self.database_wrap.drop_database(db_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "can not drop default database"})
check_items={ct.err_code: 65535,
ct.err_msg: "database:%s not empty, must drop all "
"collections before drop database" % db_name})
# drop collection and drop db
collection_w.drop()
@ -497,7 +554,8 @@ class TestDatabaseOperation(TestcaseBase):
# verify current db
self.utility_wrap.list_collections(check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "database not exist:"})
check_items={ct.err_code: 800,
ct.err_msg: "database not found[database=%s]" % db_name})
self.database_wrap.list_database()
self.database_wrap.using_database(ct.default_db)
@ -622,7 +680,8 @@ class TestDatabaseOtherApi(TestcaseBase):
teardown method: drop collection and db
"""
log.info("[database_teardown_method] Start teardown database test cases ...")
param_info.param_user = ct.default_user
param_info.param_password = ct.default_password
self._connect()
# clear db
@ -661,11 +720,14 @@ class TestDatabaseOtherApi(TestcaseBase):
@pytest.mark.parametrize("invalid_db_name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
def test_connect_invalid_db_name_2(self, host, port, invalid_db_name):
# connect with invalid db
error = {ct.err_code: 800, ct.err_msg: "database not found[database=%s]" % invalid_db_name}
if invalid_db_name == "中文":
error = {ct.err_code: 1, ct.err_msg: "<metadata was invalid: [('dbname', '中文')"}
self.connection_wrap.connect(host=host, port=port, db_name=invalid_db_name,
user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "database not found:"})
check_items=error)
def test_connect_not_existed_db(self, host, port):
"""
@ -873,11 +935,11 @@ class TestDatabaseOtherApi(TestcaseBase):
# assert set(vec_res[0].keys()) == {ct.default_int64_field_name}
# query iterator
self.collection_wrap.query_iterator(f"{ct.default_int64_field_name} <= 3000", limit=ct.default_limit * 10,
self.collection_wrap.query_iterator(expr=f"{ct.default_int64_field_name} <= 3000", batch_size=ct.default_limit * 10,
partition_names=[partition_name],
check_task=CheckTasks.check_query_iterator,
check_items={"count": 1000,
"limit": ct.default_limit * 10})
"batch_size": ct.default_limit * 10})
def prepare_data_for_db_search(self):
"""
@ -885,6 +947,8 @@ class TestDatabaseOtherApi(TestcaseBase):
:return:
:rtype:
"""
param_info.param_user = ct.default_user
param_info.param_password = ct.default_password
self._connect()
# create a db

View File

@ -2284,16 +2284,15 @@ class TestUtilityUserPassword(TestcaseBase):
method: delete user with username and connect with the wrong user then list collections
expected: deleted successfully
"""
user = "xiaoai"
user_name = cf.gen_unique_str(prefix)
password = cf.gen_str_by_length()
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user, password="abc123")
self.utility_wrap.delete_user(user=user)
self.utility_wrap.create_user(user=user_name, password=password)
self.utility_wrap.delete_user(user=user_name)
self.connection_wrap.disconnect(alias=connect_name)
self.connection_wrap.connect(host=host, port=port, user=user, password="abc123",
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 2,
ct.err_msg: "Fail connecting to server"})
self.connection_wrap.connect(host=host, port=port, user=user_name, password=password,
check_task=CheckTasks.check_auth_failure)
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_delete_user_with_invalid_username(self, host, port):
@ -2338,11 +2337,11 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user, password=ct.default_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 5})
check_items={ct.err_code: 1100,
ct.err_msg: "invalid parameter"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["alice123w"])
def test_create_user_with_existed_username(self, host, port, user):
def test_create_user_with_existed_username(self, host, port):
"""
target: test the user when create user
method: create a user, and then create a user with the same username
@ -2353,15 +2352,18 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create the first user successfully
self.utility_wrap.create_user(user=user, password=ct.default_password)
user_name = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_name, password=ct.default_password)
# 3.create the second user with the same username
self.utility_wrap.create_user(user=user, password=ct.default_password,
check_task=ct.CheckTasks.err_res, check_items={ct.err_code: 29})
self.utility_wrap.create_user(user=user_name, password=ct.default_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 65535,
ct.err_msg: "user already exists: %s" % user_name})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("password", ["12345"])
def test_create_user_with_invalid_password(self, host, port, password):
@pytest.mark.parametrize("invalid_password", ["12345"])
def test_create_user_with_invalid_password(self, host, port, invalid_password):
"""
target: test the password when create user
method: make the length of user exceed the limitation [6, 256]
@ -2369,14 +2371,15 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = "alice"
self.utility_wrap.create_user(user=user, password=password,
check_task=ct.CheckTasks.err_res, check_items={ct.err_code: 5})
user_name = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_name, password=invalid_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "invalid password length: invalid parameter"
"[5 out of range 6 <= value <= 256]"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["hobo89"])
@pytest.mark.parametrize("old_password", ["qwaszx0"])
def test_reset_password_with_invalid_username(self, host, port, user, old_password):
def test_reset_password_with_invalid_username(self, host, port):
"""
target: test the wrong user when resetting password
method: create a user, and then reset the password with wrong username
@ -2387,18 +2390,21 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
self.utility_wrap.create_user(user=user, password=old_password)
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong username
self.utility_wrap.reset_password(user="hobo", old_password=old_password, new_password="qwaszx1",
self.utility_wrap.reset_password(user="hobo", old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 30})
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for hobo: "
"not authenticated"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["demo"])
@pytest.mark.parametrize("old_password", ["qwaszx0"])
@pytest.mark.parametrize("new_password", ["12345"])
def test_reset_password_with_invalid_new_password(self, host, port, user, old_password, new_password):
def test_reset_password_with_invalid_new_password(self, host, port, new_password):
"""
target: test the new password when resetting password
method: create a user, and then set a wrong new password
@ -2409,32 +2415,38 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
self.utility_wrap.create_user(user=user, password=old_password)
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong new password
self.utility_wrap.reset_password(user=user, old_password=old_password, new_password=new_password,
self.utility_wrap.reset_password(user=user_name, old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 5})
check_items={ct.err_code: 1100,
ct.err_msg: "invalid password length: invalid parameter"
"[5 out of range 6 <= value <= 256]"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["genny001"])
def test_reset_password_with_invalid_old_password(self, host, port, user):
def test_reset_password_with_invalid_old_password(self, host, port):
"""
target: test the old password when resetting password
method: create a credential, and then reset with a wrong old password
excepted: reset is false
"""
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user, password="qwaszx0")
self.utility_wrap.reset_password(user=user, old_password="waszx0", new_password="123456",
self.utility_wrap.create_user(user=user_name, password=old_password)
self.utility_wrap.reset_password(user=user_name, old_password="waszx0", new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 30})
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for %s: "
"not authenticated" % user_name})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["hobo233"])
@pytest.mark.parametrize("old_password", ["qwaszx0"])
def test_update_password_with_invalid_username(self, host, port, user, old_password):
def test_update_password_with_invalid_username(self, host, port):
"""
target: test the wrong user when resetting password
method: create a user, and then reset the password with wrong username
@ -2445,18 +2457,21 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
self.utility_wrap.create_user(user=user, password=old_password)
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong username
self.utility_wrap.update_password(user="hobo", old_password=old_password, new_password="qwaszx1",
self.utility_wrap.update_password(user="hobo", old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 30})
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for hobo:"
" not authenticated"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["demo001"])
@pytest.mark.parametrize("old_password", ["qwaszx0"])
@pytest.mark.parametrize("new_password", ["12345"])
def test_update_password_with_invalid_new_password(self, host, port, user, old_password, new_password):
def test_update_password_with_invalid_new_password(self, host, port, new_password):
"""
target: test the new password when resetting password
method: create a user, and then set a wrong new password
@ -2467,27 +2482,35 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
self.utility_wrap.create_user(user=user, password=old_password)
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong new password
self.utility_wrap.update_password(user=user, old_password=old_password, new_password=new_password,
self.utility_wrap.update_password(user=user_name, old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 5})
check_items={ct.err_code: 1100,
ct.err_msg: "invalid password length: invalid parameter[5 out "
"of range 6 <= value <= 256]"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["genny"])
def test_update_password_with_invalid_old_password(self, host, port, user):
def test_update_password_with_invalid_old_password(self, host, port):
"""
target: test the old password when resetting password
method: create a credential, and then reset with a wrong old password
excepted: reset is false
"""
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user, password="qwaszx0")
self.utility_wrap.update_password(user=user, old_password="waszx0", new_password="123456",
self.utility_wrap.create_user(user=user_name, password=old_password)
self.utility_wrap.update_password(user=user_name, old_password="waszx0", new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 30})
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for %s"
": not authenticated" % user_name})
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_delete_user_root(self, host, port):
@ -2499,7 +2522,9 @@ class TestUtilityInvalidUserPassword(TestcaseBase):
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.delete_user(user=ct.default_user, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 31})
check_items={ct.err_code: 1401,
ct.err_msg: "root user cannot be deleted: "
"privilege not permitted"})
class TestUtilityRBAC(TestcaseBase):
@ -2534,6 +2559,16 @@ class TestUtilityRBAC(TestcaseBase):
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
# drop database
databases, _ = self.database_wrap.list_database()
for db_name in databases:
self.database_wrap.using_database(db_name)
for c_name in self.utility_wrap.list_collections()[0]:
self.utility_wrap.drop_collection(c_name)
if db_name != ct.default_db:
self.database_wrap.drop_database(db_name)
super().teardown_method(method)
def init_db_kwargs(self, with_db):
@ -2731,7 +2766,7 @@ class TestUtilityRBAC(TestcaseBase):
def test_role_grant_collection_insert(self, host, port):
"""
target: test grant role collection insert privilege
method: create one role and tow collections, grant one collection insert privilege
method: create one role and two collections, grant one collection insert privilege
expected: assert grant privilege success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
@ -2747,18 +2782,17 @@ class TestUtilityRBAC(TestcaseBase):
check_items={exp_name: r_name})
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
time.sleep(60)
self.init_collection_wrap(name=c_name)
self.init_collection_wrap(name=c_name_2)
collection_w1 = self.init_collection_wrap(name=c_name)
collection_w2 = self.init_collection_wrap(name=c_name_2)
# verify user default privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(ct.default_nb)
collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny)
collection_w2 = self.init_collection_wrap(name=c_name_2)
data = cf.gen_default_dataframe_data()
collection_w1.insert(data=data, check_task=CheckTasks.check_permission_deny)
collection_w2.insert(data=data, check_task=CheckTasks.check_permission_deny)
# grant user collection insert privilege
@ -2767,19 +2801,18 @@ class TestUtilityRBAC(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(r_name)
self.utility_wrap.role_grant("Collection", c_name, "Insert")
time.sleep(60)
# verify user specific collection insert privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.insert(data=data)
collection_w1.insert(data=data)
# verify grant scope
index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
collection_w.create_index(ct.default_float_vec_field_name, index_params,
check_task=CheckTasks.check_permission_deny)
collection_w2 = self.init_collection_wrap(name=c_name_2)
collection_w1.create_index(ct.default_float_vec_field_name, index_params,
check_task=CheckTasks.check_permission_deny)
collection_w2.insert(data=data, check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
@ -2800,6 +2833,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.init_role("public")
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_revoke("Collection", c_name, "Insert")
time.sleep(60)
data = cf.gen_default_list_data(ct.default_nb)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -2885,6 +2919,7 @@ class TestUtilityRBAC(TestcaseBase):
# grant user collection insert privilege
self.utility_wrap.role_grant("Collection", c_name, "Insert", **db_kwargs)
time.sleep(60)
self.utility_wrap.role_list_grants(**db_kwargs)
# verify user specific collection insert privilege
@ -2901,6 +2936,7 @@ class TestUtilityRBAC(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(r_name)
self.utility_wrap.role_revoke("Collection", c_name, "Insert", **db_kwargs)
time.sleep(60)
# verify revoke is success
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
@ -2933,26 +2969,48 @@ class TestUtilityRBAC(TestcaseBase):
# grant user Global CreateCollection privilege
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "CreateCollection", **db_kwargs)
time.sleep(60)
# verify user specific Global CreateCollection privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w = self.init_collection_wrap(name=c_name)
schema = cf.gen_default_collection_schema()
_, create_res = self.collection_wrap.init_collection(name=c_name, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times = 6
while not create_res and retry_times > 0:
time.sleep(10)
_, create_res = self.collection_wrap.init_collection(name=c_name, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times -= 1
# revoke privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
assert c_name in self.utility_wrap.list_collections()[0]
self.utility_wrap.init_role(r_name)
self.utility_wrap.role_revoke("Global", "*", "CreateCollection", **db_kwargs)
time.sleep(60)
# verify revoke is success
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w = self.init_collection_wrap(name=c_name_2,
check_task=CheckTasks.check_permission_deny)
_, create_res = self.collection_wrap.init_collection(name=c_name_2, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times = 6
while create_res and retry_times > 0:
time.sleep(10)
_, create_res = self.collection_wrap.init_collection(name=c_name_2, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times -= 1
self.collection_wrap.init_collection(name=cf.gen_unique_str(prefix), schema=schema,
check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
@ -2980,7 +3038,9 @@ class TestUtilityRBAC(TestcaseBase):
# grant user User UpdateUser privilege
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("User", "*", "UpdateUser", **db_kwargs)
time.sleep(60)
self.utility_wrap.role_revoke("User", "*", "UpdateUser", **db_kwargs)
time.sleep(60)
# verify revoke is success
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
@ -3007,7 +3067,6 @@ class TestUtilityRBAC(TestcaseBase):
user2 = cf.gen_unique_str(prefix)
u2, _ = self.utility_wrap.create_user(user=user2, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
@ -3021,6 +3080,7 @@ class TestUtilityRBAC(TestcaseBase):
for grant_item in grant_list:
self.utility_wrap.role_grant(grant_item["object"], grant_item["object_name"], grant_item["privilege"],
**db_kwargs)
time.sleep(60)
# list grants with default user
g_list, _ = self.utility_wrap.role_list_grants(**db_kwargs)
@ -3034,14 +3094,12 @@ class TestUtilityRBAC(TestcaseBase):
g_list, _ = self.utility_wrap.role_list_grants(**db_kwargs)
assert len(g_list.groups) == len(grant_list)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user2,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
# user2 can not list grants of role
self.utility_wrap.role_list_grants(**db_kwargs,
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_list_grants(**db_kwargs, check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_role_which_bind_user(self, host, port):
@ -3125,6 +3183,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.create_role()
self.utility_wrap.role_grant("Collection", c_name, "Search")
self.utility_wrap.role_grant("Collection", c_name, "Insert")
time.sleep(60)
g_list, _ = self.utility_wrap.role_list_grant("Collection", c_name)
assert len(g_list.groups) == 2
@ -3133,6 +3192,8 @@ class TestUtilityRBAC(TestcaseBase):
assert g.object_name == c_name
assert g.privilege in ["Search", "Insert"]
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
time.sleep(60)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@ -3150,6 +3211,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.create_role()
self.utility_wrap.role_grant("Global", "*", "CreateCollection")
self.utility_wrap.role_grant("Global", "*", "All")
time.sleep(60)
g_list, _ = self.utility_wrap.role_list_grant("Global", "*")
assert len(g_list.groups) == 2
@ -3158,6 +3220,8 @@ class TestUtilityRBAC(TestcaseBase):
assert g.object_name == "*"
assert g.privilege in ["CreateCollection", "All"]
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
time.sleep(60)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@ -3176,6 +3240,7 @@ class TestUtilityRBAC(TestcaseBase):
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3213,6 +3278,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_grant("Collection", c_name, "Load", **db_kwargs)
self.utility_wrap.role_grant("Collection", c_name, "GetLoadingProgress", **db_kwargs)
time.sleep(60)
log.debug(self.utility_wrap.role_list_grants(**db_kwargs))
self.database_wrap.using_database(db_name)
@ -3250,6 +3316,7 @@ class TestUtilityRBAC(TestcaseBase):
db_name = db_kwargs.get("db_name", ct.default_db)
self.utility_wrap.role_grant("Collection", c_name, "Release", **db_kwargs)
time.sleep(60)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
@ -3328,6 +3395,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
self.utility_wrap.role_grant("Collection", c_name, "Insert", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3361,6 +3429,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
self.utility_wrap.role_grant("Collection", c_name, "Delete", **db_kwargs)
time.sleep(60)
data = cf.gen_default_list_data(ct.default_nb)
mutation_res, _ = collection_w.insert(data=data)
@ -3397,6 +3466,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_grant("Collection", c_name, "CreateIndex", **db_kwargs)
self.utility_wrap.role_grant("Collection", c_name, "Flush", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
@ -3429,6 +3499,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.create_index(ct.default_float_vec_field_name)
self.utility_wrap.role_grant("Collection", c_name, "DropIndex", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
@ -3464,6 +3535,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.load()
self.utility_wrap.role_grant("Collection", c_name, "Search", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
@ -3476,6 +3548,7 @@ class TestUtilityRBAC(TestcaseBase):
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
@pytest.mark.skip("will be modified soon, now flush will fail for GetFlushState")
def test_verify_collection_flush_privilege(self, host, port, with_db):
"""
target: verify grant collection flush privilege
@ -3498,11 +3571,12 @@ class TestUtilityRBAC(TestcaseBase):
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
self.utility_wrap.role_grant("Collection", c_name, "Flush", **db_kwargs)
self.utility_wrap.role_grant("Collection", c_name, "Flush", db_name=db_name)
time.sleep(120)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
password=password, check_task=ct.CheckTasks.ccr, db_name=db_name)
collection_w.flush()
@pytest.mark.tags(CaseLabel.RBAC)
@ -3535,6 +3609,7 @@ class TestUtilityRBAC(TestcaseBase):
collection_w.load()
self.utility_wrap.role_grant("Collection", c_name, "Query", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
@ -3564,6 +3639,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "All", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3579,7 +3655,9 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user_test)
self.utility_wrap.role_grant("Collection", c_name, "Insert")
time.sleep(60)
self.utility_wrap.role_revoke("Collection", c_name, "Insert")
time.sleep(60)
self.utility_wrap.role_remove_user(user_test)
self.utility_wrap.delete_user(user=user_test)
@ -3607,6 +3685,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "CreateCollection", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
@ -3635,6 +3714,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "DropCollection", **db_kwargs)
time.sleep(60)
collection_w = self.init_collection_wrap(name=c_name)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3663,6 +3743,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "CreateOwnership", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
@ -3694,6 +3775,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "DropOwnership", **db_kwargs)
time.sleep(60)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
@ -3730,6 +3812,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "SelectOwnership", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3769,6 +3852,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "ManageOwnership", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3777,6 +3861,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_add_user(user_test)
self.utility_wrap.role_remove_user(user_test)
self.utility_wrap.role_grant("Collection", c_name, "Search")
time.sleep(60)
self.utility_wrap.role_revoke("Collection", c_name, "Search")
@pytest.mark.tags(CaseLabel.RBAC)
@ -3807,6 +3892,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("User", "*", "UpdateUser", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3841,6 +3927,7 @@ class TestUtilityRBAC(TestcaseBase):
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("User", "*", "SelectUser", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3913,6 +4000,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_grant("Collection", "*", "Load", **db_kwargs)
self.utility_wrap.role_grant("Collection", "*", "GetLoadingProgress", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3947,6 +4035,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_grant("Collection", "*", "*", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
@ -3996,6 +4085,7 @@ class TestUtilityRBAC(TestcaseBase):
password=password, check_task=ct.CheckTasks.ccr)
# Collection permission deny
time.sleep(60)
collection_w.load(check_task=CheckTasks.check_permission_deny)
collection_w.release(check_task=CheckTasks.check_permission_deny)
collection_w.compact(check_task=CheckTasks.check_permission_deny)
@ -4096,6 +4186,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("will be modified soon, now flush will fail for GetFlushState")
def test_grant_db_collections(self, host, port):
"""
target: test grant collection privilege with db
@ -4120,6 +4211,7 @@ class TestUtilityRBAC(TestcaseBase):
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("Collection", collection_w.name, "Flush", db_name)
self.utility_wrap.role_grant("Collection", collection_w.name, "GetStatistics", db_name)
time.sleep(60)
# re-connect with new user and default db
self.connection_wrap.disconnect(alias=ct.default_alias)
@ -4154,16 +4246,18 @@ class TestUtilityRBAC(TestcaseBase):
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("Global", "*", "*", db_name)
time.sleep(60)
# re-connect with new user and default db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd,
db_name=ct.default_db, secure=cf.param_info.param_secure,
check_task=ct.CheckTasks.ccr)
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# verify user list grants with different db
self.utility_wrap.role_list_grants(check_task=CheckTasks.check_permission_deny)
self.database_wrap.using_database(ct.default_db)
self.utility_wrap.describe_resource_group(ct.default_resource_group_name,
check_task=CheckTasks.check_permission_deny)
# set using db to db_name and verify grants
self.database_wrap.using_database(db_name)
self.utility_wrap.role_list_grants()
@ -4189,6 +4283,7 @@ class TestUtilityRBAC(TestcaseBase):
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("User", "*", "SelectUser", db_name)
time.sleep(60)
# re-connect with new user and default db
self.connection_wrap.disconnect(alias=ct.default_alias)
@ -4223,10 +4318,12 @@ class TestUtilityRBAC(TestcaseBase):
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("Collection", collection_w.name, "Flush", db_name)
time.sleep(60)
# revoke privilege with default db
self.utility_wrap.role_revoke("Collection", collection_w.name, "Flush", ct.default_db)
self.utility_wrap.role_revoke("Collection", collection_w.name, "Flush", db_name)
time.sleep(60)
# re-connect with new user and db
self.connection_wrap.disconnect(alias=ct.default_alias)
@ -4258,6 +4355,7 @@ class TestUtilityRBAC(TestcaseBase):
# grant role collection * All privilege
_, _, role_name = self.init_user_with_privilege("Global", "*", "All", db_name)
time.sleep(60)
log.debug(f"role name: {role_name}")
# list grant with db and verify
@ -4297,6 +4395,7 @@ class TestUtilityRBAC(TestcaseBase):
# grant role collection flush privilege
self.init_user_with_privilege("Global", "*", "All", db_name)
self.utility_wrap.role_grant("User", "*", "UpdateUser", db_name)
time.sleep(60)
# list grants with db and verify
grants, _ = self.utility_wrap.role_list_grants(db_name=db_name)
@ -4331,6 +4430,7 @@ class TestUtilityRBAC(TestcaseBase):
# grant global privilege to default db
tmp_user, tmp_pwd, tmp_role = self.init_user_with_privilege("User", "*", "SelectUser", ct.default_db)
time.sleep(60)
# re-connect
self.connection_wrap.disconnect(ct.default_alias)
@ -4384,6 +4484,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.role_grant(grant_item["object"], grant_item["object_name"], grant_item["privilege"],
**db_kwargs)
time.sleep(60)
self.init_collection_wrap(name=c_name)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
@ -4405,6 +4506,7 @@ class TestUtilityRBAC(TestcaseBase):
self.utility_wrap.drop_alias(alias_name,
check_task=CheckTasks.check_permission_deny)
class TestUtilityNegativeRbac(TestcaseBase):
def teardown_method(self, method):
@ -4437,6 +4539,16 @@ class TestUtilityNegativeRbac(TestcaseBase):
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
# drop database
databases, _ = self.database_wrap.list_database()
for db_name in databases:
self.database_wrap.using_database(db_name)
for c_name in self.utility_wrap.list_collections()[0]:
self.utility_wrap.drop_collection(c_name)
if db_name != ct.default_db:
self.database_wrap.drop_database(db_name)
super().teardown_method(method)
@pytest.fixture(scope="function", params=ct.get_invalid_strs)
@ -4464,7 +4576,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(name)
error = {"err_code": 5}
error = {"err_code": 1100, "err_msg": "invalid parameter"}
self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error)
# get roles
role_groups, _ = self.utility_wrap.list_roles(False)
@ -4493,8 +4605,8 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
error = {"err_code": 35,
"err_msg": "fail to create role"}
error = {"err_code": 65535,
"err_msg": "role [name:\"%s\"] already exists" % r_name}
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_drop()
@ -4513,8 +4625,9 @@ class TestUtilityNegativeRbac(TestcaseBase):
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(name)
assert self.utility_wrap.role_is_exist()[0]
error = {"err_code": 5,
"err_msg": "the role[%s] is a default role, which can\'t be dropped" % name}
error = {"err_code": 1401,
"err_msg": "the role[%s] is a default role, which can't be dropped: "
"privilege not permitted" % name}
self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error)
assert self.utility_wrap.role_is_exist()[0]
@ -4551,8 +4664,8 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.init_role(r_name)
assert not self.utility_wrap.role_is_exist()[0]
error = {"err_code": 37,
"err_msg": "fail to check the role name"}
error = {"err_code": 65535,
"err_msg": "not found the role, maybe the role isn't existed or internal system error"}
self.utility_wrap.role_add_user(user, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@ -4570,13 +4683,14 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
error = {"err_code": 37,
"err_msg": "fail to check the username"}
self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error)
error = {"err_code": 65535,
"err_msg": "not found the user, maybe the user isn't existed or internal system error"}
self.utility_wrap.role_remove_user(user)
self.utility_wrap.role_add_user(user, check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("issue #29025")
@pytest.mark.parametrize("name", ["admin", "public"])
def test_remove_root_from_default_role(self, name, host, port):
"""
@ -4593,11 +4707,12 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.role_remove_user("root", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("issue #29023")
def test_remove_user_from_unbind_role(self, host, port):
"""
target: remove user from unbind role
method: create new role and new user, remove user from unbind role
expected: fail to remove
expected: fail to remove
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
@ -4635,13 +4750,14 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.init_role(r_name)
assert not self.utility_wrap.role_is_exist()[0]
error = {"err_code": 37,
"err_msg": "fail to check the role name"}
error = {"err_code": 65535,
"err_msg": "not found the role, maybe the role isn't existed or internal system error"}
self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error)
users, _ = self.utility_wrap.role_get_users()
assert user not in users
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("issue #29023")
def test_remove_not_exist_user_from_role(self, host, port):
"""
target: remove not exist user from role
@ -4694,8 +4810,8 @@ class TestUtilityNegativeRbac(TestcaseBase):
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
error = {"err_code": 42,
"err_msg": "there is no value on key = by-dev/meta/root-coord/credential/roles/%s" % r_name}
error = {"err_code": 65535,
"err_msg": "not found the role, maybe the role isn't existed or internal system error"}
self.utility_wrap.role_list_grants(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@ -4711,7 +4827,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
o_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 42,
error = {"err_code": 65535,
"err_msg": f"not found the object type[name: {o_name}], supported the object types: [Global User "
f"Collection]"}
self.utility_wrap.role_list_grant(o_name, "*", check_task=CheckTasks.err_res, check_items=error)
@ -4730,8 +4846,9 @@ class TestUtilityNegativeRbac(TestcaseBase):
o_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 41,
"err_msg": "the object type in the object entity[name: %s] is invalid" % o_name}
error = {"err_code": 65535,
"err_msg": "not found the object type[name: %s], supported the object types: "
"[Global User Collection]" % o_name}
self.utility_wrap.role_grant(o_name, "*", "*", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@ -4747,7 +4864,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
p_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 41, "err_msg": "the privilege name[%s] in the privilege entity is invalid" % p_name}
error = {"err_code": 65535, "err_msg": "not found the privilege name[%s]" % p_name}
self.utility_wrap.role_grant("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@ -4763,8 +4880,9 @@ class TestUtilityNegativeRbac(TestcaseBase):
o_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 41,
"err_msg": "the object type in the object entity[name: %s] is invalid" % o_name}
error = {"err_code": 65535,
"err_msg": "not found the object type[name: %s], supported the object types: "
"[Collection Global User]" % o_name}
self.utility_wrap.role_revoke(o_name, "*", "*", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@ -4780,7 +4898,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
p_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 41, "err_msg": "the privilege name[%s] in the privilege entity is invalid" % p_name}
error = {"err_code": 65535, "err_msg": "not found the privilege name[%s]" % p_name}
self.utility_wrap.role_revoke("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@ -4836,6 +4954,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
# grant role privilege: collection not existed -> no error
user, pwd, role = self.init_user_with_privilege("Collection", "rbac", "Flush", db_name=db_b)
time.sleep(60)
# grant role privilege: db_a collection provilege with database db_b
self.utility_wrap.role_grant("Collection", collection_w.name, "Flush", db_name=db_b)
@ -4855,7 +4974,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
# collection flush with db_b permission
self.database_wrap.using_database(db_b)
collection_w.flush(check_task=CheckTasks.err_res,
check_items={ct.err_code: 4, ct.err_msg: "collection not found"})
check_items={ct.err_code: 100, ct.err_msg: "collection not found"})
self.database_wrap.using_database(db_a)
collection_w.flush(check_task=CheckTasks.check_permission_deny)
@ -4877,7 +4996,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.role_revoke("Global", "*", "All", db_name)
self.database_wrap.using_database(db_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "database not exist"})
check_items={ct.err_code: 800, ct.err_msg: "database not exist"})
self.database_wrap.create_database(db_name)
self.utility_wrap.role_grant("Global", "*", "All", db_name)
self.database_wrap.using_database(db_name)
@ -4924,6 +5043,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.create_user(username, pwd)
self.utility_wrap.init_role("admin")
self.utility_wrap.role_add_user(username)
time.sleep(60)
# create db_a and create collection in db_a
db_a = cf.gen_unique_str("a")
@ -4970,6 +5090,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.create_user(p_username, p_pwd)
self.utility_wrap.init_role("public")
self.utility_wrap.role_add_user(p_username)
time.sleep(60)
# re-connect with new user and db
self.connection_wrap.disconnect(alias=ct.default_alias)
@ -5013,6 +5134,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
# grant role privilege: collection not existed in the db -> no error
user, pwd, role = self.init_user_with_privilege("Collection", coll_name, "Flush", db_name)
time.sleep(60)
# re-connect with new user and granted db
self.connection_wrap.disconnect(alias=ct.default_alias)
@ -5021,7 +5143,7 @@ class TestUtilityNegativeRbac(TestcaseBase):
# operate collection in the granted db
collection_w.flush(check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "CollectionNotExists"})
check_items={ct.err_code: 100, ct.err_msg: "CollectionNotExists"})
# operate collection in the default db
self.database_wrap.using_database(ct.default_db)
@ -5051,7 +5173,6 @@ class TestUtilityNegativeRbac(TestcaseBase):
self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L3)
class TestUtilityFlushAll(TestcaseBase):