parent
14b05b0a91
commit
5b2c6648fb
|
@ -11,13 +11,14 @@ from homeassistant.core import HomeAssistant, callback
|
|||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from . import models
|
||||
from .const import GROUP_ID_ADMIN, GROUP_ID_READ_ONLY
|
||||
from .const import GROUP_ID_ADMIN, GROUP_ID_USER, GROUP_ID_READ_ONLY
|
||||
from .permissions import PermissionLookup, system_policies
|
||||
from .permissions.types import PolicyType # noqa: F401
|
||||
|
||||
STORAGE_VERSION = 1
|
||||
STORAGE_KEY = 'auth'
|
||||
GROUP_NAME_ADMIN = 'Administrators'
|
||||
GROUP_NAME_USER = "Users"
|
||||
GROUP_NAME_READ_ONLY = 'Read Only'
|
||||
|
||||
|
||||
|
@ -305,6 +306,7 @@ class AuthStore:
|
|||
# 1. Data from a recent version which has a single group without policy
|
||||
# 2. Data from old version which has no groups
|
||||
has_admin_group = False
|
||||
has_user_group = False
|
||||
has_read_only_group = False
|
||||
group_without_policy = None
|
||||
|
||||
|
@ -322,6 +324,13 @@ class AuthStore:
|
|||
policy = system_policies.ADMIN_POLICY
|
||||
system_generated = True
|
||||
|
||||
elif group_dict['id'] == GROUP_ID_USER:
|
||||
has_user_group = True
|
||||
|
||||
name = GROUP_NAME_USER
|
||||
policy = system_policies.USER_POLICY
|
||||
system_generated = True
|
||||
|
||||
elif group_dict['id'] == GROUP_ID_READ_ONLY:
|
||||
has_read_only_group = True
|
||||
|
||||
|
@ -369,6 +378,10 @@ class AuthStore:
|
|||
read_only_group = _system_read_only_group()
|
||||
groups[read_only_group.id] = read_only_group
|
||||
|
||||
if not has_user_group:
|
||||
user_group = _system_user_group()
|
||||
groups[user_group.id] = user_group
|
||||
|
||||
for user_dict in data['users']:
|
||||
# Collect the users group.
|
||||
user_groups = []
|
||||
|
@ -483,7 +496,7 @@ class AuthStore:
|
|||
'name': group.name
|
||||
} # type: Dict[str, Any]
|
||||
|
||||
if group.id not in (GROUP_ID_READ_ONLY, GROUP_ID_ADMIN):
|
||||
if not group.system_generated:
|
||||
g_dict['policy'] = group.policy
|
||||
|
||||
groups.append(g_dict)
|
||||
|
@ -536,6 +549,8 @@ class AuthStore:
|
|||
groups = OrderedDict() # type: Dict[str, models.Group]
|
||||
admin_group = _system_admin_group()
|
||||
groups[admin_group.id] = admin_group
|
||||
user_group = _system_user_group()
|
||||
groups[user_group.id] = user_group
|
||||
read_only_group = _system_read_only_group()
|
||||
groups[read_only_group.id] = read_only_group
|
||||
self._groups = groups
|
||||
|
@ -551,6 +566,16 @@ def _system_admin_group() -> models.Group:
|
|||
)
|
||||
|
||||
|
||||
def _system_user_group() -> models.Group:
|
||||
"""Create system user group."""
|
||||
return models.Group(
|
||||
name=GROUP_NAME_USER,
|
||||
id=GROUP_ID_USER,
|
||||
policy=system_policies.USER_POLICY,
|
||||
system_generated=True,
|
||||
)
|
||||
|
||||
|
||||
def _system_read_only_group() -> models.Group:
|
||||
"""Create read only group."""
|
||||
return models.Group(
|
||||
|
|
|
@ -5,4 +5,5 @@ ACCESS_TOKEN_EXPIRATION = timedelta(minutes=30)
|
|||
MFA_SESSION_EXPIRATION = timedelta(minutes=5)
|
||||
|
||||
GROUP_ID_ADMIN = 'system-admin'
|
||||
GROUP_ID_USER = 'system-users'
|
||||
GROUP_ID_READ_ONLY = 'system-read-only'
|
||||
|
|
|
@ -5,6 +5,10 @@ ADMIN_POLICY = {
|
|||
CAT_ENTITIES: True,
|
||||
}
|
||||
|
||||
USER_POLICY = {
|
||||
CAT_ENTITIES: True,
|
||||
}
|
||||
|
||||
READ_ONLY_POLICY = {
|
||||
CAT_ENTITIES: {
|
||||
SUBCAT_ALL: {
|
||||
|
|
|
@ -14,6 +14,17 @@ def test_admin_policy():
|
|||
assert perms.check_entity('light.kitchen', 'edit')
|
||||
|
||||
|
||||
def test_user_policy():
|
||||
"""Test user policy works."""
|
||||
# Make sure it's valid
|
||||
POLICY_SCHEMA(system_policies.USER_POLICY)
|
||||
|
||||
perms = PolicyPermissions(system_policies.USER_POLICY, None)
|
||||
assert perms.check_entity('light.kitchen', 'read')
|
||||
assert perms.check_entity('light.kitchen', 'control')
|
||||
assert perms.check_entity('light.kitchen', 'edit')
|
||||
|
||||
|
||||
def test_read_only_policy():
|
||||
"""Test read only policy works."""
|
||||
# Make sure it's valid
|
||||
|
|
|
@ -64,7 +64,7 @@ async def test_loading_no_group_data_format(hass, hass_storage):
|
|||
|
||||
store = auth_store.AuthStore(hass)
|
||||
groups = await store.async_get_groups()
|
||||
assert len(groups) == 2
|
||||
assert len(groups) == 3
|
||||
admin_group = groups[0]
|
||||
assert admin_group.name == auth_store.GROUP_NAME_ADMIN
|
||||
assert admin_group.system_generated
|
||||
|
@ -73,6 +73,10 @@ async def test_loading_no_group_data_format(hass, hass_storage):
|
|||
assert read_group.name == auth_store.GROUP_NAME_READ_ONLY
|
||||
assert read_group.system_generated
|
||||
assert read_group.id == auth_store.GROUP_ID_READ_ONLY
|
||||
user_group = groups[2]
|
||||
assert user_group.name == auth_store.GROUP_NAME_USER
|
||||
assert user_group.system_generated
|
||||
assert user_group.id == auth_store.GROUP_ID_USER
|
||||
|
||||
users = await store.async_get_users()
|
||||
assert len(users) == 2
|
||||
|
@ -157,7 +161,7 @@ async def test_loading_all_access_group_data_format(hass, hass_storage):
|
|||
|
||||
store = auth_store.AuthStore(hass)
|
||||
groups = await store.async_get_groups()
|
||||
assert len(groups) == 2
|
||||
assert len(groups) == 3
|
||||
admin_group = groups[0]
|
||||
assert admin_group.name == auth_store.GROUP_NAME_ADMIN
|
||||
assert admin_group.system_generated
|
||||
|
@ -166,6 +170,10 @@ async def test_loading_all_access_group_data_format(hass, hass_storage):
|
|||
assert read_group.name == auth_store.GROUP_NAME_READ_ONLY
|
||||
assert read_group.system_generated
|
||||
assert read_group.id == auth_store.GROUP_ID_READ_ONLY
|
||||
user_group = groups[2]
|
||||
assert user_group.name == auth_store.GROUP_NAME_USER
|
||||
assert user_group.system_generated
|
||||
assert user_group.id == auth_store.GROUP_ID_USER
|
||||
|
||||
users = await store.async_get_users()
|
||||
assert len(users) == 2
|
||||
|
@ -189,12 +197,16 @@ async def test_loading_empty_data(hass, hass_storage):
|
|||
"""Test we correctly load with no existing data."""
|
||||
store = auth_store.AuthStore(hass)
|
||||
groups = await store.async_get_groups()
|
||||
assert len(groups) == 2
|
||||
assert len(groups) == 3
|
||||
admin_group = groups[0]
|
||||
assert admin_group.name == auth_store.GROUP_NAME_ADMIN
|
||||
assert admin_group.system_generated
|
||||
assert admin_group.id == auth_store.GROUP_ID_ADMIN
|
||||
read_group = groups[1]
|
||||
user_group = groups[1]
|
||||
assert user_group.name == auth_store.GROUP_NAME_USER
|
||||
assert user_group.system_generated
|
||||
assert user_group.id == auth_store.GROUP_ID_USER
|
||||
read_group = groups[2]
|
||||
assert read_group.name == auth_store.GROUP_NAME_READ_ONLY
|
||||
assert read_group.system_generated
|
||||
assert read_group.id == auth_store.GROUP_ID_READ_ONLY
|
||||
|
@ -217,6 +229,10 @@ async def test_system_groups_store_id_and_name(hass, hass_storage):
|
|||
'id': auth_store.GROUP_ID_ADMIN,
|
||||
'name': auth_store.GROUP_NAME_ADMIN,
|
||||
},
|
||||
{
|
||||
'id': auth_store.GROUP_ID_USER,
|
||||
'name': auth_store.GROUP_NAME_USER,
|
||||
},
|
||||
{
|
||||
'id': auth_store.GROUP_ID_READ_ONLY,
|
||||
'name': auth_store.GROUP_NAME_READ_ONLY,
|
||||
|
|
Loading…
Reference in New Issue