2018-10-29 10:28:04 +00:00
|
|
|
"""Entity permissions."""
|
2019-03-11 18:02:37 +00:00
|
|
|
from collections import OrderedDict
|
|
|
|
from typing import Callable, Optional # noqa: F401
|
2018-10-29 10:28:04 +00:00
|
|
|
|
|
|
|
import voluptuous as vol
|
|
|
|
|
2018-11-08 11:57:00 +00:00
|
|
|
from .const import SUBCAT_ALL, POLICY_READ, POLICY_CONTROL, POLICY_EDIT
|
2018-12-05 10:41:00 +00:00
|
|
|
from .models import PermissionLookup
|
2019-03-11 18:02:37 +00:00
|
|
|
from .types import CategoryType, SubCategoryDict, ValueType
|
2019-07-31 19:25:30 +00:00
|
|
|
|
2019-03-11 18:02:37 +00:00
|
|
|
# pylint: disable=unused-import
|
|
|
|
from .util import SubCatLookupType, lookup_all, compile_policy # noqa
|
2018-10-29 10:28:04 +00:00
|
|
|
|
2019-07-31 19:25:30 +00:00
|
|
|
# Permission value for one entity (or one group of entities): either a bare
# ``True``, or a mapping whose only allowed keys are the read / control / edit
# policies, each of them optional and each required to be ``True`` when given.
SINGLE_ENTITY_SCHEMA = vol.Any(
    True,
    vol.Schema(
        {
            vol.Optional(policy_key): True
            for policy_key in (POLICY_READ, POLICY_CONTROL, POLICY_EDIT)
        }
    ),
)
|
|
|
|
|
|
|
|
# Sub-category keys of the entity policy. Each one is accepted as an optional
# key by ENTITY_POLICY_SCHEMA and is mapped to its own lookup function in
# compile_entities().
ENTITY_DOMAINS = "domains"
ENTITY_AREAS = "area_ids"
ENTITY_DEVICE_IDS = "device_ids"
ENTITY_ENTITY_IDS = "entity_ids"
|
|
|
|
|
|
|
|
# Value of a sub-category: either a bare ``True``, or a mapping from string
# keys to a per-entity permission value (SINGLE_ENTITY_SCHEMA).
ENTITY_VALUES_SCHEMA = vol.Any(
    True,
    vol.Schema({str: SINGLE_ENTITY_SCHEMA}),
)
|
|
|
|
|
|
|
|
# Full entity policy: either a bare ``True``, or a mapping with optional
# sub-categories. The SUBCAT_ALL entry takes a single permission value; the
# area / device / domain / entity-id entries each take ENTITY_VALUES_SCHEMA.
ENTITY_POLICY_SCHEMA = vol.Any(
    True,
    vol.Schema(
        {
            vol.Optional(SUBCAT_ALL): SINGLE_ENTITY_SCHEMA,
            **{
                vol.Optional(subcategory): ENTITY_VALUES_SCHEMA
                for subcategory in (
                    ENTITY_AREAS,
                    ENTITY_DEVICE_IDS,
                    ENTITY_DOMAINS,
                    ENTITY_ENTITY_IDS,
                )
            },
        }
    ),
)
|
|
|
|
|
|
|
|
|
|
|
|
def _lookup_domain(
    perm_lookup: PermissionLookup, domains_dict: SubCategoryDict, entity_id: str
) -> Optional[ValueType]:
    """Look up entity permissions by domain.

    The domain is the part of ``entity_id`` before its first ``"."`` (the
    whole id when it contains no dot). Returns the value stored for that
    domain in ``domains_dict``, or ``None`` when the domain is not listed.
    ``perm_lookup`` is not used by this lookup.
    """
    domain, _, _ = entity_id.partition(".")
    return domains_dict.get(domain)
|
|
|
|
|
|
|
|
|
2019-07-31 19:25:30 +00:00
|
|
|
def _lookup_area(
    perm_lookup: PermissionLookup, area_dict: SubCategoryDict, entity_id: str
) -> Optional[ValueType]:
    """Look up entity permissions by area.

    Resolves the entity through ``perm_lookup.entity_registry`` to its
    device, then resolves that device through ``perm_lookup.device_registry``
    to its area. Returns the value stored for that area in ``area_dict``.
    Returns ``None`` when the entity is unknown, has no device, the device is
    unknown, the device has no area, or the area is not listed.
    """
    registry_entry = perm_lookup.entity_registry.async_get(entity_id)
    device_id = None if registry_entry is None else registry_entry.device_id
    if device_id is None:
        return None

    device = perm_lookup.device_registry.async_get(device_id)
    area_id = None if device is None else device.area_id
    if area_id is None:
        return None

    return area_dict.get(area_id)
|
|
|
|
|
|
|
|
|
2019-07-31 19:25:30 +00:00
|
|
|
def _lookup_device(
    perm_lookup: PermissionLookup, devices_dict: SubCategoryDict, entity_id: str
) -> Optional[ValueType]:
    """Look up entity permissions by device.

    Resolves the entity through ``perm_lookup.entity_registry`` and returns
    the value stored in ``devices_dict`` for the entity's device id. Returns
    ``None`` when the entity is unknown, has no device, or the device is not
    listed.
    """
    registry_entry = perm_lookup.entity_registry.async_get(entity_id)
    device_id = None if registry_entry is None else registry_entry.device_id
    if device_id is None:
        return None

    return devices_dict.get(device_id)
|
|
|
|
|
|
|
|
|
2019-07-31 19:25:30 +00:00
|
|
|
def _lookup_entity_id(
    perm_lookup: PermissionLookup, entities_dict: SubCategoryDict, entity_id: str
) -> Optional[ValueType]:
    """Look up entity permission by entity id.

    Returns the value stored in ``entities_dict`` under the exact
    ``entity_id``, or ``None`` when it is not listed. ``perm_lookup`` is not
    used by this lookup.
    """
    permission = entities_dict.get(entity_id)
    return permission
|
2018-10-29 10:28:04 +00:00
|
|
|
|
|
|
|
|
2019-07-31 19:25:30 +00:00
|
|
|
def compile_entities(
    policy: CategoryType, perm_lookup: PermissionLookup
) -> Callable[[str, str], bool]:
    """Compile policy into a function that tests policy.

    Registers one lookup function per sub-category in a fixed order (entity
    ids, device ids, areas, domains, then the SUBCAT_ALL entry) and passes
    that ordered mapping, together with ``policy`` and ``perm_lookup``, to
    ``compile_policy``, whose result is returned unchanged.
    """
    subcategories = OrderedDict(
        [
            (ENTITY_ENTITY_IDS, _lookup_entity_id),
            (ENTITY_DEVICE_IDS, _lookup_device),
            (ENTITY_AREAS, _lookup_area),
            (ENTITY_DOMAINS, _lookup_domain),
            (SUBCAT_ALL, lookup_all),
        ]
    )  # type: SubCatLookupType

    return compile_policy(policy, subcategories, perm_lookup)
|