test: modify milvus client rbac test cases (#39079)

Signed-off-by: nico <cheng.yuan@zilliz.com>
pull/39249/merge
nico 2025-01-16 11:39:01 +08:00 committed by GitHub
parent 8c4ba70a4c
commit 93ce4b1c97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 112 additions and 129 deletions

View File

@ -94,6 +94,7 @@ zh_vocabularies_distribution = {
"": 0.01
}
def patch_faker_text(fake_instance, vocabularies_distribution):
"""
Monkey patch the text() method of a Faker instance to include custom vocabulary.
@ -138,15 +139,10 @@ def patch_faker_text(fake_instance, vocabularies_distribution):
return '. '.join(sentences) + '.'
# Replace the original text method with our custom one
fake_instance.text = new_text
def get_bm25_ground_truth(corpus, queries, top_k=100, language="en"):
"""
Get the ground truth for BM25 search.
@ -213,6 +209,7 @@ def custom_tokenizer(language="en"):
)
return tokenizer
def manual_check_text_match(df, word, col):
id_list = []
for i in range(len(df)):
@ -234,6 +231,7 @@ def get_top_english_tokens(counter, n=10):
english_counter = Counter(english_tokens)
return english_counter.most_common(n)
def analyze_documents(texts, language="en"):
tokenizer = custom_tokenizer(language)
@ -256,7 +254,6 @@ def analyze_documents(texts, language="en"):
# Convert token ids back to words
word_freq = Counter({id_to_word[token_id]: count for token_id, count in freq.items()})
# if language in ["zh", "cn", "chinese"], remove the long words
# this is a trick to make the text match test case verification simple, because the long word can be still split
if language in ["zh", "cn", "chinese"]:
@ -539,8 +536,6 @@ def prepare_array_test_data(data_size, hit_rate=0.005, dim=128):
log.info(f"Proportion of arrays that have the target array length: {array_length_ratio}")
log.info(f"Proportion of arrays that have the target array access: {array_access_ratio}")
train_df = pd.DataFrame(data)
target_id = {
@ -553,7 +548,6 @@ def prepare_array_test_data(data_size, hit_rate=0.005, dim=128):
}
target_id_list = [target_id[key] for key in ["contains", "contains_any", "contains_all", "equals", "array_length", "array_access"]]
filters = [
"array_contains(contains, 42)",
"array_contains_any(contains_any, [21, 37, 42])",
@ -578,9 +572,12 @@ def gen_unique_str(str_value=None):
return "test_" + prefix if str_value is None else str_value + "_" + prefix
def gen_str_by_length(length=8, letters_only=False):
def gen_str_by_length(length=8, letters_only=False, contain_numbers=False):
if letters_only:
return "".join(random.choice(string.ascii_letters) for _ in range(length))
if contain_numbers:
return "".join(random.choice(string.ascii_letters) for _ in range(length-1)) + \
"".join(random.choice(string.digits))
return "".join(random.choice(string.ascii_letters + string.digits) for _ in range(length))

View File

@ -47,8 +47,8 @@ def pytest_addoption(parser):
parser.addoption('--field_name', action='store', default="field_name", help="field_name of index")
parser.addoption('--replica_num', action='store', default=ct.default_replica_num, help="memory replica number")
parser.addoption('--minio_host', action='store', default="localhost", help="minio service's ip")
parser.addoption('--uri', action='store', default="", help="uri for high level api")
parser.addoption('--token', action='store', default="", help="token for high level api")
parser.addoption('--uri', action='store', default="", help="uri for milvus client")
parser.addoption('--token', action='store', default="root:Milvus", help="token for milvus client")
parser.addoption("--request_duration", action="store", default="10m", help="request_duration")
@ -198,6 +198,7 @@ def uri(request):
def token(request):
return request.config.getoption("--token")
@pytest.fixture
def request_duration(request):
return request.config.getoption("--request_duration")

View File

@ -1,11 +1,15 @@
import time
import numpy as np
import pytest
from base.client_v2_base import TestMilvusClientV2Base
from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from utils.util_log import test_log as log
from base.client_v2_base import TestMilvusClientV2Base
from common.common_type import CaseLabel, CheckTasks
from utils.util_pymilvus import *
prefix = "client_rbac"
user_pre = "user"
@ -37,8 +41,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
teardown method: drop role and user
"""
log.info("[utility_teardown_method] Start teardown utility test cases ...")
uri = f"http://{cf.param_info.param_host}:{cf.param_info.param_port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
# drop users
users, _ = self.list_users(client)
@ -52,9 +55,9 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
roles, _ = self.list_roles(client)
for role in roles:
if role not in ['admin', 'public']:
role_info, _ = self.describe_role(client, role)
if role_info:
for privilege in role_info.get("privileges", []):
res, _ = self.describe_role(client, role)
if res['privileges']:
for privilege in res['privileges']:
self.revoke_privilege(client, role, privilege["object_type"],
privilege["privilege"], privilege["object_name"])
self.drop_role(client, role)
@ -72,8 +75,8 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
# check success link
res = self.list_collections(client)[0]
assert res == []
res = self.list_databases(client)[0]
assert res != []
def test_milvus_client_connect_using_user_password(self, host, port):
"""
@ -85,8 +88,8 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user,
password=ct.default_password)
# check success link
res = self.list_collections(client)[0]
assert res == []
res = self.list_databases(client)[0]
assert res != []
def test_milvus_client_create_user(self, host, port):
"""
@ -94,14 +97,14 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create user
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user, password=ct.default_password)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
self.create_user(client, user_name=user_name, password=password)
# check
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
res = self.list_collections(client)[0]
res = self.list_databases(client)[0]
assert res == []
def test_milvus_client_drop_user(self, host, port):
@ -110,8 +113,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: drop user
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user, password=ct.default_password)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
self.create_user(client, user_name=user_name, password=password)
@ -127,17 +129,17 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a user and update password
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user, password=ct.default_password)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
self.create_user(client, user_name=user_name, password=password)
new_password = cf.gen_str_by_length()
self.update_password(client, user_name=user_name, old_password=password, new_password=new_password)
# check
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=user_name, password=new_password)
res = self.list_collections(client)[0]
assert len(res) == 0
res = self.list_databases(client)[0]
assert res == []
self.init_milvus_client(uri=uri, user=user_name, password=password,
check_task=CheckTasks.check_auth_failure)
@ -147,8 +149,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a user and list users
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
user_name1 = cf.gen_unique_str(user_pre)
user_name2 = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
@ -163,8 +164,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a user and describe the user
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
self.create_user(client, user_name=user_name, password=password)
@ -185,8 +185,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a role
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
@ -196,8 +195,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a role and drop
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
self.drop_role(client, role_name=role_name)
@ -208,8 +206,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a role and describe
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
# describe a role that exists
@ -221,8 +218,7 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a role and list roles
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
res, _ = self.list_roles(client)
@ -234,11 +230,10 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a role and a user, then grant role to the user
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
role_name = cf.gen_unique_str(role_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client, user_name=user_name, password=password)
self.create_role(client, role_name=role_name)
self.grant_role(client, user_name=user_name, role_name=role_name)
@ -249,11 +244,10 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
method: create a role and a user, then grant role to the user, then revoke
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
role_name = cf.gen_unique_str(role_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client, user_name=user_name, password=password)
self.create_role(client, role_name=role_name)
# revoke a user that does not exist
@ -262,7 +256,6 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
self.grant_role(client, user_name=user_name, role_name=role_name)
self.revoke_role(client, user_name=user_name, role_name=role_name)
@pytest.mark.skip("TODO: need update for new privilege")
def test_milvus_client_grant_privilege(self, host, port):
"""
target: test milvus client api grant_privilege
@ -270,31 +263,31 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
expected: succeed
"""
# prepare a collection
uri = f"http://{host}:{port}"
client_root, _ = self.init_milvus_client(uri=uri, token=root_token)
client_root = self._client()
coll_name = cf.gen_unique_str()
self.create_collection(client_root, coll_name, default_dim, consistency_level="Strong")
# create a new role and a new user ( no privilege)
user_name = cf.gen_unique_str(user_pre)
role_name = cf.gen_unique_str(role_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client_root, user_name=user_name, password=password)
self.create_role(client_root, role_name=role_name)
self.grant_role(client_root, user_name=user_name, role_name=role_name)
# check the client role has no privilege of drop collection
# check the role has no privilege of drop collection
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
self.drop_collection(client, coll_name, check_task=CheckTasks.check_permission_deny)
# client_root grants the role with the privilege of drop collection to the client role
# self.init_milvus_client(uri=uri, token=root_token)
self.grant_privilege(client_root, role_name, "Global", "*", "DropCollection")
# grant the role with the privilege of drop collection
self.grant_privilege(client_root, role_name, "Global", "DropCollection", "*")
time.sleep(10)
# check the client role has privilege of drop collection
# check the role has privilege of drop collection
self.drop_collection(client, coll_name)
@pytest.mark.skip("TODO: need update for new privilege")
@pytest.mark.skip("https://github.com/milvus-io/pymilvus/issues/1908")
def test_milvus_client_revoke_privilege(self, host, port):
"""
target: test milvus client api revoke_privilege
@ -302,27 +295,27 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
expected: succeed
"""
# prepare a collection
uri = f"http://{host}:{port}"
client_root, _ = self.init_milvus_client(uri=uri, token=root_token)
client_root = self._client()
coll_name = cf.gen_unique_str()
# create a new role and a new user ( no privilege)
user_name = cf.gen_unique_str(user_pre)
role_name = cf.gen_unique_str(role_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client_root, user_name=user_name, password=password)
self.create_role(client_root, role_name=role_name)
self.grant_role(client_root, user_name=user_name, role_name=role_name)
self.grant_privilege(client_root, role_name, "Global", "*", "CreateCollection")
time.sleep(60)
self.grant_privilege(client_root, role_name, "Global", "CreateCollection", "*")
time.sleep(10)
# check the role has privilege of create collection
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=user_name, password=password)
self.create_collection(client, coll_name, default_dim, consistency_level="Strong")
# revoke the role with the privilege of create collection
# self.init_milvus_client(uri=uri, token=root_token)
self.revoke_privilege(client_root, role_name, "Global", "*", "CreateCollection")
self.revoke_privilege(client_root, role_name, "Global", "CreateCollection", "*")
time.sleep(10)
# check the role has no privilege of create collection
self.create_collection(client, coll_name, default_dim, consistency_level="Strong",
@ -331,8 +324,8 @@ class TestMilvusClientRbacBase(TestMilvusClientV2Base):
@pytest.mark.tags(CaseLabel.RBAC)
class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
""" Test case of rbac interface """
""" Test case of rbac interface """
def test_milvus_client_init_token_invalid(self, host, port):
"""
target: test milvus client api token invalid
@ -341,8 +334,7 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
"""
uri = f"http://{host}:{port}"
wrong_token = root_token + "kk"
self.init_milvus_client(uri=uri, token=wrong_token,
check_task=CheckTasks.check_auth_failure)
self.init_milvus_client(uri=uri, token=wrong_token, check_task=CheckTasks.check_auth_failure)
def test_milvus_client_init_username_invalid(self, host, port):
"""
@ -352,8 +344,7 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
"""
uri = f"http://{host}:{port}"
invalid_user_name = ct.default_user + "nn"
self.init_milvus_client(uri=uri, user=invalid_user_name,
password=ct.default_password,
self.init_milvus_client(uri=uri, user=invalid_user_name, password=ct.default_password,
check_task=CheckTasks.check_auth_failure)
def test_milvus_client_init_password_invalid(self, host, port):
@ -364,8 +355,7 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
"""
uri = f"http://{host}:{port}"
wrong_password = ct.default_password + "kk"
self.init_milvus_client(uri=uri, user=ct.default_user,
password=wrong_password,
self.init_milvus_client(uri=uri, user=ct.default_user, password=wrong_password,
check_task=CheckTasks.check_auth_failure)
@pytest.mark.parametrize("invalid_name", ["", "0", "n@me", "h h"])
@ -375,14 +365,11 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create using a wrong username
expected: raise exception
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
error_msg = "invalid user name"
if invalid_name == "":
error_msg = "username must be not empty"
client = self._client()
self.create_user(client, invalid_name, ct.default_password,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1100, ct.err_msg: error_msg})
check_items={ct.err_code: 1100,
ct.err_msg: "invalid parameter"})
@pytest.mark.parametrize("invalid_name", [1, [], None, {}])
def test_milvus_client_create_user_type_invalid(self, host, port, invalid_name):
@ -391,12 +378,11 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create using a wrong username
expected: raise exception
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
error_msg = f"`user` value {invalid_name} is illegal"
client = self._client()
self.create_user(client, invalid_name, ct.default_password,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: error_msg})
check_items={ct.err_code: 1,
ct.err_msg: f"`user` value {invalid_name} is illegal"})
def test_milvus_client_create_user_exist(self, host, port):
"""
@ -404,8 +390,7 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create using a wrong username
expected: raise exception
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
self.create_user(client, "root", ct.default_password,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535,
@ -418,9 +403,8 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create using a wrong username
expected: raise exception
"""
uri = f"http://{host}:{port}"
client = self._client()
user_name = cf.gen_unique_str(user_pre)
client, _ = self.init_milvus_client(uri=uri, token=root_token)
self.create_user(client, user_name, invalid_password,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1100,
@ -433,14 +417,12 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create using a wrong username
expected: raise exception
"""
uri = f"http://{host}:{port}"
client = self._client()
user_name = cf.gen_unique_str(user_pre)
client, _ = self.init_milvus_client(uri=uri, token=root_token)
error_msg = f"`password` value {invalid_password} is illegal"
self.create_user(client, user_name, invalid_password,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1,
ct.err_msg: error_msg})
ct.err_msg: f"`password` value {invalid_password} is illegal"})
def test_milvus_client_update_password_user_not_exist(self, host, port):
"""
@ -448,12 +430,12 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create a user and update password
expected: raise exception
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user, password=ct.default_password)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.update_password(client, user_name=user_name, old_password=password, new_password=new_password,
password = cf.gen_str_by_length(contain_numbers=True)
new_password = cf.gen_str_by_length(contain_numbers=True)
self.update_password(client, user_name=user_name, old_password=password,
new_password=new_password,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for %s: "
@ -465,12 +447,11 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create a user and update password
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user, password=ct.default_password)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client, user_name=user_name, password=password)
new_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length(contain_numbers=True)
wrong_password = password + 'kk'
self.update_password(client, user_name=user_name, old_password=wrong_password,
new_password=new_password, check_task=CheckTasks.err_res,
@ -484,10 +465,9 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create a user and update password
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user, password=ct.default_password)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client, user_name=user_name, password=password)
self.update_password(client, user_name=user_name, old_password=password, new_password=password)
@ -498,31 +478,28 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create a user and update password
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, user=ct.default_user, password=ct.default_password)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client, user_name=user_name, password=password)
self.update_password(client, user_name=user_name, old_password=password,
new_password=invalid_password, check_task=CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "invalid password"})
def test_milvus_client_create_role_invalid(self, host, port):
def test_milvus_client_create_role_exist(self, host, port):
"""
target: test milvus client api create_role
method: create a role using invalid name
expected: raise exception
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
# create existed role and system reserved role
for name in ["admin", "public", role_name]:
error_msg = f'role [name:"{name}"] already exists'
self.create_role(client, role_name=name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535, ct.err_msg: error_msg})
# create existed role
error_msg = f'role [name:"{role_name}"] already exists'
self.create_role(client, role_name=role_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535, ct.err_msg: error_msg})
def test_milvus_client_drop_role_invalid(self, host, port):
"""
@ -530,22 +507,33 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create a role and drop
expected: raise exception
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
role_name = cf.gen_unique_str(role_pre)
self.drop_role(client, role_name=role_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535,
ct.err_msg: "not found the role, maybe the role isn't "
"existed or internal system error"})
@pytest.mark.parametrize("role_name", ["admin", "public"])
def test_milvus_client_drop_built_in_role(self, host, port, role_name):
"""
target: test milvus client api drop_role
method: create a role and drop
expected: raise exception
"""
client = self._client()
self.drop_role(client, role_name=role_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 65535,
ct.err_msg: f"the role[{role_name}] is a default role, "
f"which can't be dropped"})
def test_milvus_client_describe_role_invalid(self, host, port):
"""
target: test milvus client api describe_role
method: describe a role using invalid name
expected: raise exception
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
# describe a role that does not exist
role_not_exist = cf.gen_unique_str(role_pre)
error_msg = "not found the role, maybe the role isn't existed or internal system error"
@ -558,8 +546,7 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create a role and a user, then grant role to the user
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
role_name = cf.gen_unique_str(role_pre)
self.create_role(client, role_name=role_name)
@ -575,11 +562,10 @@ class TestMilvusClientRbacInvalid(TestMilvusClientV2Base):
method: create a role and a user, then grant role to the user
expected: succeed
"""
uri = f"http://{host}:{port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
user_name = cf.gen_unique_str(user_pre)
role_name = cf.gen_unique_str(role_pre)
password = cf.gen_str_by_length()
password = cf.gen_str_by_length(contain_numbers=True)
self.create_user(client, user_name=user_name, password=password)
self.grant_role(client, user_name=user_name, role_name=role_name,
check_task=CheckTasks.err_res,
@ -597,8 +583,7 @@ class TestMilvusClientRbacAdvance(TestMilvusClientV2Base):
teardown method: drop role and user
"""
log.info("[utility_teardown_method] Start teardown utility test cases ...")
uri = f"http://{cf.param_info.param_host}:{cf.param_info.param_port}"
client, _ = self.init_milvus_client(uri=uri, token=root_token)
client = self._client()
# drop users
users, _ = self.list_users(client)
@ -612,9 +597,9 @@ class TestMilvusClientRbacAdvance(TestMilvusClientV2Base):
roles, _ = self.list_roles(client)
for role in roles:
if role not in ['admin', 'public']:
privileges, _ = self.describe_role(client, role)
if privileges:
for privilege in privileges:
res, _ = self.describe_role(client, role)
if res['privileges']:
for privilege in res['privileges']:
self.revoke_privilege(client, role, privilege["object_type"],
privilege["privilege"], privilege["object_name"])
self.drop_role(client, role)