milvus/tests/python_client/testcases/test_utility.py

6578 lines
317 KiB
Python

import threading
import time
import pytest
from pymilvus import DefaultConfig
from pymilvus.exceptions import MilvusException
from base.client_base import TestcaseBase
from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from common.common_params import FieldParams, DefaultVectorIndexParams, DefaultVectorSearchParams
from utils.util_log import test_log as log
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from common.milvus_sys import MilvusSys
from pymilvus.grpc_gen.common_pb2 import SegmentState
import random
from pymilvus.client.types import ResourceGroupConfig
import copy
prefix = "utility"
default_schema = cf.gen_default_collection_schema()
default_int64_field_name = ct.default_int64_field_name
default_field_name = ct.default_float_vec_field_name
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
default_dim = ct.default_dim
default_nb = ct.default_nb
num_loaded_entities = "num_loaded_entities"
num_total_entities = "num_total_entities"
loading_progress = "loading_progress"
num_loaded_partitions = "num_loaded_partitions"
not_loaded_partitions = "not_loaded_partitions"
exp_name = "name"
exp_schema = "schema"
class TestUtilityParams(TestcaseBase):
""" Test case of index interface """
@pytest.fixture(scope="function", params=["JACCARD", "Superstructure", "Substructure"])
def get_not_support_metric(self, request):
yield request.param
@pytest.fixture(scope="function", params=["metric_type", "metric"])
def get_support_metric_field(self, request):
yield request.param
@pytest.fixture(scope="function", params=ct.get_not_string)
def get_invalid_type_collection_name(self, request):
yield request.param
@pytest.fixture(scope="function", params=ct.invalid_resource_names)
def get_invalid_value_collection_name(self, request):
yield request.param
"""
******************************************************************
# The followings are invalid cases
******************************************************************
"""
@pytest.mark.tags(CaseLabel.L2)
def test_has_collection_name_type_invalid(self, get_invalid_type_collection_name):
"""
target: test has_collection with error collection name
method: input invalid name
expected: raise exception
"""
self._connect()
c_name = get_invalid_type_collection_name
self.utility_wrap.has_collection(c_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 999,
ct.err_msg: f"`collection_name` value {c_name} is illegal"})
@pytest.mark.tags(CaseLabel.L2)
def test_has_collection_name_value_invalid(self, get_invalid_value_collection_name):
"""
target: test has_collection with error collection name
method: input invalid name
expected: raise exception
"""
self._connect()
c_name = get_invalid_value_collection_name
error = {ct.err_code: 999, ct.err_msg: f"Invalid collection name: {c_name}"}
if c_name in [None, ""]:
error = {ct.err_code: 999, ct.err_msg: f"`collection_name` value {c_name} is illegal"}
self.utility_wrap.has_collection(c_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_has_partition_collection_name_type_invalid(self, get_invalid_type_collection_name):
"""
target: test has_partition with error collection name
method: input invalid name
expected: raise exception
"""
self._connect()
c_name = get_invalid_type_collection_name
p_name = cf.gen_unique_str(prefix)
self.utility_wrap.has_partition(c_name, p_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 999,
ct.err_msg: f"`collection_name` value {c_name} is illegal"})
@pytest.mark.tags(CaseLabel.L2)
def test_has_partition_collection_name_value_invalid(self, get_invalid_value_collection_name):
"""
target: test has_partition with error collection name
method: input invalid name
expected: raise exception
"""
self._connect()
c_name = get_invalid_value_collection_name
p_name = cf.gen_unique_str(prefix)
error = {ct.err_code: 999, ct.err_msg: f"Invalid collection name: {c_name}"}
if c_name in [None, ""]:
error = {ct.err_code: 999, ct.err_msg: f"`collection_name` value {c_name} is illegal"}
self.utility_wrap.has_partition(c_name, p_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_has_partition_name_type_invalid(self, get_invalid_type_collection_name):
"""
target: test has_partition with error partition name
method: input invalid name
expected: raise exception
"""
self._connect()
ut = ApiUtilityWrapper()
c_name = cf.gen_unique_str(prefix)
p_name = get_invalid_type_collection_name
ut.has_partition(c_name, p_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 999,
ct.err_msg: f"`partition_name` value {p_name} is illegal"})
@pytest.mark.tags(CaseLabel.L2)
def test_has_partition_name_value_invalid(self, get_invalid_value_collection_name):
"""
target: test has_partition with error partition name
method: input invalid name
expected: raise exception
"""
self._connect()
ut = ApiUtilityWrapper()
c_name = cf.gen_unique_str(prefix)
p_name = get_invalid_value_collection_name
if p_name == "12name":
pytest.skip("partition name 12name is legal")
if p_name == "n-ame":
pytest.skip("https://github.com/milvus-io/milvus/issues/39432")
error = {ct.err_code: 999, ct.err_msg: f"Invalid partition name: {p_name}"}
if p_name in [None]:
error = {ct.err_code: 999, ct.err_msg: f"`partition_name` value {p_name} is illegal"}
elif p_name in [" ", ""]:
error = {ct.err_code: 999, ct.err_msg: "Invalid partition name: . Partition name should not be empty."}
ut.has_partition(c_name, p_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_drop_collection_name_type_invalid(self, get_invalid_type_collection_name):
self._connect()
c_name = get_invalid_type_collection_name
self.utility_wrap.drop_collection(c_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 999,
ct.err_msg: f"`collection_name` value {c_name} is illegal"})
@pytest.mark.tags(CaseLabel.L2)
def test_drop_collection_name_value_invalid(self, get_invalid_value_collection_name):
self._connect()
c_name = get_invalid_value_collection_name
if c_name in [None, ""]:
error = {ct.err_code: 999, ct.err_msg: f"`collection_name` value {c_name} is illegal"}
self.utility_wrap.drop_collection(c_name, check_task=CheckTasks.err_res, check_items=error)
else:
self.utility_wrap.drop_collection(c_name)
# TODO: enable
@pytest.mark.tags(CaseLabel.L2)
def test_list_collections_using_invalid(self):
"""
target: test list_collections with invalid using
method: input invalid name
expected: raise exception
"""
self._connect()
using = "empty"
ut = ApiUtilityWrapper()
ex, _ = ut.list_collections(using=using, check_task=CheckTasks.err_res,
check_items={ct.err_code: 0, ct.err_msg: "should create connect"})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("invalid_name", ct.invalid_resource_names)
def test_index_process_invalid_name(self, invalid_name):
"""
target: test building_process
method: input invalid name
expected: raise exception
"""
self._connect()
error = {ct.err_code: 999, ct.err_msg: f"Invalid collection name: {invalid_name}"}
if invalid_name in [None, ""]:
error = {ct.err_code: 999, ct.err_msg: "collection name should not be empty"}
self.utility_wrap.index_building_progress(collection_name=invalid_name,
check_task=CheckTasks.err_res, check_items=error)
# TODO: not support index name
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("invalid_index_name", ct.invalid_resource_names)
def test_index_process_invalid_index_name(self, invalid_index_name):
"""
target: test building_process
method: input invalid index name
expected: raise exception
"""
self._connect()
collection_w = self.init_collection_wrap()
error = {ct.err_code: 999, ct.err_msg: "index not found"}
self.utility_wrap.index_building_progress(collection_name=collection_w.name, index_name=invalid_index_name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("not ready")
def test_wait_index_invalid_name(self, get_invalid_type_collection_name):
"""
target: test wait_index
method: input invalid name
expected: raise exception
"""
pass
# self._connect()
# c_name = get_invalid_collection_name
# ut = ApiUtilityWrapper()
# if isinstance(c_name, str) and c_name:
# ex, _ = ut.wait_for_index_building_complete(c_name,
# check_items={ct.err_code: 1,
# ct.err_msg: "Invalid collection name"})
@pytest.mark.tags(CaseLabel.L1)
def _test_wait_index_invalid_index_name(self, get_invalid_index_name):
"""
target: test wait_index
method: input invalid index name
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
index_name = get_invalid_index_name
ut = ApiUtilityWrapper()
ex, _ = ut.wait_for_index_building_complete(c_name, index_name)
log.error(str(ex))
assert "invalid" or "illegal" in str(ex)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("invalid_c_name", ["12-s", "12 s", "(mn)", "中文", "%$#"])
def test_loading_progress_invalid_collection_name(self, invalid_c_name):
"""
target: test loading progress with invalid collection name
method: input invalid collection name
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
df = cf.gen_default_dataframe_data()
self.collection_wrap.construct_from_dataframe(c_name, df, primary_field=ct.default_int64_field_name)
self.collection_wrap.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
self.collection_wrap.load()
error = {ct.err_code: 1, ct.err_msg: "Invalid collection name: {}".format(invalid_c_name)}
self.utility_wrap.loading_progress(invalid_c_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_loading_progress_not_existed_collection_name(self):
"""
target: test loading progress with invalid collection name
method: input invalid collection name
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
df = cf.gen_default_dataframe_data()
self.collection_wrap.construct_from_dataframe(c_name, df, primary_field=ct.default_int64_field_name)
self.collection_wrap.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
self.collection_wrap.load()
error = {ct.err_code: 100, ct.err_msg: "collection not found[database=default][collection=not_existed_name]"}
self.utility_wrap.loading_progress("not_existed_name", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.parametrize("partition_name", ct.invalid_resource_names)
def test_loading_progress_invalid_partition_names(self, partition_name):
"""
target: test loading progress with invalid partition names
method: input invalid partition names
expected: raise an exception
"""
if partition_name == "":
pytest.skip("https://github.com/milvus-io/milvus/issues/38223")
collection_w = self.init_collection_general(prefix, nb=10)[0]
partition_names = [partition_name]
collection_w.load()
err_msg = {ct.err_code: 999, ct.err_msg: "partition not found"}
if partition_name is None:
err_msg = {ct.err_code: 999, ct.err_msg: "is illegal"}
self.utility_wrap.loading_progress(collection_w.name, partition_names,
check_task=CheckTasks.err_res, check_items=err_msg)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("partition_names", [[ct.default_tag], [ct.default_partition_name, ct.default_tag]])
def test_loading_progress_not_existed_partitions(self, partition_names):
"""
target: test loading progress with not existed partitions
method: input all or part not existed partition names
expected: raise exception
"""
collection_w = self.init_collection_general(prefix, nb=10)[0]
collection_w.load()
err_msg = {ct.err_code: 15, ct.err_msg: f"partition not found"}
self.utility_wrap.loading_progress(collection_w.name, partition_names,
check_task=CheckTasks.err_res, check_items=err_msg)
@pytest.mark.tags(CaseLabel.L2)
def test_wait_for_loading_collection_not_existed(self):
"""
target: test wait for loading
method: input collection not created before
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.utility_wrap.wait_for_loading_complete(
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 100, ct.err_msg: "collection not found[database=default]"
"[collection={}]".format(c_name)})
@pytest.mark.tags(CaseLabel.L2)
def test_wait_for_loading_partition_not_existed(self):
"""
target: test wait for loading
method: input partition not created before
expected: raise exception
"""
self._connect()
collection_w = self.init_collection_wrap()
self.utility_wrap.wait_for_loading_complete(
collection_w.name, partition_names=[ct.default_tag],
check_task=CheckTasks.err_res,
check_items={ct.err_code: 200, ct.err_msg: f'partition not found[partition={ct.default_tag}]'})
@pytest.mark.tags(CaseLabel.L2)
def test_drop_collection_not_existed(self):
"""
target: test drop an not existed collection
method: drop a not created collection
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
# error = {ct.err_code: 1, ct.err_msg: f"DescribeCollection failed: can't find collection: {c_name}"}
# self.utility_wrap.drop_collection(c_name, check_task=CheckTasks.err_res, check_items=error)
# @longjiquan: dropping collection should be idempotent.
self.utility_wrap.drop_collection(c_name)
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_left_vector_invalid_type(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
method: input invalid vectors type
expected: raise exception
"""
self._connect()
invalid_vector = get_invalid_vector_dict
if not isinstance(invalid_vector, dict):
self.utility_wrap.calc_distance(invalid_vector, invalid_vector,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "vectors_left value {} "
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_left_vector_invalid_value(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
method: input invalid vectors value
expected: raise exception
"""
self._connect()
invalid_vector = get_invalid_vector_dict
if isinstance(invalid_vector, dict):
self.utility_wrap.calc_distance(invalid_vector, invalid_vector,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "vectors_left value {} "
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_right_vector_invalid_type(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
method: input invalid vectors type
expected: raise exception
"""
self._connect()
invalid_vector = get_invalid_vector_dict
vector = cf.gen_vectors(default_nb, default_dim)
op_l = {"float_vectors": vector}
if not isinstance(invalid_vector, dict):
self.utility_wrap.calc_distance(op_l, invalid_vector,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "vectors_right value {} "
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip(reason="calc_distance interface is no longer supported")
def test_calc_distance_right_vector_invalid_value(self, get_invalid_vector_dict):
"""
target: test calculated distance with invalid vectors
method: input invalid vectors value
expected: raise exception
"""
self._connect()
invalid_vector = get_invalid_vector_dict
vector = cf.gen_vectors(default_nb, default_dim)
op_l = {"float_vectors": vector}
if isinstance(invalid_vector, dict):
self.utility_wrap.calc_distance(op_l, invalid_vector,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "vectors_right value {} "
"is illegal".format(invalid_vector)})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_old_invalid_type(self, get_invalid_type_collection_name):
"""
target: test rename_collection when the type of old collection name is not valid
method: input not invalid collection name
expected: raise exception
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = get_invalid_type_collection_name
new_collection_name = cf.gen_unique_str(prefix)
self.utility_wrap.rename_collection(old_collection_name, new_collection_name,
check_task=CheckTasks.err_res,
check_items={"err_code": 999,
"err_msg": "`collection_name` value {} is illegal".format(
old_collection_name)})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_old_invalid_value(self, get_invalid_value_collection_name):
"""
target: test rename_collection when the value of old collection name is not valid
method: input not invalid collection name
expected: raise exception
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = get_invalid_value_collection_name
new_collection_name = cf.gen_unique_str(prefix)
error = {"err_code": 4, "err_msg": "collection not found"}
if old_collection_name in [None, ""]:
error = {"err_code": 999, "err_msg": "is illegal"}
self.utility_wrap.rename_collection(old_collection_name, new_collection_name,
check_task=CheckTasks.err_res,
check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_rename_collection_new_invalid_type(self, get_invalid_type_collection_name):
"""
target: test rename_collection when the type of new collection name is not valid
method: input not invalid collection name
expected: raise exception
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = collection_w.name
new_collection_name = get_invalid_type_collection_name
self.utility_wrap.rename_collection(old_collection_name, new_collection_name,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "`collection_name` value {} is "
"illegal".format(new_collection_name)})
@pytest.mark.tags(CaseLabel.L2)
def test_rename_collection_new_invalid_value(self, get_invalid_value_collection_name):
"""
target: test rename_collection when the value of new collection name is not valid
method: input not invalid collection name
expected: raise exception
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = collection_w.name
new_collection_name = get_invalid_value_collection_name
error = {"err_code": 1100, "err_msg": "Invalid collection name"}
if new_collection_name in [None, ""]:
error = {"err_code": 999, "err_msg": f"`collection_name` value {new_collection_name} is illegal"}
self.utility_wrap.rename_collection(old_collection_name, new_collection_name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_rename_collection_not_existed_collection(self):
"""
target: test rename_collection when the collection name is not existed
method: input not existing collection name
expected: raise exception
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = "test_collection_non_exist"
new_collection_name = cf.gen_unique_str(prefix)
self.utility_wrap.rename_collection(old_collection_name, new_collection_name,
check_task=CheckTasks.err_res,
check_items={"err_code": 100,
"err_msg": "collection not found[database=1][collection"
"={}]".format(old_collection_name)})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_existed_collection_name(self):
"""
target: test rename_collection when the collection name is existed
method: input existing collection name
expected: raise exception
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = collection_w.name
self.utility_wrap.rename_collection(old_collection_name, old_collection_name,
check_task=CheckTasks.err_res,
check_items={"err_code": 65535,
"err_msg": "duplicated new collection name default:{}"
" with other collection name or"
" alias".format(collection_w.name)})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_existed_collection_alias(self):
"""
target: test rename_collection when the collection alias is existed
method: input existing collection alias
expected: raise exception
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = collection_w.name
alias = "test_alias"
self.utility_wrap.create_alias(old_collection_name, alias)
self.utility_wrap.rename_collection(old_collection_name, alias,
check_task=CheckTasks.err_res,
check_items={"err_code": 65535,
"err_msg": f"cannot rename collection to an existing alias: {alias}"})
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection_using_alias(self):
"""
target: test rename_collection using alias
method: rename collection using alias name
expected: raise exception
"""
self._connect()
collection_w = self.init_collection_general(prefix)[0]
old_collection_name = collection_w.name
alias = cf.gen_unique_str(prefix + "alias")
new_collection_name = cf.gen_unique_str(prefix + "new")
self.utility_wrap.create_alias(old_collection_name, alias)
self.utility_wrap.rename_collection(alias, new_collection_name,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "unsupported use an alias to "
"rename collection, alias:{}".format(alias)})
class TestUtilityBase(TestcaseBase):
""" Test case of index interface """
@pytest.fixture(scope="function", params=["metric_type", "metric"])
def metric_field(self, request):
yield request.param
@pytest.fixture(scope="function", params=[True, False])
def sqrt(self, request):
yield request.param
@pytest.fixture(scope="function", params=["L2", "IP"])
def metric(self, request):
yield request.param
@pytest.fixture(scope="function", params=["HAMMING", "JACCARD"])
def metric_binary(self, request):
yield request.param
@pytest.mark.tags(CaseLabel.L1)
def test_has_collection(self):
"""
target: test has_collection with collection name
method: input collection name created before
expected: True
"""
cw = self.init_collection_wrap()
res, _ = self.utility_wrap.has_collection(cw.name)
assert res is True
@pytest.mark.tags(CaseLabel.L2)
def test_has_collection_not_created(self):
"""
target: test has_collection with collection name which is not created
method: input random collection name
expected: False
"""
c_name = cf.gen_unique_str(prefix)
_ = self.init_collection_wrap()
res, _ = self.utility_wrap.has_collection(c_name)
assert res is False
@pytest.mark.tags(CaseLabel.L1)
def test_has_collection_after_drop(self):
"""
target: test has_collection with collection name droped before
method: input random collection name
expected: False
"""
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
res, _ = self.utility_wrap.has_collection(c_name)
assert res is True
cw.drop()
res, _ = self.utility_wrap.has_collection(c_name)
assert res is False
@pytest.mark.tags(CaseLabel.L1)
def test_has_partition(self):
"""
target: test has_partition with partition name
method: input collection name and partition name created before
expected: True
"""
c_name = cf.gen_unique_str(prefix)
p_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
self.init_partition_wrap(cw, p_name)
res, _ = self.utility_wrap.has_partition(c_name, p_name)
assert res is True
@pytest.mark.tags(CaseLabel.L2)
def test_has_partition_not_created(self):
"""
target: test has_partition with partition name
method: input collection name, and partition name not created before
expected: True
"""
c_name = cf.gen_unique_str(prefix)
p_name = cf.gen_unique_str()
self.init_collection_wrap(name=c_name)
res, _ = self.utility_wrap.has_partition(c_name, p_name)
assert res is False
@pytest.mark.tags(CaseLabel.L1)
def test_has_partition_after_drop(self):
"""
target: test has_partition with partition name
method: input collection name, and partition name dropped
expected: True
"""
c_name = cf.gen_unique_str(prefix)
p_name = cf.gen_unique_str()
cw = self.init_collection_wrap(name=c_name)
pw = self.init_partition_wrap(cw, p_name)
res, _ = self.utility_wrap.has_partition(c_name, p_name)
assert res is True
pw.drop()
res, _ = self.utility_wrap.has_partition(c_name, p_name)
assert res is False
@pytest.mark.tags(CaseLabel.L2)
def test_has_default_partition(self):
"""
target: test has_partition with '_default' partition
method: input collection name and partition name created before
expected: True
"""
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
res, _ = self.utility_wrap.has_partition(c_name, ct.default_partition_name)
assert res is True
@pytest.mark.tags(CaseLabel.L1)
def test_list_collections(self):
"""
target: test list_collections
method: create collection, list_collections
expected: in the result
"""
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
res, _ = self.utility_wrap.list_collections()
assert c_name in res
# TODO: make sure all collections deleted
@pytest.mark.tags(CaseLabel.L1)
def _test_list_collections_no_collection(self):
"""
target: test list_collections
method: no collection created, list_collections
expected: length of the result equals to 0
"""
self._connect()
res, _ = self.utility_wrap.list_collections()
assert len(res) == 0
@pytest.mark.tags(CaseLabel.L2)
def test_index_process_collection_not_existed(self):
"""
target: test building_process
method: input collection not created before
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.utility_wrap.index_building_progress(
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 4, ct.err_msg: "collection not found"})
@pytest.mark.tags(CaseLabel.L1)
def test_index_process_collection_empty(self):
"""
target: test building_process
method: input empty collection
expected: no exception raised
"""
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
self.index_wrap.init_index(cw.collection, default_field_name, default_index_params)
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': 0, 'indexed_rows': 0, 'pending_index_rows': 0, 'state': 'Finished'}
assert res == exp_res
@pytest.mark.tags(CaseLabel.L2)
def test_index_process_collection_insert_no_index(self):
"""
target: test building_process
method: insert 1 entity, no index created
expected: no exception raised
"""
nb = 1
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(nb)
cw.insert(data=data)
error = {ct.err_code: 999, ct.err_msg: f"index not found[collection={c_name}]"}
self.utility_wrap.index_building_progress(c_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L1)
def test_index_process_collection_index(self):
"""
target: test building_process
method: 1.insert 1024 (because minSegmentSizeToEnableIndex=1024)
2.build(server does create index) and call building_process
expected: indexed_rows=nb
"""
nb = 1024
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(nb)
cw.insert(data=data)
cw.create_index(default_field_name, default_index_params)
cw.flush()
res, _ = self.utility_wrap.index_building_progress(c_name)
# The indexed_rows may be 0 due to compaction,
# remove this assertion for now
# assert res['indexed_rows'] == nb
assert res['total_rows'] == nb
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason='wait to modify')
def test_index_process_collection_indexing(self):
"""
target: test building_process
method: 1.insert 2048 entities to ensure that server will build
2.call building_process during building
expected: 2048 or less entities indexed
"""
nb = 2048
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(nb)
cw.insert(data=data)
cw.create_index(default_field_name, default_index_params)
cw.flush()
start = time.time()
while True:
time.sleep(1)
res, _ = self.utility_wrap.index_building_progress(c_name)
if 0 < res['indexed_rows'] <= nb:
break
if time.time() - start > 5:
raise MilvusException(1, f"Index build completed in more than 5s")
@pytest.mark.tags(CaseLabel.L2)
def test_wait_index_collection_not_existed(self):
"""
target: test wait_index
method: input collection not created before
expected: raise exception
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
self.utility_wrap.wait_for_index_building_complete(
c_name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 4, ct.err_msg: "collection not found"})
@pytest.mark.tags(CaseLabel.L1)
def test_wait_index_collection_empty(self):
"""
target: test wait_index
method: input empty collection
expected: no exception raised
"""
self._connect()
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
cw.create_index(default_field_name, default_index_params)
assert self.utility_wrap.wait_for_index_building_complete(c_name)[0]
res, _ = self.utility_wrap.index_building_progress(c_name)
exp_res = {'total_rows': 0, 'indexed_rows': 0, 'pending_index_rows': 0, 'state': 'Finished'}
assert res == exp_res
@pytest.mark.tags(CaseLabel.L1)
def test_wait_index_collection_index(self):
"""
target: test wait_index
method: insert 5000 entities, build and call wait_index
expected: 5000 entity indexed
"""
nb = 5000
c_name = cf.gen_unique_str(prefix)
cw = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(nb)
cw.insert(data=data)
cw.flush()
cw.create_index(default_field_name, default_index_params)
res, _ = self.utility_wrap.wait_for_index_building_complete(c_name)
assert res is True
res, _ = self.utility_wrap.index_building_progress(c_name)
assert res["indexed_rows"] == nb
@pytest.mark.tags(CaseLabel.L2)
def test_loading_progress_without_loading(self):
"""
target: test loading progress without loading
method: insert and flush data, call loading_progress without loading
expected: raise exception
"""
collection_w = self.init_collection_wrap()
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
self.utility_wrap.loading_progress(collection_w.name,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 101,
ct.err_msg: 'collection not loaded'})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("nb", [ct.default_nb, 5000])
def test_loading_progress_collection(self, nb):
"""
target: test loading progress
method: 1.insert flush and load 2.call loading_progress
expected: all entities is loafed, because load is synchronous
"""
# create, insert default_nb, flush and load
collection_w = self.init_collection_general(prefix, insert_data=True, nb=nb)[0]
res, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res[loading_progress] == '100%'
@pytest.mark.tags(CaseLabel.L2)
def test_loading_progress_with_async_load(self):
"""
target: test loading progress with async collection load
method: 1.load collection with async=True 2.loading_progress
expected: loading part entities
"""
collection_w = self.init_collection_wrap()
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
assert collection_w.num_entities == ct.default_nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load(_async=True)
res, _ = self.utility_wrap.loading_progress(collection_w.name)
loading_int = cf.percent_to_int(res[loading_progress])
if -1 != loading_int:
assert (0 <= loading_int <= 100)
else:
log.info("The output of loading progress is not a string or a percentage")
@pytest.mark.tags(CaseLabel.L2)
def test_loading_progress_empty_collection(self):
"""
target: test loading_progress on an empty collection
method: 1.create collection and no insert 2.loading_progress
expected: 0 entities is loaded
"""
collection_w = self.init_collection_wrap()
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
res, _ = self.utility_wrap.loading_progress(collection_w.name)
exp_res = {loading_progress: '100%'}
assert exp_res == res
@pytest.mark.tags(CaseLabel.L1)
def test_loading_progress_after_release(self):
"""
target: test loading progress after release
method: insert and flush data, call loading_progress after release
expected: return successfully with 0%
"""
collection_w = self.init_collection_general(prefix, insert_data=True, nb=100)[0]
collection_w.release()
error = {ct.err_code: 999, ct.err_msg: "collection not loaded"}
self.utility_wrap.loading_progress(collection_w.name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_loading_progress_with_release_partition(self):
"""
target: test loading progress after release part partitions
method: 1.insert data into two partitions and flush
2.load one partition and release one partition
expected: loaded one partition entities
"""
half = ct.default_nb
# insert entities into two partitions, collection flush and load
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
partition_w.release()
res = self.utility_wrap.loading_progress(collection_w.name)[0]
assert res[loading_progress] == '100%'
@pytest.mark.tags(CaseLabel.L2)
def test_loading_progress_with_load_partition(self):
"""
target: test loading progress after load partition
method: 1.insert data into two partitions and flush
2.load one partition and loading progress
expected: loaded one partition entities
"""
half = ct.default_nb
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
collection_w.release()
partition_w.load()
res = self.utility_wrap.loading_progress(collection_w.name)[0]
assert res[loading_progress] == '100%'
@pytest.mark.tags(CaseLabel.L1)
def test_loading_progress_with_partition(self):
"""
target: test loading progress with partition
method: 1.insert data into two partitions and flush, and load
2.loading progress with one partition
expected: loaded one partition entities
"""
half = ct.default_nb
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
res = self.utility_wrap.loading_progress(collection_w.name, partition_names=[partition_w.name])[0]
assert res[loading_progress] == '100%'
@pytest.mark.tags(CaseLabel.ClusterOnly)
def test_loading_progress_multi_replicas(self):
"""
target: test loading progress with multi replicas
method: 1.Create collection and insert data
2.Load replicas and get loading progress
3.Create partitions and insert data
4.Get loading progress
5.Release and re-load replicas, get loading progress
expected: Verify loading progress result
"""
collection_w = self.init_collection_wrap()
collection_w.insert(cf.gen_default_dataframe_data())
assert collection_w.num_entities == ct.default_nb
default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
collection_w.create_index("float_vector", default_index)
collection_w.load(partition_names=[ct.default_partition_name], replica_number=2)
res_collection, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res_collection == {loading_progress: '100%'}
# create partition and insert
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
partition_w.insert(cf.gen_default_dataframe_data(start=ct.default_nb))
assert partition_w.num_entities == ct.default_nb
res_part_partition, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res_part_partition == {'loading_progress': '100%'}
res_part_partition, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res_part_partition == {'loading_progress': '100%'}
collection_w.release()
collection_w.load(replica_number=2)
res_all_partitions, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res_all_partitions == {'loading_progress': '100%'}
@pytest.mark.tags(CaseLabel.L1)
def test_wait_loading_collection_empty(self):
"""
target: test wait_for_loading
method: input empty collection
expected: no exception raised
"""
self._connect()
cw = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
cw.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
cw.load()
self.utility_wrap.wait_for_loading_complete(cw.name)
res, _ = self.utility_wrap.loading_progress(cw.name)
exp_res = {loading_progress: "100%"}
assert res == exp_res
@pytest.mark.tags(CaseLabel.L1)
def test_wait_for_loading_complete(self):
"""
target: test wait for loading collection
method: insert 10000 entities and wait for loading complete
expected: after loading complete, loaded entities is 10000
"""
nb = 6000
collection_w = self.init_collection_wrap()
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df, timeout=60)
assert collection_w.num_entities == nb
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load(_async=True)
self.utility_wrap.wait_for_loading_complete(collection_w.name, timeout=45)
res, _ = self.utility_wrap.loading_progress(collection_w.name)
assert res[loading_progress] == '100%'
@pytest.mark.tags(CaseLabel.L0)
def test_drop_collection(self):
"""
target: test utility drop collection by name
method: input collection name and drop collection
expected: collection is dropped
"""
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(c_name)
assert self.utility_wrap.has_collection(c_name)[0]
self.utility_wrap.drop_collection(c_name)
assert not self.utility_wrap.has_collection(c_name)[0]
@pytest.mark.tags(CaseLabel.L0)
def test_drop_collection_repeatedly(self):
"""
target: test drop collection repeatedly
method: 1.collection.drop 2.utility.drop_collection
expected: raise exception
"""
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(c_name)
assert self.utility_wrap.has_collection(c_name)[0]
collection_w.drop()
assert not self.utility_wrap.has_collection(c_name)[0]
# error = {ct.err_code: 1, ct.err_msg: {"describe collection failed: can't find collection:"}}
# self.utility_wrap.drop_collection(c_name, check_task=CheckTasks.err_res, check_items=error)
# @longjiquan: dropping collection should be idempotent.
self.utility_wrap.drop_collection(c_name)
@pytest.mark.tags(CaseLabel.L2)
def test_drop_collection_create_repeatedly(self):
"""
target: test repeatedly create and drop same name collection
method: repeatedly create and drop collection
expected: no exception
"""
from time import sleep
loops = 3
c_name = cf.gen_unique_str(prefix)
for _ in range(loops):
self.init_collection_wrap(c_name)
assert self.utility_wrap.has_collection(c_name)[0]
self.utility_wrap.drop_collection(c_name)
assert not self.utility_wrap.has_collection(c_name)[0]
sleep(1)
@pytest.mark.tags(CaseLabel.L1)
def test_rename_collection(self):
"""
target: test rename collection function to single collection
method: call rename_collection API to rename collection
expected: collection renamed successfully without any change on aliases
"""
self._connect()
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = collection_w.name
new_collection_name = cf.gen_unique_str(prefix + "new")
alias = cf.gen_unique_str(prefix + "alias")
self.utility_wrap.create_alias(old_collection_name, alias)
self.utility_wrap.rename_collection(old_collection_name, new_collection_name)
collection_w = self.init_collection_wrap(name=new_collection_name,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: new_collection_name,
exp_schema: default_schema})
collections = self.utility_wrap.list_collections()[0]
assert new_collection_name in collections
assert old_collection_name not in collections
assert [alias] == collection_w.aliases
@pytest.mark.tags(CaseLabel.L2)
def test_rename_collections(self):
"""
target: test rename collection function to multiple collections
method: call rename_collection API to rename collections
expected: collections renamed successfully without any change on aliases
"""
self._connect()
# create two collections
collection_w_1 = self.init_collection_general(prefix)[0]
old_collection_name_1 = collection_w_1.name
collection_w_2 = self.init_collection_general(prefix)[0]
old_collection_name_2 = collection_w_2.name
tmp_collection_name = cf.gen_unique_str(prefix + "tmp")
# create alias to each collection
alias_1 = cf.gen_unique_str(prefix + "alias1")
alias_2 = cf.gen_unique_str(prefix + "alias2")
self.utility_wrap.create_alias(old_collection_name_1, alias_1)
self.utility_wrap.create_alias(old_collection_name_2, alias_2)
# switch name of the existing collections
self.utility_wrap.rename_collection(old_collection_name_1, tmp_collection_name)
self.utility_wrap.rename_collection(old_collection_name_2, old_collection_name_1)
self.utility_wrap.rename_collection(tmp_collection_name, old_collection_name_2)
# check collection renamed successfully
collection_w_1 = self.init_collection_wrap(name=old_collection_name_1,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: old_collection_name_1,
exp_schema: default_schema})
collection_w_2 = self.init_collection_wrap(name=old_collection_name_2,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: old_collection_name_2,
exp_schema: default_schema})
collections = self.utility_wrap.list_collections()[0]
assert old_collection_name_1 in collections
assert old_collection_name_2 in collections
assert tmp_collection_name not in collections
# check alias not changed
assert collection_w_1.aliases[0] == alias_2
assert collection_w_2.aliases[0] == alias_1
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("move to test_milvus_client_v2_rename_back_old_collection")
def test_rename_back_old_collection(self):
"""
target: test rename collection function to single collection
method: rename back to old collection name
expected: collection renamed successfully without any change on aliases
"""
# 1. connect
self._connect()
# 2. create a collection
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = collection_w.name
new_collection_name = cf.gen_unique_str(prefix + "new")
alias = cf.gen_unique_str(prefix + "alias")
# 3. create an alias
self.utility_wrap.create_alias(old_collection_name, alias)
collection_alias = collection_w.aliases
# 4. rename collection
self.utility_wrap.rename_collection(old_collection_name, new_collection_name)
# 5. rename back to old collection name
self.utility_wrap.rename_collection(new_collection_name, old_collection_name)
collection_w = self.init_collection_wrap(name=old_collection_name,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: old_collection_name,
exp_schema: default_schema})
collections = self.utility_wrap.list_collections()[0]
assert old_collection_name in collections
assert new_collection_name not in collections
assert collection_alias == collection_w.aliases
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("move to test_milvus_client_v2_rename_back_old_alias")
def test_rename_back_old_alias(self):
"""
target: test rename collection function to single collection
method: rename back to old collection alias
expected: collection renamed successfully without any change on aliases
"""
# 1. connect
self._connect()
# 2. create a collection
collection_w, vectors, _, insert_ids, _ = self.init_collection_general(prefix)
old_collection_name = collection_w.name
alias = cf.gen_unique_str(prefix + "alias")
# 3. create an alias
self.utility_wrap.create_alias(old_collection_name, alias)
collection_alias = collection_w.aliases
log.info(collection_alias)
# 4. drop the alias
self.utility_wrap.drop_alias(collection_alias[0])
# 5. rename collection to the dropped alias name
self.utility_wrap.rename_collection(old_collection_name, collection_alias[0])
self.init_collection_wrap(name=collection_alias[0],
check_task=CheckTasks.check_collection_property,
check_items={exp_name: collection_alias[0],
exp_schema: default_schema})
collections = self.utility_wrap.list_collections()[0]
assert collection_alias[0] in collections
assert old_collection_name not in collections
@pytest.mark.tags(CaseLabel.L2)
def test_create_alias_using_dropped_collection_name(self):
"""
target: test create alias using a dropped collection name
method: create 2 collections and drop one collection
expected: raise no exception
"""
# 1. create 2 collections
a_name = cf.gen_unique_str("aa")
b_name = cf.gen_unique_str("bb")
self.init_collection_wrap(name=a_name, schema=default_schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: a_name, exp_schema: default_schema})
self.init_collection_wrap(name=b_name, schema=default_schema,
check_task=CheckTasks.check_collection_property,
check_items={exp_name: b_name, exp_schema: default_schema})
# 2. drop collection a
self.utility_wrap.drop_collection(a_name)
assert self.utility_wrap.has_collection(a_name)[0] is False
assert len(self.utility_wrap.list_aliases(b_name)[0]) == 0
# 3. create alias with the name of collection a
self.utility_wrap.create_alias(b_name, a_name)
b_alias, _ = self.utility_wrap.list_aliases(b_name)
assert a_name in b_alias
@pytest.mark.tags(CaseLabel.L1)
def test_list_indexes(self):
"""
target: test utility.list_indexes
method: create 2 collections and list indexes
expected: raise no exception
"""
# 1. create 2 collections
string_field = ct.default_string_field_name
collection_w1 = self.init_collection_general(prefix, True)[0]
collection_w2 = self.init_collection_general(prefix, True, is_index=False)[0]
collection_w2.create_index(string_field)
# 2. list indexes
res1, _ = self.utility_wrap.list_indexes(collection_w1.name)
assert res1 == [ct.default_float_vec_field_name]
res2, _ = self.utility_wrap.list_indexes(collection_w2.name)
assert res2 == [string_field]
@pytest.mark.tags(CaseLabel.L1)
def test_get_server_type(self):
"""
target: test utility.get_server_type
method: get_server_type
expected: raise no exception
"""
self._connect()
res, _ = self.utility_wrap.get_server_type()
assert res == "milvus"
@pytest.mark.tags(CaseLabel.L1)
def test_load_state(self):
"""
target: test utility.load_state
method: load_state
expected: raise no exception
"""
collection_w = self.init_collection_general(prefix, True, partition_num=1)[0]
res1, _ = self.utility_wrap.load_state(collection_w.name)
assert str(res1) == "Loaded"
collection_w.release()
res2, _ = self.utility_wrap.load_state(collection_w.name)
assert str(res2) == "NotLoad"
collection_w.load(partition_names=[ct.default_partition_name])
res3, _ = self.utility_wrap.load_state(collection_w.name)
assert str(res3) == "Loaded"
class TestUtilityAdvanced(TestcaseBase):
""" Test case of index interface """
@pytest.mark.tags(CaseLabel.L2)
def test_has_collection_multi_collections(self):
"""
target: test has_collection with collection name
method: input collection name created before
expected: True
"""
c_name = cf.gen_unique_str(prefix)
c_name_2 = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
self.init_collection_wrap(name=c_name_2)
for name in [c_name, c_name_2]:
res, _ = self.utility_wrap.has_collection(name)
assert res is True
@pytest.mark.tags(CaseLabel.L2)
def test_list_collections_multi_collection(self):
"""
target: test list_collections
method: create collection, list_collections
expected: in the result
"""
c_name = cf.gen_unique_str(prefix)
c_name_2 = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
self.init_collection_wrap(name=c_name_2)
res, _ = self.utility_wrap.list_collections()
for name in [c_name, c_name_2]:
assert name in res
@pytest.mark.tags(CaseLabel.L2)
def test_drop_multi_collection_concurrent(self):
"""
target: test concurrent drop collection
method: multi thread drop one collection
expected: drop successfully
"""
thread_num = 3
threads = []
c_names = []
num = 5
for i in range(thread_num * num):
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(c_name)
c_names.append(c_name)
def create_and_drop_collection(names):
for name in names:
assert self.utility_wrap.has_collection(name)[0]
self.utility_wrap.drop_collection(name)
assert not self.utility_wrap.has_collection(name)[0]
for i in range(thread_num):
x = threading.Thread(target=create_and_drop_collection, args=(c_names[i * num:(i + 1) * num],))
threads.append(x)
x.start()
for t in threads:
t.join()
log.debug(self.utility_wrap.list_collections()[0])
@pytest.mark.tags(CaseLabel.L2)
def test_get_query_segment_info_empty_collection(self):
"""
target: test getting query segment info of empty collection
method: init a collection and get query segment info
expected: length of segment is 0
"""
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
res, _ = self.utility_wrap.get_query_segment_info(c_name)
assert len(res) == 0
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("primary_field", ["int64_pk", "varchar_pk"])
def test_get_sealed_query_segment_info(self, primary_field):
"""
target: test getting sealed query segment info of collection with data
method: init a collection, insert data, flush, index, load, and get query segment info
expected:
1. length of segment is greater than 0
2. the sum num_rows of each segment is equal to num of entities
3. all segment is_sorted true
"""
nb = 3000
segment_num = 2
collection_name = cf.gen_unique_str(prefix)
# connect -> create collection
self._connect()
self.collection_wrap.init_collection(
name=collection_name,
schema=cf.set_collection_schema(
fields=[primary_field, ct.default_float_vec_field_name],
field_params={
primary_field: FieldParams(is_primary=True, max_length=128).to_dict,
ct.default_float_vec_field_name: FieldParams(dim=ct.default_dim).to_dict,
},
)
)
for _ in range(segment_num):
# insert random pks
data = cf.gen_values(self.collection_wrap.schema, nb=nb, random_pk=True)
self.collection_wrap.insert(data)
self.collection_wrap.flush()
# flush -> index -> load -> sealed segments is sorted
self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(ct.default_float_vec_field_name))
self.collection_wrap.load()
# get_query_segment_info and verify results
res_sealed, _ = self.utility_wrap.get_query_segment_info(collection_name)
assert len(res_sealed) > 0 # maybe mix compaction to 1 segment
cnt = 0
for r in res_sealed:
log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows}; is_sorted: {r.is_sorted} ")
cnt += r.num_rows
assert r.is_sorted is True
assert cnt == nb * segment_num
# verify search
self.collection_wrap.search(data=cf.gen_vectors(ct.default_nq, ct.default_dim),
anns_field=ct.default_float_vec_field_name, param=DefaultVectorSearchParams.IVF_SQ8(),
limit=ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": ct.default_nq,
"limit": ct.default_limit})
@pytest.mark.tags(CaseLabel.L1)
def test_get_growing_query_segment_info(self):
"""
target: test getting growing query segment info of collection with data
method: init a collection, index, load, insert data, and get query segment info
expected:
1. length of segment is 0, growing segment is not visible for get_query_segment_info
"""
nb = 3000
primary_field = "int64"
collection_name = cf.gen_unique_str(prefix)
# connect -> create collection
self._connect()
self.collection_wrap.init_collection(
name=collection_name,
schema=cf.set_collection_schema(
fields=[primary_field, ct.default_float_vec_field_name],
field_params={
primary_field: FieldParams(is_primary=True, max_length=128).to_dict,
ct.default_float_vec_field_name: FieldParams(dim=ct.default_dim).to_dict,
},
)
)
# index -> load
self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(ct.default_float_vec_field_name))
self.collection_wrap.load()
# insert random pks ***
data = cf.gen_values(self.collection_wrap.schema, nb=nb, random_pk=True)
self.collection_wrap.insert(data)
# get_query_segment_info and verify results
res_sealed, _ = self.utility_wrap.get_query_segment_info(collection_name)
assert len(res_sealed) == 0
@pytest.mark.tags(CaseLabel.L1)
def test_get_sealed_query_segment_info_after_create_index(self):
"""
target: test getting sealed query segment info of collection with data
method: init a collection, insert data, flush, create index, load, and get query segment info
expected:
1. length of segment is greater than 0
2. the sum num_rows of each segment is equal to num of entities
"""
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
collection_w.num_entities
collection_w.create_index(default_field_name, default_index_params)
collection_w.load()
res, _ = self.utility_wrap.get_query_segment_info(c_name)
assert len(res) > 0
segment_ids = []
cnt = 0
for r in res:
log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows} ")
if r.segmentID not in segment_ids:
segment_ids.append(r.segmentID)
cnt += r.num_rows
assert cnt == nb
@pytest.mark.tags(CaseLabel.L2)
def test_load_balance_normal(self):
"""
target: test load balance of collection
method: init a collection and load balance
expected: sealed_segment_ids is subset of des_sealed_segment_ids
"""
# init a collection
self._connect()
querynode_num = len(MilvusSys().query_nodes)
if querynode_num < 2:
pytest.skip("skip load balance testcase when querynode number less than 2")
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.create_index(default_field_name, default_index_params)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
# get sealed segments
collection_w.num_entities
# get growing segments
collection_w.insert(df)
collection_w.load()
# prepare load balance params
time.sleep(0.5)
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
all_querynodes = [node["identifier"] for node in ms.query_nodes]
assert len(all_querynodes) > 1
all_querynodes = sorted(all_querynodes,
key=lambda x: len(segment_distribution[x]["sealed"])
if x in segment_distribution else 0, reverse=True)
src_node_id = all_querynodes[0]
des_node_ids = all_querynodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, des_node_ids, sealed_segment_ids)
# get segments distribution after load balance
time.sleep(0.5)
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id]["sealed"]
# assert src node has no sealed segments
assert sealed_segment_ids_after_load_banalce == []
des_sealed_segment_ids = []
for des_node_id in des_node_ids:
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
# assert sealed_segment_ids is subset of des_sealed_segment_ids
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
@pytest.mark.tags(CaseLabel.L1)
def test_load_balance_with_src_node_not_exist(self):
"""
target: test load balance of collection
method: init a collection and load balance with src_node not exist
expected: raise exception
"""
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
# get sealed segments
collection_w.num_entities
# get growing segments
collection_w.insert(df)
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# prepare load balance params
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
all_querynodes = [node["identifier"] for node in ms.query_nodes]
all_querynodes = sorted(all_querynodes,
key=lambda x: len(segment_distribution[x]["sealed"])
if x in segment_distribution else 0, reverse=True)
# add node id greater than all querynodes, which is not exist for querynode, to src_node_ids
max_query_node_id = max(all_querynodes)
invalid_src_node_id = max_query_node_id + 1
src_node_id = all_querynodes[0]
dst_node_ids = all_querynodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(collection_w.name, invalid_src_node_id, dst_node_ids, sealed_segment_ids,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "source node not found in any replica"})
@pytest.mark.tags(CaseLabel.L1)
def test_load_balance_with_all_dst_node_not_exist(self):
"""
target: test load balance of collection
method: init a collection and load balance with all dst_node not exist
expected: raise exception
"""
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
# get sealed segments
collection_w.num_entities
# get growing segments
collection_w.insert(df)
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
# prepare load balance params
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
all_querynodes = [node["identifier"] for node in ms.query_nodes]
all_querynodes = sorted(all_querynodes,
key=lambda x: len(segment_distribution[x]["sealed"])
if x in segment_distribution else 0, reverse=True)
src_node_id = all_querynodes[0]
# add node id greater than all querynodes, which is not exist for querynode, to dst_node_ids
max_query_node_id = max(all_querynodes)
dst_node_ids = [id for id in range(max_query_node_id + 1, max_query_node_id + 3)]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "destination node not found in the same replica"})
@pytest.mark.tags(CaseLabel.L1)
def test_load_balance_with_one_sealed_segment_id_not_exist(self):
"""
target: test load balance of collection
method: init a collection and load balance with one of sealed segment ids not exist
expected: raise exception
"""
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.create_index(default_field_name, default_index_params)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
# get sealed segments
collection_w.flush()
# get growing segments
collection_w.insert(df)
collection_w.load()
# prepare load balance params
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
all_querynodes = [node["identifier"] for node in ms.query_nodes]
all_querynodes = sorted(all_querynodes,
key=lambda x: len(segment_distribution[x]["sealed"])
if x in segment_distribution else 0, reverse=True)
src_node_id = all_querynodes[0]
dst_node_ids = all_querynodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
if len(segment_distribution[src_node_id]["sealed"]) == 0:
sealed_segment_ids = [0] # add a segment id which is not exist
else:
# add a segment id which is not exist
sealed_segment_ids.append(max(segment_distribution[src_node_id]["sealed"]) + 1)
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 999, ct.err_msg: "not found in source node"})
@pytest.mark.tags(CaseLabel.L1)
def test_load_balance_with_all_sealed_segment_id_not_exist(self):
"""
target: test load balance of collection
method: init a collection and load balance with one of sealed segment ids not exist
expected: raise exception
"""
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
# get sealed segments
collection_w.num_entities
collection_w.create_index(default_field_name, default_index_params)
# get growing segments
collection_w.insert(df)
collection_w.load()
# prepare load balance params
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
all_querynodes = [node["identifier"] for node in ms.query_nodes]
all_querynodes = sorted(all_querynodes,
key=lambda x: len(segment_distribution[x]["sealed"])
if x in segment_distribution else 0, reverse=True)
src_node_id = all_querynodes[0]
dst_node_ids = all_querynodes[1:]
# add segment ids which are not exist
sealed_segment_ids = [sealed_segment_id
for sealed_segment_id in range(max(segment_distribution[src_node_id]["sealed"]) + 100,
max(segment_distribution[src_node_id]["sealed"]) + 103)]
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "not found in source node"})
@pytest.mark.tags(CaseLabel.L2)
def test_load_balance_in_one_group(self):
"""
target: test load balance of collection in one group
method: init a collection, load with multi replicas and load balance among the querynodes in one group
expected: load balance successfully
"""
self._connect()
querynode_num = len(MilvusSys().query_nodes)
if querynode_num < 3:
pytest.skip("skip load balance for multi replicas testcase when querynode number less than 3")
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
# get sealed segments
collection_w.num_entities
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load(replica_number=2)
# get growing segments
collection_w.insert(df)
# get replicas information
res, _ = collection_w.get_replicas()
# prepare load balance params
# find a group which has multi nodes
group_nodes = []
for g in res.groups:
if len(g.group_nodes) >= 2:
group_nodes = list(g.group_nodes)
break
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
group_nodes = sorted(group_nodes,
key=lambda x: len(
segment_distribution[x]["sealed"])
if x in segment_distribution else 0, reverse=True)
src_node_id = group_nodes[0]
dst_node_ids = group_nodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids)
# get segments distribution after load balance
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids_after_load_balance = segment_distribution[src_node_id]["sealed"]
# assert src node has no sealed segments
# assert sealed_segment_ids_after_load_balance == []
des_sealed_segment_ids = []
for des_node_id in dst_node_ids:
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
# assert sealed_segment_ids is subset of des_sealed_segment_ids
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
@pytest.mark.tags(CaseLabel.L3)
def test_load_balance_not_in_one_group(self):
"""
target: test load balance of collection in one group
method: init a collection, load with multi replicas and load balance among the querynodes in different group
expected: load balance failed
"""
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
# get sealed segments
collection_w.num_entities
collection_w.load(replica_number=2)
# get growing segments
collection_w.insert(df)
# get replicas information
res, _ = collection_w.get_replicas()
# prepare load balance params
all_querynodes = [node["identifier"] for node in ms.query_nodes]
# find a group which has multi nodes
group_nodes = []
for g in res.groups:
if len(g.group_nodes) >= 2:
group_nodes = list(g.group_nodes)
break
src_node_id = group_nodes[0]
dst_node_ids = list(set(all_querynodes) - set(group_nodes))
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
check_task=CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "must be in the same replica group"})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip(reason="querycoordv2")
def test_handoff_query_search(self):
"""
target: test query search after handoff
method: 1.load collection
2.insert, query and search
3.flush collection and triggere handoff
4. search with handoff indexed segments
expected: Search ids before and after handoff are different, because search from growing and search from index
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
collection_w.create_index(default_field_name, default_index_params)
collection_w.load()
# handoff: insert and flush one segment
df = cf.gen_default_dataframe_data()
insert_res, _ = collection_w.insert(df)
term_expr = f'{ct.default_int64_field_name} in {insert_res.primary_keys[:10]}'
res = df.iloc[:10, :1].to_dict('records')
collection_w.query(term_expr, check_task=CheckTasks.check_query_results,
check_items={'exp_res': res,
"pk_name": collection_w.primary_field.name})
search_res_before, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
log.debug(collection_w.num_entities)
start = time.time()
while True:
time.sleep(2)
segment_infos, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
# handoff done
if len(segment_infos) == 1 and segment_infos[0].state == SegmentState.Sealed:
break
if time.time() - start > 20:
raise MilvusException(1, f"Get query segment info after handoff cost more than 20s")
# query and search from handoff segments
collection_w.query(term_expr, check_task=CheckTasks.check_query_results,
check_items={'exp_res': res,
"pk_name": collection_w.primary_field.name})
search_res_after, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
# the ids between twice search is different because of index building
# log.debug(search_res_before[0].ids)
# log.debug(search_res_after[0].ids)
assert search_res_before[0].ids != search_res_after[0].ids
# assert search result includes the nq-vector before or after handoff
assert search_res_after[0].ids[0] == 0
assert search_res_before[0].ids[0] == search_res_after[0].ids[0]
class TestUtilityUserPassword(TestcaseBase):
""" Test case of user interface """
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_create_user_with_user_password(self, host, port):
"""
target: test the user creation with user and password
method: create user with the default user and password parameter
expected: connected is True
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = "nico"
password = "wertyu567"
self.utility_wrap.create_user(user=user, password=password)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user, password=password,
check_task=ct.CheckTasks.ccr)
self.utility_wrap.list_collections()
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("old_password", ["abc1234"])
@pytest.mark.parametrize("new_password", ["abc12345"])
def test_reset_password_with_user_and_old_password(self, host, port, old_password, new_password):
"""
target: test the password reset with old password
method: get a connection with user and corresponding old password
expected: connected is True
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = "robot2048"
self.utility_wrap.create_user(user=user, password=old_password)
self.utility_wrap.reset_password(user=user, old_password=old_password, new_password=new_password)
self.utility_wrap.list_collections()
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("old_password", ["abc1234"])
@pytest.mark.parametrize("new_password", ["abc12345"])
def test_update_password_with_user_and_old_password(self, host, port, old_password, new_password):
"""
target: test the password update with old password
method: get a connection with user and corresponding old password
expected: connected is True
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = "robot2060"
self.utility_wrap.create_user(user=user, password=old_password)
self.utility_wrap.update_password(user=user, old_password=old_password, new_password=new_password)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=new_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.list_collections()
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_list_usernames(self, host, port):
"""
target: test the user list created successfully
method: get a list of users
expected: list all users
"""
# 1. default user login
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2. create 2 users
self.utility_wrap.create_user(user="user1", password="abc123")
self.utility_wrap.create_user(user="user2", password="abc123")
# 3. list all users
res = self.utility_wrap.list_usernames()[0]
assert "user1" and "user2" in res
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("connect_name", [DefaultConfig.DEFAULT_USING])
def test_delete_user_with_username(self, host, port, connect_name):
"""
target: test deleting user with username
method: delete user with username and connect with the wrong user then list collections
expected: deleted successfully
"""
user_name = cf.gen_unique_str(prefix)
password = cf.gen_str_by_length()
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user_name, password=password)
self.utility_wrap.delete_user(user=user_name)
self.connection_wrap.disconnect(alias=connect_name)
self.connection_wrap.connect(host=host, port=port, user=user_name, password=password,
check_task=CheckTasks.check_auth_failure)
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_delete_user_with_invalid_username(self, host, port):
"""
target: test the nonexistant user when deleting credential
method: delete a credential with user wrong
excepted: delete is true
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.delete_user(user="asdfghj")
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_delete_all_users(self, host, port):
"""
target: delete the users that created for test
method: delete the users in list_usernames except root
excepted: delete is true
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
res = self.utility_wrap.list_usernames()[0]
for user in res:
if user != "root":
self.utility_wrap.delete_user(user=user)
res = self.utility_wrap.list_usernames()[0]
assert len(res) == 1
class TestUtilityInvalidUserPassword(TestcaseBase):
""" Test invalid case of user interface """
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("user", ["qwertyuiopasdfghjklzxcvbnmqwertyui", "@*-.-*", "alisd/"])
def test_create_user_with_invalid_username(self, host, port, user):
"""
target: test the user when create user
method: make the length of user beyond standard
excepted: the creation is false
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user, password=ct.default_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "invalid parameter"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_create_user_with_existed_username(self, host, port):
"""
target: test the user when create user
method: create a user, and then create a user with the same username
excepted: the creation is false
"""
# 1.default user login
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create the first user successfully
user_name = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_name, password=ct.default_password)
# 3.create the second user with the same username
self.utility_wrap.create_user(user=user_name, password=ct.default_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 65535,
ct.err_msg: "user already exists: %s" % user_name})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("invalid_password", ["12345"])
def test_create_user_with_invalid_password(self, host, port, invalid_password):
"""
target: test the password when create user
method: make the length of user exceed the limitation [6, 256]
excepted: the creation is false
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user_name = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_name, password=invalid_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "invalid password length: invalid parameter"
"[5 out of range 6 <= value <= 256]"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_reset_password_with_invalid_username(self, host, port):
"""
target: test the wrong user when resetting password
method: create a user, and then reset the password with wrong username
excepted: reset is false
"""
# 1.default user login
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong username
self.utility_wrap.reset_password(user="hobo", old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for hobo: "
"not authenticated"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("new_password", ["12345"])
def test_reset_password_with_invalid_new_password(self, host, port, new_password):
"""
target: test the new password when resetting password
method: create a user, and then set a wrong new password
excepted: reset is false
"""
# 1.default user login
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong new password
self.utility_wrap.reset_password(user=user_name, old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "invalid password length: invalid parameter"
"[5 out of range 6 <= value <= 256]"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_reset_password_with_invalid_old_password(self, host, port):
"""
target: test the old password when resetting password
method: create a credential, and then reset with a wrong old password
excepted: reset is false
"""
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user_name, password=old_password)
self.utility_wrap.reset_password(user=user_name, old_password="waszx0", new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for %s: "
"not authenticated" % user_name})
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_update_password_with_invalid_username(self, host, port):
"""
target: test the wrong user when resetting password
method: create a user, and then reset the password with wrong username
excepted: reset is false
"""
# 1.default user login
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong username
self.utility_wrap.update_password(user="hobo", old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for hobo:"
" not authenticated"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
@pytest.mark.parametrize("new_password", ["12345"])
def test_update_password_with_invalid_new_password(self, host, port, new_password):
"""
target: test the new password when resetting password
method: create a user, and then set a wrong new password
excepted: reset is false
"""
# 1.default user login
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2.create a user
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
self.utility_wrap.create_user(user=user_name, password=old_password)
# 3.reset password with the wrong new password
self.utility_wrap.update_password(user=user_name, old_password=old_password, new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1100,
ct.err_msg: "invalid password length: invalid parameter[5 out "
"of range 6 <= value <= 256]"})
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_update_password_with_invalid_old_password(self, host, port):
"""
target: test the old password when resetting password
method: create a credential, and then reset with a wrong old password
excepted: reset is false
"""
user_name = cf.gen_unique_str(prefix)
old_password = cf.gen_str_by_length()
new_password = cf.gen_str_by_length()
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.create_user(user=user_name, password=old_password)
self.utility_wrap.update_password(user=user_name, old_password="waszx0", new_password=new_password,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1400,
ct.err_msg: "old password not correct for %s"
": not authenticated" % user_name})
@pytest.mark.tags(ct.CaseLabel.RBAC)
def test_delete_user_root(self, host, port):
"""
target: test deleting user root when deleting credential
method: connect and then delete the user root
excepted: delete is false
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.delete_user(user=ct.default_user, check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1401,
ct.err_msg: "root user cannot be deleted: "
"privilege not permitted"})
class TestUtilityRBAC(TestcaseBase):
def teardown_method(self, method):
"""
teardown method: drop role and user
"""
log.info("[utility_teardown_method] Start teardown utility test cases ...")
self.connection_wrap.connect(host=cf.param_info.param_host, port=cf.param_info.param_port, user=ct.default_user,
password=ct.default_password, secure=cf.param_info.param_secure)
# drop users
users, _ = self.utility_wrap.list_users(False)
for u in users.groups:
if u.username != ct.default_user:
self.utility_wrap.delete_user(u.username)
user_groups, _ = self.utility_wrap.list_users(False)
assert len(user_groups.groups) == 1
role_groups, _ = self.utility_wrap.list_roles(False)
# drop roles
for role_group in role_groups.groups:
if role_group.role_name not in ['admin', 'public']:
self.utility_wrap.init_role(role_group.role_name)
g_list, _ = self.utility_wrap.role_list_grants()
for g in g_list.groups:
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
self.utility_wrap.role_drop()
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
# drop database
databases, _ = self.database_wrap.list_database()
for db_name in databases:
self.database_wrap.using_database(db_name)
for c_name in self.utility_wrap.list_collections()[0]:
self.utility_wrap.drop_collection(c_name)
if db_name != ct.default_db:
self.database_wrap.drop_database(db_name)
super().teardown_method(method)
def init_db_kwargs(self, with_db):
"""
init db name kwargs
"""
db_kwargs = {}
if with_db:
db_name = cf.gen_unique_str("db")
self.database_wrap.create_database(db_name)
db_kwargs = {"db_name": db_name}
return db_kwargs
@pytest.mark.tags(CaseLabel.RBAC)
def test_clear_roles(self, host, port):
"""
target: check get roles list and clear them
method: remove all roles except admin and public
expected: assert clear success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# add user and bind to role
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
usernames, _ = self.utility_wrap.list_usernames()
for username in usernames:
if username != "root":
self.utility_wrap.delete_user(username)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
# get roles
role_groups, _ = self.utility_wrap.list_roles(False)
# drop roles
for role_group in role_groups.groups:
if role_group.role_name not in ['admin', 'public']:
self.utility_wrap.init_role(role_group.role_name)
g_list, _ = self.utility_wrap.role_list_grants()
for g in g_list.groups:
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
self.utility_wrap.role_drop()
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
@pytest.mark.tags(CaseLabel.RBAC)
def test_role_list_user_with_root_user(self, host, port):
"""
target: check list user
method: check list user with root
expected: assert list user success, and root has no roles
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user_info, _ = self.utility_wrap.list_user("root", True)
user_item = user_info.groups[0]
assert user_item.roles == ()
assert user_item.username == "root"
@pytest.mark.tags(CaseLabel.RBAC)
def test_role_list_users(self, host, port):
"""
target: check list users
method: check list users con
expected: assert list users success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# add user and bind to role
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
# get users
user_info, _ = self.utility_wrap.list_users(True)
# check root user and new user
root_exist = False
new_user_exist = False
for user_item in user_info.groups:
if user_item.username == "root" and len(user_item.roles) == 0:
root_exist = True
if user_item.username == user and user_item.roles[0] == r_name:
new_user_exist = True
assert root_exist
assert new_user_exist
@pytest.mark.tags(CaseLabel.RBAC)
def test_create_role(self, host, port):
"""
target: test create role
method: test create role with random name
expected: assert role create success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name, check_task=CheckTasks.check_role_property,
check_items={exp_name: r_name})
assert not self.utility_wrap.role_is_exist()[0]
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_role(self, host, port):
"""
target: test drop role
method: create a role, drop this role
expected: assert role drop success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_drop()
assert not self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_user_to_role(self, host, port):
"""
target: test add user to role
method: create a new user,add user to role
expected: assert add user success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
users, _ = self.utility_wrap.role_get_users()
user_info, _ = self.utility_wrap.list_user(user, True)
user_item = user_info.groups[0]
assert r_name in user_item.roles
assert user in users
@pytest.mark.tags(CaseLabel.RBAC)
def test_remove_user_from_role(self, host, port):
"""
target: test remove user from role
method: create a new user,add user to role, remove user from role
expected: assert remove user from role success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_remove_user(user)
users, _ = self.utility_wrap.role_get_users()
assert len(users) == 0
@pytest.mark.tags(CaseLabel.RBAC)
def test_role_is_exist(self, host, port):
"""
target: test role is existed
method: check not exist role and exist role
expected: assert is_exist interface is correct
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
r_not_exist = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
self.utility_wrap.init_role(r_not_exist)
assert not self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
def test_role_grant_collection_insert(self, host, port):
"""
target: test grant role collection insert privilege
method: create one role and two collections, grant one collection insert privilege
expected: assert grant privilege success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
c_name_2 = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name, check_task=CheckTasks.check_role_property,
check_items={exp_name: r_name})
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
time.sleep(60)
collection_w1 = self.init_collection_wrap(name=c_name)
collection_w2 = self.init_collection_wrap(name=c_name_2)
# verify user default privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr)
data = cf.gen_default_dataframe_data()
collection_w1.insert(data=data, check_task=CheckTasks.check_permission_deny)
collection_w2.insert(data=data, check_task=CheckTasks.check_permission_deny)
# grant user collection insert privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(r_name)
self.utility_wrap.role_grant("Collection", c_name, "Insert")
time.sleep(60)
# verify user specific collection insert privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr)
collection_w1.insert(data=data)
# verify grant scope
index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
collection_w1.create_index(ct.default_float_vec_field_name, index_params,
check_task=CheckTasks.check_permission_deny)
collection_w2.insert(data=data, check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
def test_revoke_public_role_privilege(self, host, port):
"""
target: revoke public role privilege
method: revoke public role privilege
expected: success to revoke
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
self.init_collection_wrap(name=c_name)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role("public")
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_revoke("Collection", c_name, "Insert")
time.sleep(60)
data = cf.gen_default_list_data(ct.default_nb)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("public")
self.utility_wrap.role_grant("Collection", c_name, "Insert")
@pytest.mark.tags(CaseLabel.RBAC)
def test_revoke_user_after_delete_user(self, host, port):
"""
target: test revoke user with deleted user
method: 1. create user -> create a role -> role add user
2. delete user
3. revoke the deleted user
expected: revoke successfully
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# create user
username = cf.gen_unique_str("user")
u, _ = self.utility_wrap.create_user(user=username, password=ct.default_password)
# create a role and bind user
role_name = cf.gen_unique_str("role")
self.utility_wrap.init_role(role_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(username)
# delete user
self.utility_wrap.delete_user(username)
# get role users
users, _ = self.utility_wrap.role_get_users()
log.debug(users)
# re-create the user with different password
self.utility_wrap.create_user(user=username, password=ct.default_password + "aaa")
self.connection_wrap.connect(alias="re-user", host=host, port=port, user=username,
password=ct.default_password + "aaa", check_task=ct.CheckTasks.ccr)
self.utility_wrap.list_collections(using="re-user")
# delete user and remove user successfully
self.utility_wrap.delete_user(username)
self.utility_wrap.role_remove_user(username)
# get role users and verify user removed
role_users, _ = self.utility_wrap.role_get_users()
assert username not in role_users
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [True, False])
def test_role_revoke_collection_privilege(self, host, port, with_db):
"""
target: test revoke role collection privilege,
method: create role and collection, grant role insert privilege, revoke privilege
expected: assert revoke privilege success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
# self.init_collection_wrap(name=c_name)
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
# grant user collection insert privilege
self.utility_wrap.role_grant("Collection", c_name, "Insert", **db_kwargs)
time.sleep(60)
self.utility_wrap.role_list_grants(**db_kwargs)
# verify user specific collection insert privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
data = cf.gen_default_list_data(ct.default_nb)
collection_w.insert(data=data)
# revoke privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(r_name)
self.utility_wrap.role_revoke("Collection", c_name, "Insert", **db_kwargs)
time.sleep(60)
# verify revoke is success
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_role_revoke_global_privilege(self, host, port, with_db):
"""
target: test revoke role global privilege,
method: create role, grant role global createcollection privilege, revoke privilege
expected: assert revoke privilege success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
c_name_2 = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
# grant user Global CreateCollection privilege
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "CreateCollection", **db_kwargs)
time.sleep(60)
# verify user specific Global CreateCollection privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
schema = cf.gen_default_collection_schema()
_, create_res = self.collection_wrap.init_collection(name=c_name, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times = 6
while not create_res and retry_times > 0:
time.sleep(10)
_, create_res = self.collection_wrap.init_collection(name=c_name, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times -= 1
# revoke privilege
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
assert c_name in self.utility_wrap.list_collections()[0]
self.utility_wrap.init_role(r_name)
self.utility_wrap.role_revoke("Global", "*", "CreateCollection", **db_kwargs)
time.sleep(60)
# verify revoke is success
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
_, create_res = self.collection_wrap.init_collection(name=c_name_2, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times = 6
while create_res and retry_times > 0:
time.sleep(10)
_, create_res = self.collection_wrap.init_collection(name=c_name_2, schema=schema,
check_task=CheckTasks.check_nothing)
retry_times -= 1
self.collection_wrap.init_collection(name=cf.gen_unique_str(prefix), schema=schema,
check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_role_revoke_user_privilege(self, host, port, with_db):
"""
target: test revoke role user privilege,
method: create role, grant role user updateuser privilege, revoke privilege
expected: assert revoke privilege success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
# grant user User UpdateUser privilege
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("User", "*", "UpdateUser", **db_kwargs)
time.sleep(60)
self.utility_wrap.role_revoke("User", "*", "UpdateUser", **db_kwargs)
time.sleep(60)
# verify revoke is success
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.utility_wrap.reset_password(user=user_test, old_password=password_test, new_password=password,
check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_role_list_grants(self, host, port, with_db):
"""
target: test grant role privileges and list them
method: grant role privileges and list them
expected: assert list granted privileges success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
user2 = cf.gen_unique_str(prefix)
u2, _ = self.utility_wrap.create_user(user=user2, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
self.init_collection_wrap(name=c_name)
# grant user privilege
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.init_role(r_name)
grant_list = cf.gen_grant_list(c_name)
for grant_item in grant_list:
self.utility_wrap.role_grant(grant_item["object"], grant_item["object_name"], grant_item["privilege"],
**db_kwargs)
time.sleep(60)
# list grants with default user
g_list, _ = self.utility_wrap.role_list_grants(**db_kwargs)
assert len(g_list.groups) == len(grant_list)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
# list grants with user
g_list, _ = self.utility_wrap.role_list_grants(**db_kwargs)
assert len(g_list.groups) == len(grant_list)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user2,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
# user2 can not list grants of role
self.utility_wrap.role_list_grants(**db_kwargs, check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_role_which_bind_user(self, host, port):
"""
target: drop role which bind user
method: create a role, bind user to the role, drop the role
expected: drop success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_drop()
assert not self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["admin", "public"])
def test_add_user_to_default_role(self, name, host, port):
"""
target: add user to admin role or public role
method: create a user,add user to admin role or public role
expected: add success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(name)
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_add_user(user)
users, _ = self.utility_wrap.role_get_users()
user_info, _ = self.utility_wrap.list_user(user, True)
user_item = user_info.groups[0]
assert name in user_item.roles
assert user in users
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_root_to_new_role(self, host, port):
"""
target: add root to new role
method: add root to new role
expected: add success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user("root")
users, _ = self.utility_wrap.role_get_users()
user_info, _ = self.utility_wrap.list_user("root", True)
user_item = user_info.groups[0]
assert r_name in user_item.roles
assert "root" in users
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_collection_grands_by_role_and_object(self, host, port):
"""
target: list grants by role and object
method: create a new role,grant role collection privilege,list grants by role and object
expected: list success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_grant("Collection", c_name, "Search")
self.utility_wrap.role_grant("Collection", c_name, "Insert")
time.sleep(60)
g_list, _ = self.utility_wrap.role_list_grant("Collection", c_name)
assert len(g_list.groups) == 2
for g in g_list.groups:
assert g.object == "Collection"
assert g.object_name == c_name
assert g.privilege in ["Search", "Insert"]
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
time.sleep(60)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_global_grants_by_role_and_object(self, host, port):
"""
target: list grants by role and object
method: create a new role,grant role global privilege,list grants by role and object
expected: list success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_grant("Global", "*", "CreateCollection")
self.utility_wrap.role_grant("Global", "*", "All")
time.sleep(60)
g_list, _ = self.utility_wrap.role_list_grant("Global", "*")
assert len(g_list.groups) == 2
for g in g_list.groups:
assert g.object == "Global"
assert g.object_name == "*"
assert g.privilege in ["CreateCollection", "All"]
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
time.sleep(60)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
def test_verify_admin_role_privilege(self, host, port):
"""
target: verify admin role privilege
method: create a new user, bind to admin role, crud collection
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("admin")
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(ct.default_nb)
collection_w.insert(data=data)
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
collection_w.load()
assert collection_w.num_entities == ct.default_nb
collection_w.release()
collection_w.drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_load_privilege(self, host, port, with_db):
"""
target: verify grant collection load privilege
method: verify grant collection load privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.utility_wrap.role_grant("Collection", c_name, "Load", **db_kwargs)
self.utility_wrap.role_grant("Collection", c_name, "GetLoadingProgress", **db_kwargs)
time.sleep(60)
log.debug(self.utility_wrap.role_list_grants(**db_kwargs))
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(100)
mutation_res, _ = collection_w.insert(data=data)
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
log.debug(collection_w.name)
collection_w.load()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_release_privilege(self, host, port, with_db):
"""
target: verify grant collection release privilege
method: verify grant collection release privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.utility_wrap.role_grant("Collection", c_name, "Release", **db_kwargs)
time.sleep(60)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(100)
mutation_res, _ = collection_w.insert(data=data)
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
collection_w.load()
self.utility_wrap.role_list_grants(**db_kwargs)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w.release()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip(reason="https://github.com/milvus-io/milvus/issues/23945 not supported compaction privilege")
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_compaction_privilege(self, host, port, with_db):
"""
target: verify grant collection compaction privilege
method: verify grant collection compaction privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str("user")
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str("coll")
r_name = cf.gen_unique_str("role")
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Collection", c_name, "Compaction", **db_kwargs)
self.utility_wrap.role_list_grant("Collection", c_name, **db_kwargs)
self.utility_wrap.role_get_users()
# create collection in the db
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w.compact()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_insert_privilege(self, host, port, with_db):
"""
target: verify grant collection insert privilege
method: verify grant collection insert privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
# with db
self.utility_wrap.role_grant("Collection", c_name, "Insert", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
data = cf.gen_default_list_data(ct.default_nb)
mutation_res, _ = collection_w.insert(data=data)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_collection_delete_privilege(self, host, port, with_db):
"""
target: verify grant collection delete privilege
method: verify grant collection delete privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
# with db
self.utility_wrap.role_grant("Collection", c_name, "Delete", **db_kwargs)
time.sleep(60)
data = cf.gen_default_list_data(ct.default_nb)
mutation_res, _ = collection_w.insert(data=data)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
tmp_expr = f'{ct.default_int64_field_name} in {[0]}'
collection_w.delete(tmp_expr)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_create_index_privilege(self, host, port, with_db):
"""
target: verify grant create index privilege
method: verify grant create index privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
self.utility_wrap.role_grant("Collection", c_name, "CreateIndex", **db_kwargs)
self.utility_wrap.role_grant("Collection", c_name, "Flush", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w.create_index(ct.default_float_vec_field_name)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_drop_index_privilege(self, host, port, with_db):
"""
target: verify grant drop index privilege
method: verify grant drop index privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.create_index(ct.default_float_vec_field_name)
self.utility_wrap.role_grant("Collection", c_name, "DropIndex", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w.drop_index()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_collection_search_privilege(self, host, port, with_db):
"""
target: verify grant collection search privilege
method: verify grant collection search privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(ct.default_nb)
mutation_res, _ = collection_w.insert(data=data)
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
collection_w.load()
self.utility_wrap.role_grant("Collection", c_name, "Search", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(ct.default_nq)]
collection_w.search(vectors[:ct.default_nq], ct.default_float_vec_field_name,
{}, ct.default_limit,
"int64 >= 0", check_task=CheckTasks.check_search_results,
check_items={"nq": ct.default_nq,
"limit": ct.default_limit})
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
@pytest.mark.skip("will be modified soon, now flush will fail for GetFlushState")
def test_verify_collection_flush_privilege(self, host, port, with_db):
"""
target: verify grant collection flush privilege
method: verify grant collection flush privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
self.utility_wrap.role_grant("Collection", c_name, "Flush", db_name=db_name)
time.sleep(120)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, db_name=db_name)
collection_w.flush()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_collection_query_privilege(self, host, port, with_db):
"""
target: verify grant collection query privilege
method: verify grant collection query privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
data = cf.gen_default_list_data(100)
mutation_res, _ = collection_w.insert(data=data)
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
collection_w.load()
self.utility_wrap.role_grant("Collection", c_name, "Query", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
res, _ = collection_w.query(default_term_expr)
assert len(res) == 2
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_global_all_privilege(self, host, port, with_db):
"""
target: verify grant global all privilege
method: verify grant global all privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "All", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w = self.init_collection_wrap(name=c_name)
collection_w.drop()
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
r_test = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_test)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user_test)
self.utility_wrap.role_grant("Collection", c_name, "Insert")
time.sleep(60)
self.utility_wrap.role_revoke("Collection", c_name, "Insert")
time.sleep(60)
self.utility_wrap.role_remove_user(user_test)
self.utility_wrap.delete_user(user=user_test)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_global_create_collection_privilege(self, host, port, with_db):
"""
target: verify grant global create collection privilege
method: verify grant global create collection privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "CreateCollection", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.init_collection_wrap(name=c_name)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_global_drop_collection_privilege(self, host, port, with_db):
"""
target: verify grant global drop collection privilege
method: verify grant global drop collection privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "DropCollection", **db_kwargs)
time.sleep(60)
collection_w = self.init_collection_wrap(name=c_name)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w.drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_global_create_ownership_privilege(self, host, port, with_db):
"""
target: verify grant global create ownership privilege
method: verify grant global create ownership privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "CreateOwnership", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
r_test = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_test)
self.utility_wrap.create_role()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_global_drop_ownership_privilege(self, host, port, with_db):
"""
target: verify grant global drop ownership privilege
method: verify grant global drop ownership privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "DropOwnership", **db_kwargs)
time.sleep(60)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
r_test = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_test)
self.utility_wrap.create_role()
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.utility_wrap.role_drop()
self.utility_wrap.delete_user(user=user_test)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_global_select_ownership_privilege(self, host, port, with_db):
"""
target: verify grant global select ownership privilege
method: verify grant global select ownership privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "SelectOwnership", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.utility_wrap.list_usernames()
self.utility_wrap.role_list_grants()
self.utility_wrap.list_roles(False)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_global_manage_ownership_privilege(self, host, port, with_db):
"""
target: verify grant global manage ownership privilege
method: verify grant global manage ownership privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
r_test = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_test)
self.utility_wrap.create_role()
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("Global", "*", "ManageOwnership", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.utility_wrap.role_add_user(user_test)
self.utility_wrap.role_remove_user(user_test)
self.utility_wrap.role_grant("Collection", c_name, "Search")
time.sleep(60)
self.utility_wrap.role_revoke("Collection", c_name, "Search")
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_user_update_privilege(self, host, port, with_db):
"""
target: verify grant user update privilege
method: verify grant user update privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
r_test = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_test)
self.utility_wrap.create_role()
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("User", "*", "UpdateUser", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.utility_wrap.reset_password(user=user_test, old_password=password_test, new_password=password)
self.utility_wrap.update_password(user=user_test, old_password=password, new_password=password_test)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_select_user_privilege(self, host, port, with_db):
"""
target: verify grant select user privilege
method: verify grant select user privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
r_test = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_test)
self.utility_wrap.create_role()
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
# with db
db_kwargs = self.init_db_kwargs(with_db)
self.utility_wrap.role_grant("User", "*", "SelectUser", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.utility_wrap.list_user(username=user_test, include_role_info=False)
self.utility_wrap.list_users(include_role_info=False)
def test_admin_user_after_db_deleted(self, host, port):
"""
target: test admin role can opearte after db deleted
method: 1.root connect -> create collection in
2.create co
expected:
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w_default = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
# create db and create collection
self.database_wrap.create_database("db")
self.database_wrap.create_database("db2")
# self.database_wrap.using_database("db")
# collection_w_db = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
# create a user and bind to admin role
self.utility_wrap.create_user("u1", "Milvus")
self.utility_wrap.init_role("admin")
self.utility_wrap.role_add_user("u1")
# drop db
self.database_wrap.drop_database("db")
self.database_wrap.list_database()
self.utility_wrap.list_collections()
self.database_wrap.using_database("db2")
self.database_wrap.list_database()
self.utility_wrap.list_collections()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_privilege_with_wildcard_object_name(self, host, port, with_db):
"""
target: verify grant privilege with wildcard instead of object name
method: verify grant privilege with wildcard instead of object name
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
c_name_2 = cf.gen_unique_str(prefix)
# with db
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
collection_w2 = self.init_collection_wrap(name=c_name_2)
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
collection_w2.create_index(ct.default_float_vec_field_name, default_index_params)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_grant("Collection", "*", "Load", **db_kwargs)
self.utility_wrap.role_grant("Collection", "*", "GetLoadingProgress", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w.load()
collection_w2.load()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("with_db", [False, True])
def test_verify_grant_privilege_with_wildcard_privilege(self, host, port, with_db):
"""
target: verify grant privilege with wildcard instead of privilege
method: verify grant privilege with wildcard instead of privilege
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
# with db
db_kwargs = self.init_db_kwargs(with_db)
db_name = db_kwargs.get("db_name", ct.default_db)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=c_name)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_grant("Collection", "*", "*", **db_kwargs)
time.sleep(60)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
collection_w.load()
collection_w.release()
collection_w.compact()
data = cf.gen_default_list_data(ct.default_nb)
mutation_res, _ = collection_w.insert(data=data)
tmp_expr = f'{ct.default_int64_field_name} in {[0]}'
collection_w.delete(tmp_expr)
vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(ct.default_nq)]
collection_w.load()
collection_w.search(vectors[:ct.default_nq], ct.default_float_vec_field_name,
{}, ct.default_limit,
"int64 >= 0")
collection_w.flush()
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
collection_w.query(default_term_expr)
collection_w.release()
collection_w.drop_index()
@pytest.mark.tags(CaseLabel.RBAC)
def test_new_user_default_owns_public_role_permission(self, host, port):
"""
target: new user owns public role privilege
method: create a role,verify its permission
expected: verify success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user_test = cf.gen_unique_str(prefix)
password_test = cf.gen_unique_str(prefix)
self.utility_wrap.create_user(user=user_test, password=password_test)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
c_name_2 = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
collection_w = self.init_collection_wrap(name=c_name)
_, _ = self.index_wrap.init_index(collection_w.collection, default_field_name, default_index_params)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr)
# Collection permission deny
time.sleep(60)
collection_w.load(check_task=CheckTasks.check_permission_deny)
collection_w.release(check_task=CheckTasks.check_permission_deny)
collection_w.compact(check_task=CheckTasks.check_permission_deny)
data = cf.gen_default_list_data(ct.default_nb)
mutation_res, _ = collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny)
tmp_expr = f'{ct.default_int64_field_name} in {[0]}'
collection_w.delete(tmp_expr, check_task=CheckTasks.check_permission_deny)
self.index_wrap.drop(ct.default_int64_field_name, check_task=CheckTasks.check_permission_deny)
self.index_wrap.init_index(collection_w.collection, ct.default_int64_field_name,
default_index_params, check_task=CheckTasks.check_permission_deny)
vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(ct.default_nq)]
collection_w.search(vectors[:ct.default_nq], ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit,
"int64 >= 0", check_task=CheckTasks.check_permission_deny)
collection_w.flush(check_task=CheckTasks.check_permission_deny)
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
collection_w.query(default_term_expr, check_task=CheckTasks.check_permission_deny)
# self.utility_wrap.bulk_insert(c_name, check_task=CheckTasks.check_permission_deny)
# Global permission deny
self.init_collection_wrap(name=c_name_2, check_task=CheckTasks.check_permission_deny)
collection_w.drop(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.create_user(user=c_name, password=password, check_task=CheckTasks.check_permission_deny)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.delete_user(user=user, check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_drop(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.list_usernames(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_list_grants(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.list_roles(False, check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_add_user(user, check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_remove_user(user, check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_grant("Collection", c_name, "Insert", check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_revoke("Collection", c_name, "Insert", check_task=CheckTasks.check_permission_deny)
# User permission deny
self.utility_wrap.reset_password(user=user_test, old_password=password_test, new_password=password,
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.update_password(user=user_test, old_password=password, new_password=password_test,
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.list_user(user_test, False, check_task=CheckTasks.check_permission_deny)
# public role access
collection_w.index()
self.utility_wrap.list_collections()
self.utility_wrap.has_collection(c_name)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["admin", "public"])
def test_remove_user_from_default_role(self, name, host, port):
"""
target: remove user from admin role or public role
method: create a user,add user to admin role or public role,remove user from role
expected: remove success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(name)
self.utility_wrap.role_add_user(user)
users, _ = self.utility_wrap.role_get_users()
user_info, _ = self.utility_wrap.list_user(user, True)
user_item = user_info.groups[0]
assert name in user_item.roles
assert user in users
self.utility_wrap.role_remove_user(user)
users, _ = self.utility_wrap.role_get_users()
assert user not in users
@pytest.mark.tags(CaseLabel.RBAC)
def test_remove_root_from_new_role(self, host, port):
"""
target: remove root from new role
method: create a new role, bind root to role,remove root from role
expected: remove success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
self.utility_wrap.role_add_user("root")
users, _ = self.utility_wrap.role_get_users()
user_info, _ = self.utility_wrap.list_user("root", True)
user_item = user_info.groups[0]
assert r_name in user_item.roles
assert "root" in users
self.utility_wrap.role_remove_user("root")
users, _ = self.utility_wrap.role_get_users()
assert "root" not in users
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("will be modified soon, now flush will fail for GetFlushState")
def test_grant_db_collections(self, host, port):
"""
target: test grant collection privilege with db
method: 1.root user create db and collection
2.create a role and grant collection privilege with db
3.bind role to a user
4.user connect and verify privilege with different db
expected: privilege valid only with the granted db
"""
tmp_nb = 100
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create collection in db
db_name = cf.gen_unique_str(prefix)
self.database_wrap.create_database(db_name)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_w.insert(cf.gen_default_dataframe_data(tmp_nb))
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("Collection", collection_w.name, "Flush", db_name)
self.utility_wrap.role_grant("Collection", collection_w.name, "GetStatistics", db_name)
time.sleep(60)
# re-connect with new user and default db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd,
db_name=ct.default_db, secure=cf.param_info.param_secure,
check_task=ct.CheckTasks.ccr)
# verify user flush collection with different db
collection_w.flush(check_task=CheckTasks.check_permission_deny)
# set using db to db_name and flush
self.database_wrap.using_database(db_name)
assert collection_w.num_entities == tmp_nb
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_db_global(self, host, port):
"""
target: test grant global privilege with db
method: 1.root user create db
2.create a role and grant all global privilege with db
3.bind role to a user
4.user connect and verify privilege with different db
expected: privilege valid only with the granted db
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create collection in db
db_name = cf.gen_unique_str(prefix)
self.database_wrap.create_database(db_name)
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("Global", "*", "*", db_name)
time.sleep(60)
# re-connect with new user and default db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# verify user list grants with different db
self.database_wrap.using_database(ct.default_db)
self.utility_wrap.describe_resource_group(ct.default_resource_group_name,
check_task=CheckTasks.check_permission_deny)
# set using db to db_name and verify grants
self.database_wrap.using_database(db_name)
self.utility_wrap.role_list_grants()
self.utility_wrap.describe_resource_group(ct.default_resource_group_name)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_db_users(self, host, port):
"""
target: test grant user privilege with db
method: 1.root user create db
2.create a role and grant user privilege with db
3.bind role to a user
4.user connect and verify privilege with different db
expected: privilege valid only with the granted db
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create collection in db
db_name = cf.gen_unique_str(prefix)
self.database_wrap.create_database(db_name)
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("User", "*", "SelectUser", db_name)
time.sleep(60)
# re-connect with new user and default db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd, db_name=ct.default_db,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# verify user list grants with different db
self.utility_wrap.list_users(False, check_task=CheckTasks.check_permission_deny)
# set using db to db_name and verify grants
self.database_wrap.using_database(db_name)
self.utility_wrap.list_users(False)
@pytest.mark.tags(CaseLabel.RBAC)
def test_revoke_db_collection(self, host, port):
"""
target: test revoke db collection
method: test revoke privilege with normal db or not existed db
expected: verify privilege
"""
tmp_nb = 100
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create collection in db
db_name = cf.gen_unique_str(prefix)
self.database_wrap.create_database(db_name)
self.database_wrap.using_database(db_name)
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
collection_w.insert(cf.gen_default_dataframe_data(tmp_nb))
# grant role collection flush privilege
user, pwd, role = self.init_user_with_privilege("Collection", collection_w.name, "Flush", db_name)
time.sleep(60)
# revoke privilege with default db
self.utility_wrap.role_revoke("Collection", collection_w.name, "Flush", ct.default_db)
self.utility_wrap.role_revoke("Collection", collection_w.name, "Flush", db_name)
time.sleep(60)
# re-connect with new user and db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd,
db_name=ct.default_db, secure=cf.param_info.param_secure,
check_task=ct.CheckTasks.ccr)
# verify user flush collection with db
collection_w.flush(check_task=CheckTasks.check_permission_deny)
# set using db to default db and verify privilege
self.database_wrap.using_database(db_name)
collection_w.flush(check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_grant_db(self, host, port):
"""
target: test list grant with db
method: 1.list grant with granted db and default db
expected: verify role grants with different db
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create collection in db
db_name = cf.gen_unique_str("db")
self.database_wrap.create_database(db_name)
# grant role collection * All privilege
_, _, role_name = self.init_user_with_privilege("Global", "*", "All", db_name)
time.sleep(60)
log.debug(f"role name: {role_name}")
# list grant with db and verify
grants, _ = self.utility_wrap.role_list_grants(db_name=db_name)
for g in grants.groups:
assert g.object == "Global"
assert g.object_name == "*"
assert g.privilege == "All"
assert g.db_name == db_name
log.info(self.utility_wrap.role.name)
grant_db, _ = self.utility_wrap.role_list_grant(object="Global", object_name="*", db_name=db_name)
for g in grant_db.groups:
assert g.object == "Global"
assert g.object_name == "*"
assert g.privilege == "All"
assert g.db_name == db_name
grant_default, _ = self.utility_wrap.role_list_grant("Global", "*", db_name=ct.default_db)
assert len(grant_default.groups) == 0
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_grants_db(self, host, port):
"""
target: test list grants with db
method: 1.list grants with granted db and default db
expected: verify role grants with different db
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create collection in db
db_name = cf.gen_unique_str(prefix)
self.database_wrap.create_database(db_name)
# grant role collection flush privilege
self.init_user_with_privilege("Global", "*", "All", db_name)
self.utility_wrap.role_grant("User", "*", "UpdateUser", db_name)
time.sleep(60)
# list grants with db and verify
grants, _ = self.utility_wrap.role_list_grants(db_name=db_name)
assert len(grants.groups) == 2
for g in grants.groups:
if g.object == "Global":
assert g.object_name == "*"
assert g.privilege == "All"
elif g.object == "User":
assert g.object_name == "*"
assert g.privilege == "UpdateUser"
else:
raise Exception(f"{db_name} should't have the privilege {g.object_name}, Please check ")
# verify the role no grant in default db
grants_default, _ = self.utility_wrap.role_list_grants(db_name=ct.default_db)
assert len(grants_default.groups) == 0
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_connect(self, host, port):
"""
target: test grant user with db and connect with db
method: 1. grand role privilege in the default db
2. user connect with default db
3. verify db privilege
expected: succ
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure)
# grant global privilege to default db
tmp_user, tmp_pwd, tmp_role = self.init_user_with_privilege("User", "*", "SelectUser", ct.default_db)
time.sleep(60)
# re-connect
self.connection_wrap.disconnect(ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=tmp_user, password=tmp_pwd,
secure=cf.param_info.param_secure)
# list collections
self.utility_wrap.list_users(False)
self.utility_wrap.list_collections()
self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name,
check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
def test_alias_rbac(self, host, port):
"""
target: test rbac related to alias interfaces
method: Create a role and grant privileges related to aliases.
Verify if a user can execute the corresponding alias interface
based on whether the user possesses the role.
expected: Users with the assigned role can access the alias interface,
while those without the role cannot.
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
c_name = cf.gen_unique_str(prefix)
alias_name = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
user2 = cf.gen_unique_str(prefix)
u2, _ = self.utility_wrap.create_user(user=user2, password=password)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_add_user(user)
db_kwargs = {}
# grant user privilege
self.utility_wrap.init_role(r_name)
alias_privileges = [
{"object": "Global", "object_name": "*", "privilege": "CreateAlias"},
{"object": "Global", "object_name": "*", "privilege": "DropAlias"},
{"object": "Global", "object_name": "*", "privilege": "DescribeAlias"},
{"object": "Global", "object_name": "*", "privilege": "ListAliases"},
]
for grant_item in alias_privileges:
self.utility_wrap.role_grant(grant_item["object"], grant_item["object_name"], grant_item["privilege"],
**db_kwargs)
time.sleep(60)
self.init_collection_wrap(name=c_name)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
self.utility_wrap.create_alias(c_name, alias_name)
self.utility_wrap.drop_alias(alias_name)
self.connection_wrap.disconnect(alias=DefaultConfig.DEFAULT_USING)
self.connection_wrap.connect(host=host, port=port, user=user2,
password=password, check_task=ct.CheckTasks.ccr, **db_kwargs)
# user2 can not create or drop alias
self.utility_wrap.create_alias(c_name, alias_name,
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.drop_alias(alias_name,
check_task=CheckTasks.check_permission_deny)
class TestUtilityNegativeRbac(TestcaseBase):
def teardown_method(self, method):
"""
teardown method: drop role and user
"""
log.info("[utility_teardown_method] Start teardown utility test cases ...")
self.connection_wrap.connect(host=cf.param_info.param_host, port=cf.param_info.param_port, user=ct.default_user,
password=ct.default_password, secure=cf.param_info.param_secure)
# drop users
users, _ = self.utility_wrap.list_users(False)
for u in users.groups:
if u.username != ct.default_user:
self.utility_wrap.delete_user(u.username)
user_groups, _ = self.utility_wrap.list_users(False)
assert len(user_groups.groups) == 1
role_groups, _ = self.utility_wrap.list_roles(False)
# drop roles
for role_group in role_groups.groups:
if role_group.role_name not in ['admin', 'public']:
self.utility_wrap.init_role(role_group.role_name)
g_list, _ = self.utility_wrap.role_list_grants()
for g in g_list.groups:
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
self.utility_wrap.role_drop()
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
# drop database
databases, _ = self.database_wrap.list_database()
for db_name in databases:
self.database_wrap.using_database(db_name)
for c_name in self.utility_wrap.list_collections()[0]:
self.utility_wrap.drop_collection(c_name)
if db_name != ct.default_db:
self.database_wrap.drop_database(db_name)
super().teardown_method(method)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglong",
"n%$#@!", "123n", " ", "''", "test-role", "ff ff", "中文"])
def test_create_role_with_invalid_name(self, name, host, port):
"""
target: create role with invalid name
method: create role with invalid name
expected: create fail
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(name)
error = {"err_code": 1100, "err_msg": "invalid parameter"}
self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error)
# get roles
role_groups, _ = self.utility_wrap.list_roles(False)
# drop roles
for role_group in role_groups.groups:
if role_group.role_name not in ['admin', 'public']:
self.utility_wrap.init_role(role_group.role_name)
g_list, _ = self.utility_wrap.role_list_grants()
for g in g_list.groups:
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
self.utility_wrap.role_drop()
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
@pytest.mark.tags(CaseLabel.RBAC)
def test_create_exist_role(self, host, port):
"""
target: check create an exist role fail
method: double create a role with same name
expected: fail to create
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
error = {"err_code": 65535,
"err_msg": "role [name:\"%s\"] already exists" % r_name}
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_drop()
assert not self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["admin", "public"])
def test_drop_admin_and_public_role(self, name, host, port):
"""
target: drop admin and public role fail
method: drop admin and public role fail
expected: fail to drop
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(name)
assert self.utility_wrap.role_is_exist()[0]
error = {"err_code": 1401,
"err_msg": "the role[%s] is a default role, which can't be dropped: "
"privilege not permitted" % name}
self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error)
assert self.utility_wrap.role_is_exist()[0]
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_role_which_not_exist(self, host, port):
"""
target: drop role which not exist fail
method: drop role which not exist
expected: fail to drop
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
assert not self.utility_wrap.role_is_exist()[0]
error = {"err_code": 36,
"err_msg": "the role isn\'t existed"}
self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_user_not_exist_role(self, host, port):
"""
target: add user to not exist role
method: create a user,add user to not exist role
expected: fail to add
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
self.utility_wrap.init_role(r_name)
assert not self.utility_wrap.role_is_exist()[0]
error = {"err_code": 65535,
"err_msg": "not found the role, maybe the role isn't existed or internal system error"}
self.utility_wrap.role_add_user(user, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_not_exist_user_to_role(self, host, port):
"""
target: add not exist user to role
method: create a role,add not exist user to role
expected: fail to add
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
error = {"err_code": 65535,
"err_msg": "not found the user, maybe the user isn't existed or internal system error"}
self.utility_wrap.role_remove_user(user)
self.utility_wrap.role_add_user(user, check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("issue #29025")
@pytest.mark.parametrize("name", ["admin", "public"])
def test_remove_root_from_default_role(self, name, host, port):
"""
target: remove root from admin role or public role
method: remove root from admin role or public role
expected: remove success
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role(name)
error = {"err_code": 37,
"err_msg": "fail to operate user to role"}
self.utility_wrap.role_remove_user("root", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("issue #29023")
def test_remove_user_from_unbind_role(self, host, port):
"""
target: remove user from unbind role
method: create new role and new user, remove user from unbind role
expected: fail to remove
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
self.utility_wrap.role_grant("Global", "*", "All")
self.utility_wrap.role_add_user(user)
self.utility_wrap.role_list_grants()
error = {"err_code": 37,
"err_msg": "fail to operate user to role"}
self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
def test_remove_user_from_empty_role(self, host, port):
"""
target: remove not exist user from role
method: create new role, remove not exist user from unbind role
expected: fail to remove
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
password = cf.gen_unique_str(prefix)
u, _ = self.utility_wrap.create_user(user=user, password=password)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
assert not self.utility_wrap.role_is_exist()[0]
error = {"err_code": 65535,
"err_msg": "not found the role, maybe the role isn't existed or internal system error"}
self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error)
users, _ = self.utility_wrap.role_get_users()
assert user not in users
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip("issue #29023")
def test_remove_not_exist_user_from_role(self, host, port):
"""
target: remove not exist user from role
method: create new role, remove not exist user from unbind role
expected: fail to remove
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
user = cf.gen_unique_str(prefix)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
error = {"err_code": 37,
"err_msg": "fail to check the username"}
self.utility_wrap.role_remove_user(user, check_task=CheckTasks.err_res, check_items=error)
users, _ = self.utility_wrap.role_get_users()
assert user not in users
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_role_with_bind_privilege(self, host, port):
"""
target: drop role with bind privilege
method: create a new role,grant role privilege,drop it
expected: fail to drop
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
self.utility_wrap.role_grant("Collection", "*", "*")
error = {"err_code": 36,
"err_msg": "fail to drop the role that it has privileges. Use REVOKE API to revoke privileges"}
self.utility_wrap.role_drop(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_grant_by_not_exist_role(self, host, port):
"""
target: list grants by not exist role
method: list grants by not exist role
expected: fail to list
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
error = {"err_code": 65535,
"err_msg": "not found the role, maybe the role isn't existed or internal system error"}
self.utility_wrap.role_list_grants(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_grant_by_role_and_not_exist_object(self, host, port):
"""
target: list grants by role and not exist object
method: list grants by role and not exist object
expected: fail to list
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
o_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 65535,
"err_msg": f"not found the object type[name: {o_name}], supported the object types: [Global User "
f"Collection]"}
self.utility_wrap.role_list_grant(o_name, "*", check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_drop()
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_privilege_with_object_not_exist(self, host, port):
"""
target: grant privilege with not exist object
method: grant privilege with not exist object
expected: fail to grant
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
o_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 65535,
"err_msg": "not found the object type[name: %s], supported the object types: "
"[Global User Collection]" % o_name}
self.utility_wrap.role_grant(o_name, "*", "*", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_privilege_with_privilege_not_exist(self, host, port):
"""
target: grant privilege with not exist privilege
method: grant privilege with not exist privilege
expected: fail to grant
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
p_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 65535, "err_msg": "not found the privilege name[%s]" % p_name}
self.utility_wrap.role_grant("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_revoke_privilege_with_object_not_exist(self, host, port):
"""
target: revoke privilege with not exist object
method: revoke privilege with not exist object
expected: fail to revoke
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
o_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 65535,
"err_msg": "not found the object type[name: %s], supported the object types: "
"[Collection Global User]" % o_name}
self.utility_wrap.role_revoke(o_name, "*", "*", check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_revoke_privilege_with_privilege_not_exist(self, host, port):
"""
target: revoke privilege with not exist privilege
method: revoke privilege with not exist privilege
expected: fail to revoke
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
r_name = cf.gen_unique_str(prefix)
p_name = cf.gen_unique_str(prefix)
self.utility_wrap.init_role(r_name)
self.utility_wrap.create_role()
error = {"err_code": 65535, "err_msg": "not found the privilege name[%s]" % p_name}
self.utility_wrap.role_revoke("Global", "*", p_name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_privilege_with_db_not_exist(self, host, port):
"""
target: test grant with an not existed db
method: 1.root user create role -> grant privilege -> bind user
2.new user connent with db
3.list collection
expected: database not exist
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
db_name = cf.gen_unique_str("db")
user, pwd, _ = self.init_user_with_privilege("Global", "*", "All", db_name)
# using the granted and not existing db
self.database_wrap.using_database(db_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 2, ct.err_msg: "database not found"})
# re-connect with new user and db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd, db_name=db_name,
secure=cf.param_info.param_secure, check_task=CheckTasks.err_res,
check_items={ct.err_code: 2, ct.err_msg: "database not found"})
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_privilege_with_collection_not_exist(self, host, port):
"""
target: test grant collection object privilege with not existed collection or other's db collection
method: 1.root connect and create db_a and create collection in db_a
2.create db_b and create a user and role, grant not existed collection privilege to role
3.grant db_a's collection with db_b privilege to role
expected: 1.grant with invalid db or collection no error
2.use db and operator collection gets permission deny
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create collection in db_a_xxx
db_a = cf.gen_unique_str("db_a")
self.database_wrap.create_database(db_a)
# create collection in db_a
self.database_wrap.using_database(db_a)
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
# create db
db_b = cf.gen_unique_str("db_b")
self.database_wrap.create_database(db_b)
# grant role privilege: collection not existed -> no error
user, pwd, role = self.init_user_with_privilege("Collection", "rbac", "Flush", db_name=db_b)
time.sleep(60)
# grant role privilege: db_a collection provilege with database db_b
self.utility_wrap.role_grant("Collection", collection_w.name, "Flush", db_name=db_b)
# list grants and verify collection privilege has a not existed collection
grants, _ = self.utility_wrap.role_list_grants(db_name=db_b)
for g in grants.groups:
assert g.object == "Collection"
assert g.object_name in [collection_w.name, "rbac"]
assert g.privilege == "Flush"
# re-connect with new user and db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# collection flush with db_b permission
self.database_wrap.using_database(db_b)
collection_w.flush(check_task=CheckTasks.err_res,
check_items={ct.err_code: 100, ct.err_msg: "collection not found"})
self.database_wrap.using_database(db_a)
collection_w.flush(check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
def test_revoke_db_not_existed(self, host, port):
"""
target: test revoke with not existed db
method: 1. init a user and role by a root user
2. revoke role privilege with a not existed db
expected: revoke failed
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# grant role privilege: collection not existed -> no error
db_name = cf.gen_unique_str("db")
user, pwd, role = self.init_user_with_privilege("Global", "*", "All", ct.default_db)
self.utility_wrap.role_revoke("Global", "*", "All", db_name)
self.database_wrap.using_database(db_name, check_task=CheckTasks.err_res,
check_items={ct.err_code: 800, ct.err_msg: "database not exist"})
self.database_wrap.create_database(db_name)
self.utility_wrap.role_grant("Global", "*", "All", db_name)
self.database_wrap.using_database(db_name)
self.utility_wrap.list_users(True)
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_grant_db_non_existed(self, host, port):
"""
target: test list grant with not existed db
method: list grant with an random db name
expected: empty grant
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# grant role privilege: collection not existed -> no error
self.init_user_with_privilege("Global", "*", "All", ct.default_db)
# list grant with a not-existed db
grant, _ = self.utility_wrap.role_list_grant("Global", "*", "All", cf.gen_unique_str())
assert len(grant.groups) == 0
# list grants with a not-existed db
grants, _ = self.utility_wrap.role_list_grants(cf.gen_unique_str())
assert len(grants.groups) == 0
@pytest.mark.tags(CaseLabel.RBAC)
def test_admin_public_role_privilege_all_dbs(self, host, port):
"""
target: test admin role have privileges in all dbs
method: 1. create a user and bind admin role
2. create different dbs and create collection in db
3. user connect and list collections using different db
expected: verify list succ
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create a user and bind user to admin role
username = cf.gen_unique_str("user")
pwd = cf.gen_unique_str()
self.utility_wrap.create_user(username, pwd)
self.utility_wrap.init_role("admin")
self.utility_wrap.role_add_user(username)
time.sleep(60)
# create db_a and create collection in db_a
db_a = cf.gen_unique_str("a")
self.database_wrap.create_database(db_a)
self.database_wrap.using_database(db_a)
# collection in db_a
coll_a = cf.gen_unique_str("a")
collection_w_a = self.init_collection_wrap(name=coll_a)
collection_w_a.insert(cf.gen_default_dataframe_data(nb=100))
# create db_b and create collection in db_b
db_b = cf.gen_unique_str("b")
self.database_wrap.create_database(db_b)
self.database_wrap.using_database(db_b)
# collection in db_b
coll_b = cf.gen_unique_str("b")
collection_w_b = self.init_collection_wrap(name=coll_b)
collection_w_b.insert(cf.gen_default_dataframe_data(nb=200))
# re-connect with new user and db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=username, password=pwd,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# using db_a
self.database_wrap.using_database(db_a)
assert collection_w_a.num_entities == 100
collections_a, _ = self.utility_wrap.list_collections()
assert coll_a in collections_a
self.utility_wrap.list_users(True)
# using db_b
self.database_wrap.using_database(db_b)
assert collection_w_b.num_entities == 200
collections_b, _ = self.utility_wrap.list_collections()
assert coll_b in collections_b
self.utility_wrap.list_users(True)
# create a user and bind user to public role
p_username = cf.gen_unique_str("public")
p_pwd = cf.gen_unique_str()
self.utility_wrap.create_user(p_username, p_pwd)
self.utility_wrap.init_role("public")
self.utility_wrap.role_add_user(p_username)
time.sleep(60)
# re-connect with new user and db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=p_username, password=p_pwd,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# using default db
self.database_wrap.using_database(ct.default_db)
self.utility_wrap.list_collections()
# using db a
self.database_wrap.using_database(db_a)
collections_a, _ = self.utility_wrap.list_collections()
assert coll_a in collections_a
# using db b
self.database_wrap.using_database(db_b)
collections_b, _ = self.utility_wrap.list_collections()
assert coll_b in collections_b
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_not_existed_collection_privilege(self, host, port):
"""
target: test grant privilege with inconsistent collection and db
method: 1. create a collection in the default db
2. create a db
3. create a user and grant all privileges with the collection and db (collection not in this db)
expected: collection not exist
"""
# root connect
self.connection_wrap.connect(host=host, port=port, user=ct.default_user, password=ct.default_password,
secure=cf.param_info.param_secure, check_task=ct.CheckTasks.ccr)
# create a collection in the default db
coll_name = cf.gen_unique_str("coll")
collection_w = self.init_collection_wrap(name=coll_name)
# create a db
db_name = cf.gen_unique_str("db")
self.database_wrap.create_database(db_name)
# grant role privilege: collection not existed in the db -> no error
user, pwd, role = self.init_user_with_privilege("Collection", coll_name, "Flush", db_name)
time.sleep(60)
# re-connect with new user and granted db
self.connection_wrap.disconnect(alias=ct.default_alias)
self.connection_wrap.connect(host=host, port=port, user=user, password=pwd, secure=cf.param_info.param_secure,
db_name=db_name, check_task=ct.CheckTasks.ccr)
# operate collection in the granted db
collection_w.flush(check_task=CheckTasks.err_res,
check_items={ct.err_code: 100, ct.err_msg: "CollectionNotExists"})
# operate collection in the default db
self.database_wrap.using_database(ct.default_db)
collection_w.flush(check_task=CheckTasks.check_permission_deny)
@pytest.mark.tags(CaseLabel.RBAC)
def test_create_over_max_roles(self, host, port):
"""
target: test create roles over max num
method: test create role with random name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
# 2 original roles: admin, public
for i in range(ct.max_role_num - 2):
role_name = "role_" + str(i)
self.utility_wrap.init_role(role_name, check_task=CheckTasks.check_role_property,
check_items={exp_name: role_name})
self.utility_wrap.create_role()
assert self.utility_wrap.role_is_exist()[0]
# now total 10 roles, create a new one will report error
self.utility_wrap.init_role("role_11")
error = {ct.err_code: 35,
ct.err_msg: "unable to create role because the number of roles has reached the limit"}
self.utility_wrap.create_role(check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.L3)
class TestUtilityFlushAll(TestcaseBase):
@pytest.mark.parametrize("with_db", [True, False])
def test_flush_all_multi_collections(self, with_db):
"""
target: test flush multi collections
method: 1.create multi collections
2. insert data into each collections without flushing
3. delete some data
4. flush all collections
5. verify num entities
6. create index -> load -> query deleted ids -> query no-deleted ids
expected: the insert and delete data of all collections are flushed
"""
collection_num = 5
collection_names = []
delete_num = 100
self._connect()
if with_db:
db_name = cf.gen_unique_str("db")
self.database_wrap.create_database(db_name)
self.database_wrap.using_database(db_name)
for i in range(collection_num):
collection_w, _, _, insert_ids, _ = self.init_collection_general(prefix, insert_data=True, is_flush=False,
is_index=True)
collection_w.delete(f'{ct.default_int64_field_name} in {insert_ids[:delete_num]}')
collection_names.append(collection_w.name)
self.utility_wrap.flush_all(timeout=300)
cw = ApiCollectionWrapper()
for c in collection_names:
cw.init_collection(name=c)
assert cw.num_entities_without_flush == ct.default_nb
cw.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
cw.load()
cw.query(f'{ct.default_int64_field_name} in {insert_ids[:100]}', check_task=CheckTasks.check_query_empty)
res, _ = cw.query(f'{ct.default_int64_field_name} not in {insert_ids[:delete_num]}')
assert len(res) == ct.default_nb - delete_num
def test_flush_all_db_collections(self):
"""
target: test flush all db's collections
method: 1. create collections and insert data in multi dbs
2. flush all
3. verify num entities
expected: the insert and delete data of all db's collections are flushed
"""
tmp_nb = 100
db_num = 3
collection_num = 2
collection_names = {}
delete_num = 10
self._connect()
for d in range(db_num):
db_name = cf.gen_unique_str("db")
self.database_wrap.create_database(db_name)
self.database_wrap.using_database(db_name)
db_collection_names = []
for i in range(collection_num):
# create collection
collection_w = self.init_collection_wrap()
# insert
insert_res, _ = collection_w.insert(cf.gen_default_dataframe_data(tmp_nb))
# create index
collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
# delete
collection_w.delete(f'{ct.default_int64_field_name} in {insert_res.primary_keys[:delete_num]}')
db_collection_names.append(collection_w.name)
collection_names[db_name] = db_collection_names
self.utility_wrap.list_collections()
# flush all db collections
self.utility_wrap.flush_all(timeout=600)
for _db, _db_collection_names in collection_names.items():
self.database_wrap.using_database(_db)
self.utility_wrap.list_collections()
for c in _db_collection_names:
cw = ApiCollectionWrapper()
cw.init_collection(name=c)
assert cw.num_entities_without_flush == tmp_nb
cw.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
cw.load()
cw.query(f'{ct.default_int64_field_name} in {insert_res.primary_keys[:delete_num]}',
check_task=CheckTasks.check_query_empty)
res, _ = cw.query(f'{ct.default_int64_field_name} not in {insert_res.primary_keys[:delete_num]}')
assert len(res) == tmp_nb - delete_num
def test_flush_empty_db_collections(self):
"""
target: test flush empty db collections
method: 1. create collection and insert in the default db
2. create a db and using the db
3. flush all
expected: verify default db's collection flushed
"""
tmp_nb = 10
self._connect()
# create collection
self.database_wrap.using_database(ct.default_db)
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix))
# insert
insert_res, _ = collection_w.insert(cf.gen_default_dataframe_data(tmp_nb))
# create a db and using db
db_name = cf.gen_unique_str("db")
self.database_wrap.create_database(db_name)
self.database_wrap.using_database(db_name)
# flush all in the db and no exception
self.utility_wrap.flush_all(timeout=120)
# verify default db's collection not flushed
self.database_wrap.using_database(ct.default_db)
assert collection_w.num_entities_without_flush == tmp_nb
def test_flush_all_multi_shards_timeout(self):
"""
target: test flush_all collection with max shards_num
method: 1.create collection with max shards_num
2.insert data
3.flush all with a small timeout and gets exception
4.flush and verify num entities
expected:
"""
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=ct.max_shards_num)
df = cf.gen_default_dataframe_data()
collection_w.insert(df)
error = {ct.err_code: 1, ct.err_msg: "wait for flush all timeout"}
self.utility_wrap.flush_all(timeout=5, check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.flush_all(timeout=120)
assert collection_w.num_entities_without_flush == ct.default_nb
def test_flush_all_no_collections(self):
"""
target: test flush all without any collections
method: connect and flush all
expected: no exception
"""
self._connect()
self.utility_wrap.flush_all(check_task=None)
def test_flush_all_async(self):
"""
target: test flush all collections with _async
method: flush all collections and _async=True
expected: finish flush all collection within a period of time
"""
collection_num = 5
collection_names = []
delete_num = 100
for i in range(collection_num):
collection_w, _, _, insert_ids, _ = self.init_collection_general(prefix, insert_data=True, is_flush=False,
is_index=True)
collection_w.delete(f'{ct.default_int64_field_name} in {insert_ids[:delete_num]}')
collection_names.append(collection_w.name)
self.utility_wrap.flush_all(_async=True)
_async_timeout = 60
cw = ApiCollectionWrapper()
start = time.time()
while time.time() - start < _async_timeout:
time.sleep(2.0)
flush_flag = False
for c in collection_names:
cw.init_collection(name=c)
if cw.num_entities_without_flush == ct.default_nb:
flush_flag = True
else:
log.debug(f"collection num entities: {cw.num_entities_without_flush} of collection {c}")
flush_flag = False
if flush_flag:
break
if time.time() - start > _async_timeout:
raise MilvusException(1, f"Waiting more than {_async_timeout}s for the flush all")
def test_flush_all_while_insert_delete(self):
"""
target: test flush all while insert and delete
method: 1. prepare multi collections
2. flush_all while inserting and deleting
expected:
"""
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
collection_num = 5
collection_names = []
delete_num = 100
delete_ids = [_i for _i in range(delete_num)]
for i in range(collection_num):
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=4)
df = cf.gen_default_dataframe_data(nb=ct.default_nb, start=0)
collection_w.insert(df)
collection_names.append(collection_w.name)
def do_insert():
cw = ApiCollectionWrapper()
df = cf.gen_default_dataframe_data(nb=ct.default_nb, start=ct.default_nb)
for c_name in collection_names:
cw.init_collection(c_name)
insert_res, _ = cw.insert(df)
assert insert_res.insert_count == ct.default_nb
def do_delete():
cw = ApiCollectionWrapper()
for c_name in collection_names:
cw.init_collection(c_name)
del_res, _ = cw.delete(f"{ct.default_int64_field_name} in {delete_ids}")
assert del_res.delete_count == delete_num
def do_flush_all():
time.sleep(2)
self.utility_wrap.flush_all(timeout=600)
executor = ThreadPoolExecutor(max_workers=3)
insert_task = executor.submit(do_insert)
delete_task = executor.submit(do_delete)
flush_task = executor.submit(do_flush_all)
# wait all tasks completed
wait([insert_task, delete_task, flush_task], return_when=ALL_COMPLETED)
# verify
for c in collection_names:
cw = ApiCollectionWrapper()
cw.init_collection(name=c)
log.debug(f"num entities: {cw.num_entities_without_flush}")
assert cw.num_entities_without_flush >= ct.default_nb
assert cw.num_entities_without_flush <= ct.default_nb * 2
cw.create_index(ct.default_float_vec_field_name, ct.default_flat_index)
cw.load()
cw.query(f'{ct.default_int64_field_name} in {delete_ids}', check_task=CheckTasks.check_query_empty)
res, _ = cw.query(f'{ct.default_int64_field_name} not in {delete_ids}')
assert len(res) == ct.default_nb * 2 - delete_num
class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase):
def teardown_method(self, method):
"""
teardown method: drop role and user
"""
log.info("[utility_teardown_method] Start teardown utility test cases ...")
self.connection_wrap.connect(host=cf.param_info.param_host, port=cf.param_info.param_port, user=ct.default_user,
password=ct.default_password, secure=cf.param_info.param_secure)
# drop users
users, _ = self.utility_wrap.list_users(False)
for u in users.groups:
if u.username != ct.default_user:
self.utility_wrap.delete_user(u.username)
user_groups, _ = self.utility_wrap.list_users(False)
assert len(user_groups.groups) == 1
role_groups, _ = self.utility_wrap.list_roles(False)
# drop roles
for role_group in role_groups.groups:
if role_group.role_name not in ['admin', 'public']:
self.utility_wrap.init_role(role_group.role_name)
g_list, _ = self.utility_wrap.role_list_grants()
for g in g_list.groups:
self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
for privilege_group in privilege_groups.groups:
if privilege_group.privilege_group not in ct.built_in_privilege_groups:
self.utility_wrap.drop_privilege_group(privilege_group.privilege_group)
self.utility_wrap.role_drop()
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
# drop database
databases, _ = self.database_wrap.list_database()
for db_name in databases:
self.database_wrap.using_database(db_name)
for c_name in self.utility_wrap.list_collections()[0]:
self.utility_wrap.drop_collection(c_name)
if db_name != ct.default_db:
self.database_wrap.drop_database(db_name)
super().teardown_method(method)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_create_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: create privilege group with invalid name
method: create privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1,
"err_msg": f"`privilege_group` value {name} is illegal"}
self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
def test_create_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port):
"""
target: create privilege group with invalid name
method: create privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"}
self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip(reason="issue #37842")
@pytest.mark.parametrize("name", ["longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglong", "123n", " ", "''", "中文"])
def test_create_privilege_group_with_privilege_group_name_invalid_value_2(self, name, host, port):
"""
target: create privilege group with invalid name
method: create privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1402, "err_msg": f"{name}: invalid privilege group name[privilegeGroup=the first character "
f"of a privilege group name %s must be an underscore or letter]"}
self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_create_privilege_group_with_built_in_privilege_groups_name(self, host, port):
"""
target: create privilege group with built in privilege groups name
method: create privilege group with built in privilege groups name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
for name in ct.built_in_privilege_groups:
error = {"err_code": 1100,
"err_msg": f"privilege group name [{name}] is defined by built in privileges or privilege groups in system: invalid parameter"}
self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_drop_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: drop privilege group with invalid name
method: drop privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1,
"err_msg": f"`privilege_group` value {name} is illegal"}
self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
def test_drop_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port):
"""
target: drop privilege group with invalid name
method: drop privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"}
self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.skip(reason="issue #37842")
@pytest.mark.parametrize("name", ["longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong"
"longlonglonglong", "123n", " ", "''", "中文"])
def test_drop_privilege_group_with_privilege_group_name_invalid_value_2(self, name, host, port):
"""
target: drop privilege group with invalid name
method: drop privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1402, "err_msg": f"{name}: invalid privilege group name[privilegeGroup=the first character "
f"of a privilege group name %s must be an underscore or letter]"}
self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_not_exist_privilege_group(self, host, port):
"""
target: drop same privilege group twice
method: drop same privilege group twice
expected: drop successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_not_exist"
self.utility_wrap.drop_privilege_group(privilege_group=name)
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_same_privilege_group_twice(self, host, port):
"""
target: drop same privilege group twice
method: drop same privilege group twice
expected: drop successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
assert name in privilege_groups_extracted
self.utility_wrap.drop_privilege_group(privilege_group=name)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
assert name not in privilege_groups_extracted
self.utility_wrap.drop_privilege_group(privilege_group=name)
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_privilege_group_with_built_in_privilege_groups_name(self, host, port):
"""
target: drop privilege group with built in privilege groups name
method: drop privilege group with built in privilege groups name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
for name in ct.built_in_privilege_groups:
self.utility_wrap.drop_privilege_group(privilege_group=name)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
assert len(privilege_groups_extracted) == len(ct.built_in_privilege_groups)
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_privilege_group_granted(self, host, port):
"""
target: drop the privilege group granted to one role
method: drop the privilege group granted to one role
expected: 1. raise exception
2. drop successfully after revoke
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role_name = "role_1"
role = self.utility_wrap.init_role(role_name)[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
self.utility_wrap.role_grant_v2(name, collection_w.name)
error = {"err_code": 65535,
"err_msg": f"privilege group [{name}] is used by role [{role_name}], Use REVOKE API to revoke it first"}
self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_revoke_v2(name, collection_w.name)
self.utility_wrap.drop_privilege_group(privilege_group=name)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
assert name not in privilege_groups_extracted
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_add_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: add privilege group with invalid name
method: add privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1,
"err_msg": f"`privilege_group` value {name} is illegal"}
self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
def test_add_privileges_to_group_with_privilege_group_name_invalid_value(self, name, host, port):
"""
target: add privilege group with invalid name
method: add privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"}
self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"], check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_privilege_into_not_exist_privilege_group(self, host, port):
"""
target: add privilege into not exist privilege group
method: add privilege into not exist privilege group
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_not_exist"
error = {"err_code": 1100,
"err_msg": f"there is no privilege group name [{name}] to operate: invalid parameter"}
self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_privilege_into_built_in_privilege_group(self, host, port):
"""
target: add privilege into not exist privilege group
method: add privilege into not exist privilege group
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
for name in ct.built_in_privilege_groups:
error = {"err_code": 1100,
"err_msg": f"the privilege group name [{name}] is defined "
f"by built in privilege groups in system: invalid parameter"}
self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0, "n%$#@!", "test-role", "ff ff"])
def test_add_privileges_to_group_with_privilege_invalid_type(self, name, host, port):
"""
target: add privilege group with invalid name
method: add privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1,
"err_msg": f"`privileges` value {name} is illegal"}
self.utility_wrap.add_privileges_to_group(privilege_group="privilege_group_1", privileges=name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_privileges_to_group_with_privilege_invalid_value(self, host, port):
"""
target: add privilege group with invalid name
method: add privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
privilege_name = "invalid_privilege"
error = {"err_code": 1100, "err_msg": f"there is no privilege name or privilege group name [{privilege_name}] "
f"defined in system to operate: invalid parameter"}
self.utility_wrap.add_privileges_to_group(privilege_group="privilege_group_1", privileges=[privilege_name], check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_remove_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port):
"""
target: remove privilege group with invalid name
method: remove privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1,
"err_msg": f"`privilege_group` value {name} is illegal"}
self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
def test_remove_privileges_to_group_with_privilege_group_name_invalid_value(self, name, host, port):
"""
target: remove privilege group with invalid name
method: remove privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"}
self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"], check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_remove_privilege_into_not_exist_privilege_group(self, host, port):
"""
target: remove privilege into not exist privilege group
method: remove privilege into not exist privilege group
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_not_exist"
error = {"err_code": 1100,
"err_msg": f"there is no privilege group name [{name}] to operate: invalid parameter"}
self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_remove_privilege_into_built_in_privilege_group(self, host, port):
"""
target: remove privilege into not exist privilege group
method: remove privilege into not exist privilege group
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
for name in ct.built_in_privilege_groups:
error = {"err_code": 1100,
"err_msg": f"the privilege group name [{name}] is defined "
f"by built in privilege groups in system: invalid parameter"}
self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0, "n%$#@!", "test-role", "ff ff"])
def test_remove_privileges_to_group_with_privilege_invalid_type(self, name, host, port):
"""
target: remove privilege group with invalid name
method: remove privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1,
"err_msg": f"`privileges` value {name} is illegal"}
self.utility_wrap.remove_privileges_from_group(privilege_group="privilege_group_1", privileges=name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_remove_privileges_to_group_with_privilege_invalid_value(self, host, port):
"""
target: remove privilege group with invalid name
method: remove privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
privilege_name = "invalid_privilege"
error = {"err_code": 1100, "err_msg": f"there is no privilege name or privilege group name [{privilege_name}] "
f"defined in system to operate: invalid parameter"}
self.utility_wrap.remove_privileges_from_group(privilege_group="privilege_group_1", privileges=[privilege_name],
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0, "n%$#@!", "test-role", "ff ff"])
def test_remove_privileges_to_group_with_privilege_invalid_type(self, name, host, port):
"""
target: remove privilege group with invalid name
method: remove privilege group with invalid name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.init_role("role_1")
error = {"err_code": 1,
"err_msg": f"`privileges` value {name} is illegal"}
self.utility_wrap.remove_privileges_from_group(privilege_group="privilege_group_1", privileges=name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_grant_v2_privilege_invalid_type(self, name, host, port):
"""
target: grant v2 with invalid privilege name
method: grant v2 with invalid privilege name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
error = {"err_code": 1,
"err_msg": f"`privilege` value {name} is illegal"}
self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_v2_privilege_invalid_value(self, host, port):
"""
target: grant v2 with invalid privilege name
method: grant v2 with invalid privilege name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
privilege_name = "invalid_privilege"
error = {"err_code": 65535, "err_msg": f"not found the privilege name[{privilege_name}]"}
self.utility_wrap.role_grant_v2(privilege=privilege_name, collection_name=collection_w.name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_grant_v2_collection_name_invalid_type(self, name, host, port):
"""
target: grant v2 with invalid collection name
method: grant v2 with invalid collection name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1")
error = {"err_code": 1, "err_msg": f"`privilege` value {name} is illegal"}
self.utility_wrap.role_grant_v2(privilege=name, collection_name=name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_v2_not_exist_collection(self, host, port):
"""
target: grant v2 with not exist collection
method: grant v2 with not exist collection
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
self.utility_wrap.role_grant_v2(privilege=name, collection_name="not_exist_collection")
self.utility_wrap.role_revoke_v2(privilege=name, collection_name="not_exist_collection")
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_grant_v2_db_name_invalid_type(self, name, host, port):
"""
target: grant v2 with invalid collection name
method: grant v2 with invalid collection name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1")
error = {"err_code": 1, "err_msg": f"`db_name` value {name} is illegal"}
self.utility_wrap.role_grant_v2(privilege="privilege_group_1", collection_name="collection_name", db_name=name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"])
def test_grant_v2_db_name_invalid_value(self, name, host, port):
"""
target: grant v2 with invalid collection name
method: grant v2 with invalid collection name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1")
error = {"err_code": 802, "err_msg": f"database name can only contain numbers, letters and underscores: "
f"invalid database name[database={name}]"}
self.utility_wrap.role_grant_v2(privilege="privilege_group_1", collection_name="collection_name",
db_name=name, check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_v2_not_exist_db_name(self, host, port):
"""
target: grant v2 with invalid collection name
method: grant v2 with invalid collection name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1")
self.utility_wrap.role_grant_v2(privilege="privilege_group_1", collection_name="collection_name",
db_name="not_exist_db")
self.utility_wrap.role_revoke_v2(privilege="privilege_group_1", collection_name="collection_name",
db_name="not_exist_db")
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("name", [1, 1.0])
def test_revoke_v2_privilege_invalid_type(self, name, host, port):
"""
target: grant v2 with invalid privilege name
method: grant v2 with invalid privilege name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
error = {"err_code": 1,
"err_msg": f"`privilege` value {name} is illegal"}
self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_revoke_v2_privilege_invalid_value(self, host, port):
"""
target: grant v2 with invalid privilege name
method: grant v2 with invalid privilege name
expected: raise exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
privilege_name = "invalid_privilege"
error = {"err_code": 65535, "err_msg": f"not found the privilege name[{privilege_name}]"}
self.utility_wrap.role_revoke_v2(privilege=privilege_name, collection_name=collection_w.name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_v2_database_built_in_invalid_collection_name(self, host, port):
"""
target: grant/revoke v2 normal case
method: grant/revoke v2 normal case
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
collection_name = collection_w.name
for name in ct.built_in_privilege_groups:
if name.startswith("Database") is not True:
continue
error = {"err_code": 1100, "err_msg": f"collectionName should be * for the database "
f"level privilege: {name}: invalid parameter"}
self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_name,
check_task=CheckTasks.err_res, check_items=error)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_v2_cluster_built_in_invalid_collection_name(self, host, port):
"""
target: grant/revoke v2 normal case
method: grant/revoke v2 normal case
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
for name in ct.built_in_privilege_groups:
if name.startswith("Cluster") is not True:
continue
error = {"err_code": 1100, "err_msg": f"dbName and collectionName should be * for the cluster "
f"level privilege: {name}: invalid parameter"}
self.utility_wrap.role_grant_v2(privilege=name, collection_name="*", db_name="default",
check_task=CheckTasks.err_res, check_items=error)
self.utility_wrap.role_grant_v2(privilege=name, collection_name="*",
check_task=CheckTasks.err_res, check_items=error)
class TestUtilityPositiveRbacPrivilegeGroup(TestcaseBase):
def teardown_method(self, method):
"""
teardown method: drop role and user
"""
log.info("[utility_teardown_method] Start teardown utility test cases ...")
self.connection_wrap.connect(host=cf.param_info.param_host, port=cf.param_info.param_port, user=ct.default_user,
password=ct.default_password, secure=cf.param_info.param_secure)
# drop users
users, _ = self.utility_wrap.list_users(False)
for u in users.groups:
if u.username != ct.default_user:
self.utility_wrap.delete_user(u.username)
user_groups, _ = self.utility_wrap.list_users(False)
assert len(user_groups.groups) == 1
role_groups, _ = self.utility_wrap.list_roles(False)
# drop roles
for role_group in role_groups.groups:
if role_group.role_name not in ['admin', 'public']:
self.utility_wrap.init_role(role_group.role_name)
g_list, _ = self.utility_wrap.role_list_grants()
for g in g_list.groups:
# self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege)
if g.privilege.startswith("Cluster"):
self.utility_wrap.role_revoke_v2(g.privilege, "*", "*")
else:
self.utility_wrap.role_revoke_v2(g.privilege, g.object_name)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
for privilege_group in privilege_groups.groups:
if privilege_group.privilege_group not in ct.built_in_privilege_groups:
self.utility_wrap.drop_privilege_group(privilege_group.privilege_group)
self.utility_wrap.role_drop()
role_groups, _ = self.utility_wrap.list_roles(False)
assert len(role_groups.groups) == 2
# drop database
databases, _ = self.database_wrap.list_database()
for db_name in databases:
self.database_wrap.using_database(db_name)
for c_name in self.utility_wrap.list_collections()[0]:
self.utility_wrap.drop_collection(c_name)
if db_name != ct.default_db:
self.database_wrap.drop_database(db_name)
super().teardown_method(method)
@pytest.mark.tags(CaseLabel.RBAC)
def test_create_privilege_groups(self, host, port):
"""
target: create valid privilege groups
method: create valid privilege groups
expected: create successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name_1 = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name_1)
name_2 = "privilege_group_2"
self.utility_wrap.create_privilege_group(privilege_group=name_2)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
assert name_1 in privilege_groups_extracted
assert name_2 in privilege_groups_extracted
@pytest.mark.tags(CaseLabel.RBAC)
def test_create_large_numbers_privilege_groups(self, host, port):
"""
target: create large numbers of privilege groups
method: create 100 privilege groups
expected: create successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
number = 100
# 1. create privilege groups
for i in range(number):
name = f"privilege_group_{i}"
self.utility_wrap.create_privilege_group(privilege_group=name)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
for i in range(number):
name = f"privilege_group_{i}"
assert name in privilege_groups_extracted
# 2. drop privilege groups
for i in range(number):
name = f"privilege_group_{i}"
self.utility_wrap.drop_privilege_group(privilege_group=name)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
for i in range(number):
name = f"privilege_group_{i}"
assert name not in privilege_groups_extracted
@pytest.mark.tags(CaseLabel.RBAC)
def test_drop_privilege_groups(self, host, port):
"""
target: drop valid privilege groups
method: create valid privilege groups
expected: drop successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name_1 = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name_1)
name_2 = "privilege_group_2"
self.utility_wrap.create_privilege_group(privilege_group=name_2)
self.utility_wrap.drop_privilege_group(privilege_group=name_1)
self.utility_wrap.drop_privilege_group(privilege_group=name_2)
privilege_groups = self.utility_wrap.list_privilege_groups()[0]
privilege_groups_extracted = []
for privilege_group in privilege_groups.groups:
privilege_groups_extracted.append(privilege_group.privilege_group)
assert name_1 not in privilege_groups_extracted
assert name_2 not in privilege_groups_extracted
@pytest.mark.tags(CaseLabel.RBAC)
def test_add_privileges_to_group(self, host, port):
"""
target: add privilege group with invalid name
method: add privilege group with invalid name
expected: no exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name_1 = "privilege_group_1"
name_2 = "privilege_group_2"
self.utility_wrap.create_privilege_group(privilege_group=name_1)
self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"])
self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"])
self.utility_wrap.create_privilege_group(privilege_group=name_2)
self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Insert"])
self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Insert"])
self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Search"])
self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Search"])
@pytest.mark.tags(CaseLabel.RBAC)
def test_remove_privileges_to_group(self, host, port):
"""
target: remove privilege group with invalid name
method: remove privilege group with invalid name
expected: no exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name_1 = "privilege_group_1"
name_2 = "privilege_group_2"
self.utility_wrap.create_privilege_group(privilege_group=name_1)
self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"])
self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"])
self.utility_wrap.remove_privileges_from_group(privilege_group=name_1, privileges=["Insert"])
self.utility_wrap.remove_privileges_from_group(privilege_group=name_1, privileges=["Insert"])
self.utility_wrap.create_privilege_group(privilege_group=name_2)
self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Search"])
self.utility_wrap.remove_privileges_from_group(privilege_group=name_2, privileges=["Search"])
@pytest.mark.tags(CaseLabel.RBAC)
def test_list_built_in_privilege_groups(self, host, port):
"""
target: remove privilege group with invalid name
method: remove privilege group with invalid name
expected: no exception
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
res = self.utility_wrap.list_privilege_groups()[0]
for privilege_groups in res.groups:
if privilege_groups.privilege_group == "CollectionReadOnly":
assert privilege_groups.privileges == ('Query', 'Search', 'IndexDetail', 'GetFlushState', 'GetLoadState',
'GetLoadingProgress', 'HasPartition', 'ShowPartitions',
'DescribeCollection', 'DescribeAlias', 'GetStatistics', 'ListAliases')
elif privilege_groups.privilege_group == "CollectionReadWrite":
assert privilege_groups.privileges == ('Query', 'Search', 'IndexDetail', 'GetFlushState', 'GetLoadState',
'GetLoadingProgress', 'HasPartition', 'ShowPartitions',
'DescribeCollection', 'DescribeAlias', 'GetStatistics', 'ListAliases',
'Load', 'Release', 'Insert', 'Delete', 'Upsert', 'Import', 'Flush',
'Compaction', 'LoadBalance', 'CreateIndex', 'DropIndex',
'CreatePartition', 'DropPartition')
elif privilege_groups.privilege_group == "CollectionAdmin":
assert privilege_groups.privileges == ('Query', 'Search', 'IndexDetail', 'GetFlushState',
'GetLoadState', 'GetLoadingProgress', 'HasPartition',
'ShowPartitions', 'DescribeCollection', 'DescribeAlias',
'GetStatistics', 'ListAliases', 'Load', 'Release', 'Insert',
'Delete', 'Upsert', 'Import', 'Flush', 'Compaction', 'LoadBalance',
'CreateIndex', 'DropIndex', 'CreatePartition', 'DropPartition',
'CreateAlias', 'DropAlias')
elif privilege_groups.privilege_group == "DatabaseReadOnly":
assert privilege_groups.privileges == ('ShowCollections', 'DescribeDatabase')
elif privilege_groups.privilege_group == "DatabaseReadWrite":
assert privilege_groups.privileges == ('ShowCollections', 'DescribeDatabase', 'AlterDatabase')
elif privilege_groups.privilege_group == "DatabaseAdmin":
assert privilege_groups.privileges == ('ShowCollections', 'DescribeDatabase', 'AlterDatabase',
'CreateCollection', 'DropCollection')
elif privilege_groups.privilege_group == "ClusterReadOnly":
assert privilege_groups.privileges == ('ListDatabases', 'SelectOwnership', 'SelectUser', 'DescribeResourceGroup',
'ListResourceGroups', 'ListPrivilegeGroups')
elif privilege_groups.privilege_group == "ClusterReadWrite":
assert privilege_groups.privileges == ('ListDatabases', 'SelectOwnership', 'SelectUser', 'DescribeResourceGroup',
'ListResourceGroups', 'ListPrivilegeGroups', 'FlushAll',
'TransferNode', 'TransferReplica', 'UpdateResourceGroups')
elif privilege_groups.privilege_group == "ClusterAdmin":
assert privilege_groups.privileges == ('ListDatabases', 'SelectOwnership', 'SelectUser', 'DescribeResourceGroup',
'ListResourceGroups', 'ListPrivilegeGroups', 'FlushAll',
'TransferNode', 'TransferReplica', 'UpdateResourceGroups',
'BackupRBAC', 'RestoreRBAC', 'CreateDatabase', 'DropDatabase',
'CreateOwnership', 'DropOwnership', 'ManageOwnership',
'CreateResourceGroup', 'DropResourceGroup', 'UpdateUser',
'RenameCollection', 'CreatePrivilegeGroup', 'DropPrivilegeGroup',
'OperatePrivilegeGroup')
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_revoke_v2_normal(self, host, port):
"""
target: grant/revoke v2 normal case
method: grant/revoke v2 normal case
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name)
self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name)
res = self.utility_wrap.role_list_grants()[0]
is_exist = False
for group in res.groups:
if group.privilege == name:
assert group.object == "Global"
assert group.object_name == collection_w.name
assert group.db_name == "default"
assert group.role_name == "role_1"
assert group.grantor_name == "root"
is_exist = True
assert is_exist == True
self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name)
self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name)
res = self.utility_wrap.role_list_grants()[0]
not_exist = True
for group in res.groups:
if group.privilege == name:
not_exist = False
assert not_exist == True
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_revoke_v2_another_db(self, host, port):
"""
target: grant/revoke v2 normal case
method: grant/revoke v2 normal case
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
new_db = "db_1"
self.database_wrap.create_database(new_db)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
name = "privilege_group_1"
self.utility_wrap.create_privilege_group(privilege_group=name)
self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name, db_name=new_db)
res = self.utility_wrap.role_list_grants(db_name=new_db)[0]
is_exist = False
for group in res.groups:
if group.privilege == name:
assert group.object == "Global"
assert group.object_name == collection_w.name
assert group.db_name == new_db
assert group.role_name == "role_1"
assert group.grantor_name == "root"
is_exist = True
assert is_exist == True
self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name, db_name=new_db)
res = self.utility_wrap.role_list_grants(db_name=new_db)[0]
not_exist = True
for group in res.groups:
if group.privilege == name:
not_exist = False
assert not_exist == True
res = self.utility_wrap.role_list_grants(db_name='default')[0]
not_exist = True
for group in res.groups:
if group.privilege == name:
not_exist = False
assert not_exist == True
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_revoke_v2_privilege(self, host, port):
"""
target: grant/revoke v2 normal case
method: grant/revoke v2 normal case
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
collection_name = collection_w.name
db_name = "default"
for name in ct.built_in_privilege_groups:
if name.startswith("Database"):
collection_name = "*"
if name.startswith("Cluster"):
collection_name = "*"
db_name = "*"
self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_name, db_name=db_name)
res = self.utility_wrap.role_list_grants()[0]
is_exist = False
for group in res.groups:
if group.privilege == name:
assert group.object == "Global"
assert group.object_name == collection_name
assert group.db_name == db_name
assert group.role_name == "role_1"
assert group.grantor_name == "root"
is_exist = True
assert is_exist == True
self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_name, db_name=db_name)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_revoke_v2_built_in_privilege_group(self, host, port):
"""
target: grant/revoke v2 normal case
method: grant/revoke v2 normal case
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
role = self.utility_wrap.init_role("role_1")[0]
role.create()
collection_name = collection_w.name
db_name = "default"
for name in ct.built_in_privilege_groups:
if name.startswith("Database"):
collection_name = "*"
if name.startswith("Cluster"):
collection_name = "*"
db_name = "*"
self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_name, db_name=db_name)
res = self.utility_wrap.role_list_grants()[0]
is_exist = False
for group in res.groups:
if group.privilege == name:
assert group.object == "Global"
assert group.object_name == collection_name
assert group.db_name == db_name
assert group.role_name == "role_1"
assert group.grantor_name == "root"
is_exist = True
assert is_exist == True
self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_name, db_name=db_name)
@pytest.mark.tags(CaseLabel.RBAC)
def test_grant_revoke_v2_duplicate_privilege_and_privilege_group(self, host, port):
"""
target: grant/revoke v2 normal case
method: grant/revoke v2 normal case
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
role = self.utility_wrap.init_role("role_1")[0]
role.create()
self.utility_wrap.role_grant_v2(privilege="CreatePrivilegeGroup", collection_name="*", db_name="*")
self.utility_wrap.role_grant_v2(privilege="ClusterAdmin", collection_name="*", db_name="*")
res = self.utility_wrap.role_list_grants()[0]
privilege_number = 0
for group in res.groups:
if group.privilege == "CreatePrivilegeGroup":
assert group.object == "Global"
assert group.object_name == "*"
assert group.db_name == "*"
assert group.role_name == "role_1"
assert group.grantor_name == "root"
privilege_number += 1
if group.privilege == "ClusterAdmin":
assert group.object == "Global"
assert group.object_name == "*"
assert group.db_name == "*"
assert group.role_name == "role_1"
assert group.grantor_name == "root"
privilege_number += 1
assert privilege_number == 2
new_user = "user1"
self.utility_wrap.create_user(user=new_user, password=ct.default_password)
self.utility_wrap.role_add_user(new_user)
self.connection_wrap.connect(host=host, port=port, user=new_user,
password=ct.default_password)
self.utility_wrap.create_privilege_group("privilege_group_1")
self.utility_wrap.drop_privilege_group("privilege_group_1")
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
self.utility_wrap.role_revoke_v2(privilege="CreatePrivilegeGroup", collection_name="*", db_name="*")
self.connection_wrap.connect(host=host, port=port, user=new_user,
password=ct.default_password)
self.utility_wrap.create_privilege_group("privilege_group_1")
self.utility_wrap.drop_privilege_group("privilege_group_1")
self.utility_wrap.role_revoke_v2(privilege="ClusterAdmin", collection_name="*", db_name="*")
@pytest.mark.tags(CaseLabel.RBAC)
@pytest.mark.parametrize("privilege_group_name", ct.built_in_privilege_groups)
def test_built_in_privilege_groups(self, host, port, privilege_group_name):
"""
target: test all the built in privilege groups
method: test all the built in privilege groups
expected: grant successfully
"""
self.connection_wrap.connect(host=host, port=port, user=ct.default_user,
password=ct.default_password, check_task=ct.CheckTasks.ccr)
collection_w = self.init_collection_general(prefix)[0]
partition_w = self.init_partition_wrap(collection_w, "_default")
collection_w_1 = self.init_collection_general(prefix+'new')[0]
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
role = self.utility_wrap.init_role("role_1")[0]
role.create()
collection_name = collection_w.name
db_name = "default"
if privilege_group_name.startswith("Database"):
collection_name = "*"
if privilege_group_name.startswith("Cluster"):
collection_name = "*"
db_name = "*"
self.utility_wrap.role_grant_v2(privilege=privilege_group_name, collection_name=collection_name, db_name=db_name)
new_user = "user1"
self.utility_wrap.create_user(user=new_user, password=ct.default_password)
new_user_1 = "user2"
self.utility_wrap.create_user(user=new_user_1, password=ct.default_password)
self.utility_wrap.role_add_user(new_user)
privilege_group_privilege_dict = copy.deepcopy(ct.privilege_group_privilege_dict)
res = self.utility_wrap.list_privilege_groups()[0]
for privilege_groups in res.groups:
if privilege_groups.privilege_group == privilege_group_name:
for single_privilege in privilege_groups.privileges:
if single_privilege in privilege_group_privilege_dict.keys():
privilege_group_privilege_dict[single_privilege] = True
break
# check all the granted privileges using api for the built in privilege group
self.connection_wrap.connect(host=host, port=port, user=new_user, password=ct.default_password)
# wait to make sure the grant takes effect
time.sleep(20)
# PrivilegeQuery
if privilege_group_privilege_dict["Query"]:
res = collection_w.query(expr="int64==0")[0]
else:
res = collection_w.query(expr="int64==0", check_task=CheckTasks.check_permission_deny)
# PrivilegeSearch
vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(1)]
if privilege_group_privilege_dict["Search"]:
collection_w.search(vectors, ct.default_float_vec_field_name, {}, 1,
check_task=CheckTasks.check_search_results,
check_items={
"nq": 1,
"limit": 0})
else:
collection_w.search(vectors, ct.default_float_vec_field_name, {}, 1,
check_task=CheckTasks.check_permission_deny)
# PrivilegeGetLoadState
if privilege_group_privilege_dict["GetLoadState"]:
res = self.utility_wrap.load_state(collection_name=collection_w.name)
res = self.utility_wrap.wait_for_loading_complete(collection_name=collection_w.name)
else:
res = self.utility_wrap.load_state(collection_name=collection_w.name,
check_task=CheckTasks.check_permission_deny)
res = self.utility_wrap.wait_for_loading_complete(collection_name=collection_w.name,
check_task=CheckTasks.check_permission_deny)
# PrivilegeGetLoadingProgress
if privilege_group_privilege_dict["GetLoadingProgress"]:
res = self.utility_wrap.loading_progress(collection_name=collection_w.name)
else:
res = self.utility_wrap.loading_progress(collection_name=collection_w.name,
check_task=CheckTasks.check_permission_deny)
# PrivilegeHasPartition
if privilege_group_privilege_dict["HasPartition"]:
res = collection_w.has_partition(partition_name="_default")[0]
assert res == True
res = partition_w.name
res = partition_w.is_empty_without_flush
res = partition_w.num_entities_without_flush
else:
res = collection_w.has_partition(partition_name="_default",
check_task=CheckTasks.check_permission_deny)
try:
partition_w = self.init_partition_wrap(collection_w, "_default")
res = partition_w.name
res = partition_w.is_empty_without_flush
res = partition_w.num_entities_without_flush
except Exception as e:
log.info(e)
# PrivilegeShowPartitions
if privilege_group_privilege_dict["ShowPartitions"]:
res = collection_w.partitions
else:
try:
res = collection_w.partitions
except Exception as e:
log.info(e)
# PrivilegeShowCollections
# if privilege_group_privilege_dict["ShowCollections"]:
res = self.utility_wrap.list_collections()[0]
assert len(res) == 0
# PrivilegeListAliases
res = self.utility_wrap.list_aliases(collection_name=collection_w.name)
# PrivilegeListDatabases
if privilege_group_privilege_dict["ListDatabases"]:
res = self.database_wrap.list_database()[0]
assert len(res) == 1
assert res[0] == "default"
else:
res = self.database_wrap.list_database()[0]
assert len(res) == 1
assert res[0] == "default"
# PrivilegeDescribeDatabase
if privilege_group_privilege_dict["DescribeDatabase"]:
res = self.database_wrap.describe_database(db_name="default")
else:
self.database_wrap.describe_database(db_name="default", check_task=CheckTasks.check_permission_deny)
# PrivilegeDescribeAlias
res = collection_w.aliases
# PrivilegeGetStatistics
if privilege_group_privilege_dict["GetStatistics"]:
res = collection_w.is_empty_without_flush
res = collection_w.num_entities_without_flush
else:
try:
res = collection_w.is_empty_without_flush
except Exception as e:
log.info(e)
try:
res = collection_w.num_entities_without_flush
except Exception as e:
log.info(e)
# PrivilegeDropIndex
if privilege_group_privilege_dict["DropIndex"]:
collection_w.release()
collection_w.drop_index(index_name=ct.default_float_vec_field_name)
else:
collection_w.drop_index(index_name=ct.default_float_vec_field_name, check_task=CheckTasks.check_permission_deny)
# PrivilegeCreateIndex
if privilege_group_privilege_dict["CreateIndex"]:
collection_w.create_index(field_name=ct.default_float_vec_field_name, index_name="index1")
collection_w.alter_index(index_name="index1", extra_params={})
collection_w.load()
else:
collection_w.create_index(field_name=ct.default_float_vec_field_name, check_task=CheckTasks.check_permission_deny)
collection_w.alter_index(index_name="index1", extra_params={}, check_task=CheckTasks.check_permission_deny)
# PrivilegeCreatePartition
if privilege_group_privilege_dict["CreatePartition"]:
collection_w.create_partition("partition")
else:
collection_w.create_partition("partition", check_task=CheckTasks.check_permission_deny)
# PrivilegeDropPartition
if privilege_group_privilege_dict["DropPartition"]:
collection_w.release()
collection_w.drop_partition("partition")
collection_w.load()
else:
collection_w.drop_partition("partition", check_task=CheckTasks.check_permission_deny)
# PrivilegeLoad
if privilege_group_privilege_dict["Load"]:
collection_w.load()
else:
collection_w.load(check_task=CheckTasks.check_permission_deny)
# PrivilegeRelease
if privilege_group_privilege_dict["Release"]:
collection_w.release()
else:
collection_w.release(check_task=CheckTasks.check_permission_deny)
# PrivilegeInsert
data = cf.gen_default_dataframe_data()
if privilege_group_privilege_dict["Insert"]:
collection_w.insert(data=data)
else:
collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny)
# PrivilegeDelete
if privilege_group_privilege_dict["Delete"]:
collection_w.delete(expr="int64==10")
else:
collection_w.delete(expr="int64==10", check_task=CheckTasks.check_permission_deny)
# PrivilegeUpsert
if privilege_group_privilege_dict["Upsert"]:
collection_w.upsert(data=data)
else:
collection_w.upsert(data=data, check_task=CheckTasks.check_permission_deny)
# PrivilegeImport
if privilege_group_privilege_dict["Import"]:
self.utility_wrap.do_bulk_insert(collection_name=collection_w.name, files=[""],
check_task=CheckTasks.err_res,
check_items={"err_code": 2100,
"err_msg": "unexpected file type, files=[]: importing data failed"})
else:
self.utility_wrap.do_bulk_insert(collection_name=collection_w.name, files=[""],
check_task=CheckTasks.check_permission_deny)
# PrivilegeFlush
if privilege_group_privilege_dict["Flush"]:
collection_w.flush()
partition_w.flush()
else:
collection_w.flush(check_task=CheckTasks.check_permission_deny)
partition_w.flush(check_task=CheckTasks.check_permission_deny)
# PrivilegeCompaction
if privilege_group_privilege_dict["Compaction"]:
collection_w.compact()
else:
collection_w.compact(check_task=CheckTasks.check_permission_deny)
if privilege_group_privilege_dict["LoadBalance"]:
collection_w.load()
self.utility_wrap.load_balance(collection_name=collection_w.name, src_node_id=1,
dst_node_ids=[2, 3], sealed_segment_ids=[2, 3],
check_task=CheckTasks.err_res,
check_items={"err_code": 901,
"err_msg": "destination node not found in "
"the same replica: node not found[node=2]"})
else:
self.utility_wrap.load_balance(collection_name=collection_w.name, src_node_id=1,
dst_node_ids=[2, 3], sealed_segment_ids=[2, 3],
check_task=CheckTasks.check_permission_deny)
# PrivilegeRenameCollection
if privilege_group_privilege_dict["RenameCollection"]:
self.utility_wrap.rename_collection(old_collection_name=collection_w.name,
new_collection_name=collection_w.name+'new')
self.utility_wrap.rename_collection(old_collection_name=collection_w.name+'new',
new_collection_name=collection_w.name)
else:
self.utility_wrap.rename_collection(old_collection_name=collection_w.name,
new_collection_name=collection_w.name,
check_task=CheckTasks.check_permission_deny)
# PrivilegeCreateAlias
if privilege_group_privilege_dict["CreateAlias"]:
self.utility_wrap.create_alias(collection_name=collection_w.name, alias="alias")
self.utility_wrap.alter_alias(collection_name=collection_w.name, alias="alias")
else:
self.utility_wrap.create_alias(collection_name=collection_w.name, alias="alias",
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.alter_alias(collection_name=collection_w.name, alias="alias",
check_task=CheckTasks.check_permission_deny)
# PrivilegeDropAlias
if privilege_group_privilege_dict["DropAlias"]:
self.utility_wrap.drop_alias(alias="alias")
else:
self.utility_wrap.drop_alias(alias="alias", check_task=CheckTasks.check_permission_deny)
# PrivilegeCreateCollection
if privilege_group_privilege_dict["CreateCollection"]:
collection_w_new = self.init_collection_general(prefix, is_index=False, is_flush=False)[0]
collection_w_new.set_properties({"collection.ttl.seconds": 60})
else:
try:
self.init_collection_general(prefix)[0]
except Exception as e:
log.info(e)
collection_w.set_properties({"collection.ttl.seconds": 60}, check_task=CheckTasks.check_permission_deny)
# PrivilegeDropCollection
if privilege_group_privilege_dict["DropCollection"]:
collection_w_new.drop()
self.utility_wrap.drop_collection(collection_w_new.name)
else:
collection_w.drop(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.drop_collection(collection_w.name, check_task=CheckTasks.check_permission_deny)
# PrivilegeCreateOwnership
if privilege_group_privilege_dict["CreateOwnership"]:
role_1 = self.utility_wrap.init_role("role_new")[0]
role_1.create()
self.utility_wrap.create_user(user="user3", password=ct.default_password)
else:
role_1 = self.utility_wrap.init_role("role_new")[0]
self.utility_wrap.create_role(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.create_user(user="user3", password=ct.default_password,
check_task=CheckTasks.check_permission_deny)
# PrivilegeDropOwnership
if privilege_group_privilege_dict["DropOwnership"]:
role_1 = self.utility_wrap.init_role("role_new")[0]
role_1.drop()
self.utility_wrap.delete_user("user3")
else:
role_1 = self.utility_wrap.init_role("role_new")[0]
self.utility_wrap.role_drop(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.delete_user("user3", check_task=CheckTasks.check_permission_deny)
# PrivilegeSelectOwnership
role = self.utility_wrap.init_role("role_1")[0]
if privilege_group_privilege_dict["SelectOwnership"]:
self.utility_wrap.role_get_users()
self.utility_wrap.role_is_exist()
self.utility_wrap.list_usernames()
self.utility_wrap.list_roles(True)
res = self.utility_wrap.role_list_grants()[0]
else:
self.utility_wrap.role_get_users(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_is_exist(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.list_usernames(check_task=CheckTasks.check_permission_deny)
self.utility_wrap.list_roles(True, check_task=CheckTasks.check_permission_deny)
# default own the privilege of the role itself
res = self.utility_wrap.role_list_grants()[0]
# PrivilegeManageOwnership
if privilege_group_privilege_dict["ManageOwnership"]:
role = self.utility_wrap.init_role("role_1")[0]
self.utility_wrap.role_grant(object="Collection", object_name=collection_w.name, privilege="Insert")
self.utility_wrap.role_revoke(object="Collection", object_name=collection_w.name, privilege="Insert")
self.utility_wrap.role_grant_v2(privilege="Insert", collection_name=collection_w.name)
self.utility_wrap.role_revoke_v2(privilege="Insert", collection_name=collection_w.name)
self.utility_wrap.create_user("user3", "Milvus")
self.utility_wrap.role_add_user(username="user3")
self.utility_wrap.role_remove_user(username="user3")
else:
role = self.utility_wrap.init_role("role_1")[0]
self.utility_wrap.role_grant(object="Collection", object_name=collection_w.name, privilege="Insert",
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_revoke(object="Collection", object_name=collection_w.name, privilege="Insert",
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_grant_v2(privilege="Insert", collection_name=collection_w.name,
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_revoke_v2(privilege="Insert", collection_name=collection_w.name,
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_add_user(username="user3",
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.role_remove_user(username="user3",
check_task=CheckTasks.check_permission_deny)
# PrivilegeUpdateUser
if privilege_group_privilege_dict["UpdateUser"]:
self.utility_wrap.reset_password(user=new_user_1, old_password=ct.default_password,
new_password=ct.default_password)
self.utility_wrap.update_password(user=new_user_1, old_password=ct.default_password,
new_password=ct.default_password)
else:
self.utility_wrap.reset_password(user=new_user_1, old_password=ct.default_password,
new_password=ct.default_password,
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.update_password(user=new_user_1, old_password=ct.default_password,
new_password=ct.default_password,
check_task=CheckTasks.check_permission_deny)
# PrivilegeSelectUser
self.connection_wrap.connect(host=host, port=port, user=new_user, password=ct.default_password)
if privilege_group_privilege_dict["SelectUser"]:
self.utility_wrap.list_user(username="user1", include_role_info=True)
res = self.utility_wrap.list_users(include_role_info=True)[0]
assert len(res.groups) > 1
else:
res = self.utility_wrap.list_user(username="user1", include_role_info=True)[0]
assert len(res.groups) == 1
self.utility_wrap.list_users(include_role_info=True, check_task=CheckTasks.check_permission_deny)[0]
# PrivilegeCreateResourceGroup
if privilege_group_privilege_dict["CreateResourceGroup"]:
self.utility_wrap.create_resource_group(name="rg1")
else:
self.utility_wrap.create_resource_group(name="rg1", check_task=CheckTasks.check_permission_deny)
# PrivilegeDropResourceGroup
if privilege_group_privilege_dict["DropResourceGroup"]:
self.utility_wrap.drop_resource_group(name="rg1")
else:
self.utility_wrap.drop_resource_group(name="rg1", check_task=CheckTasks.check_permission_deny)
# PrivilegeUpdateResourceGroups
if privilege_group_privilege_dict["UpdateResourceGroups"]:
self.utility_wrap.update_resource_group(config={"resource_group_1": ResourceGroupConfig(requests={"node_num": 1})},
check_task=CheckTasks.err_res,
check_items={"err_code": 300,
"err_msg": "resource group not found[rg=resource_group_1]"})
else:
log.info("binbin6")
self.utility_wrap.update_resource_group(config={"resource_group_1": ResourceGroupConfig(requests={"node_num": 1})},
check_task=CheckTasks.check_permission_deny)
# PrivilegeDescribeResourceGroup
if privilege_group_privilege_dict["DescribeResourceGroup"]:
self.utility_wrap.describe_resource_group(name="rg1",
check_task=CheckTasks.err_res,
check_items={"err_code": 300,
"err_msg": "resource group not found[rg=rg1]"})
else:
self.utility_wrap.describe_resource_group(name="rg1", check_task=CheckTasks.check_permission_deny)
# PrivilegeListResourceGroups
if privilege_group_privilege_dict["ListResourceGroups"]:
self.utility_wrap.list_resource_groups()
else:
self.utility_wrap.list_resource_groups(check_task=CheckTasks.check_permission_deny)
# PrivilegeTransferNode
if privilege_group_privilege_dict["TransferNode"]:
self.utility_wrap.transfer_node(source="source", target="target", num_node=2,
check_task=CheckTasks.err_res,
check_items={"err_code": 300,
"err_msg": "resource group not found[rg=source]"}
)
else:
self.utility_wrap.transfer_node(source="source", target="target",
num_node=2, check_task=CheckTasks.check_permission_deny)
# PrivilegeTransferReplica
if privilege_group_privilege_dict["TransferReplica"]:
self.utility_wrap.transfer_replica(source="source", target="target",
collection_name=collection_w.name, num_replica=2,
check_task=CheckTasks.err_res,
check_items={"err_code": 300,
"err_msg": "resource group not found[rg=source]"})
else:
self.utility_wrap.transfer_replica(source="source", target="target",
collection_name=collection_w.name, num_replica=2,
check_task=CheckTasks.check_permission_deny)
# PrivilegeCreateDatabase
if privilege_group_privilege_dict["CreateDatabase"]:
self.database_wrap.create_database(db_name="new_db")
else:
self.database_wrap.create_database(db_name="new_db", check_task=CheckTasks.check_permission_deny)
# PrivilegeDropDatabase
if privilege_group_privilege_dict["DropDatabase"]:
self.database_wrap.drop_database(db_name="new_db")
else:
self.database_wrap.drop_database(db_name="default", check_task=CheckTasks.check_permission_deny)
# PrivilegeAlterDatabase
if privilege_group_privilege_dict["AlterDatabase"]:
self.database_wrap.set_properties(db_name="default", properties={},
check_task=CheckTasks.err_res,
check_items={"err_code": 65535,
"err_msg": "alter database requires either properties "
"or deletekeys to modify or delete keys, "
"both cannot be empty"})
else:
self.database_wrap.set_properties(db_name="default", properties={}, check_task=CheckTasks.check_permission_deny)
# PrivilegeFlushAll
if privilege_group_privilege_dict["FlushAll"]:
self.utility_wrap.flush_all()
else:
self.utility_wrap.flush_all(check_task=CheckTasks.check_permission_deny)
# PrivilegeListPrivilegeGroups
if privilege_group_privilege_dict["ListPrivilegeGroups"]:
self.utility_wrap.list_privilege_groups()
else:
self.utility_wrap.list_privilege_groups(check_task=CheckTasks.check_permission_deny)
# PrivilegeCreatePrivilegeGroup
if privilege_group_privilege_dict["CreatePrivilegeGroup"]:
self.utility_wrap.create_privilege_group("privilege_group_1")
else:
self.utility_wrap.create_privilege_group("privilege_group_1", check_task=CheckTasks.check_permission_deny)
# PrivilegeOperatePrivilegeGroup
if privilege_group_privilege_dict["OperatePrivilegeGroup"]:
self.utility_wrap.add_privileges_to_group("privilege_group_1", ["Insert"])
self.utility_wrap.remove_privileges_from_group("privilege_group_1", ["Insert"])
else:
self.utility_wrap.add_privileges_to_group("privilege_group_1", ["Insert"],
check_task=CheckTasks.check_permission_deny)
self.utility_wrap.remove_privileges_from_group("privilege_group_1", ["Insert"],
check_task=CheckTasks.check_permission_deny)
# PrivilegeDropPrivilegeGroup
if privilege_group_privilege_dict["DropPrivilegeGroup"]:
self.utility_wrap.drop_privilege_group("privilege_group_1")
else:
self.utility_wrap.drop_privilege_group("privilege_group_1", check_task=CheckTasks.check_permission_deny)