101 lines
3.1 KiB
Python
101 lines
3.1 KiB
Python
"""Entity permissions."""
|
|
from __future__ import annotations
|
|
|
|
from collections import OrderedDict
|
|
from collections.abc import Callable
|
|
|
|
import voluptuous as vol
|
|
|
|
from .const import POLICY_CONTROL, POLICY_EDIT, POLICY_READ, SUBCAT_ALL
|
|
from .models import PermissionLookup
|
|
from .types import CategoryType, SubCategoryDict, ValueType
|
|
from .util import SubCatLookupType, compile_policy, lookup_all
|
|
|
|
SINGLE_ENTITY_SCHEMA = vol.Any(
|
|
True,
|
|
vol.Schema(
|
|
{
|
|
vol.Optional(POLICY_READ): True,
|
|
vol.Optional(POLICY_CONTROL): True,
|
|
vol.Optional(POLICY_EDIT): True,
|
|
}
|
|
),
|
|
)
|
|
|
|
ENTITY_DOMAINS = "domains"
|
|
ENTITY_AREAS = "area_ids"
|
|
ENTITY_DEVICE_IDS = "device_ids"
|
|
ENTITY_ENTITY_IDS = "entity_ids"
|
|
|
|
ENTITY_VALUES_SCHEMA = vol.Any(True, vol.Schema({str: SINGLE_ENTITY_SCHEMA}))
|
|
|
|
ENTITY_POLICY_SCHEMA = vol.Any(
|
|
True,
|
|
vol.Schema(
|
|
{
|
|
vol.Optional(SUBCAT_ALL): SINGLE_ENTITY_SCHEMA,
|
|
vol.Optional(ENTITY_AREAS): ENTITY_VALUES_SCHEMA,
|
|
vol.Optional(ENTITY_DEVICE_IDS): ENTITY_VALUES_SCHEMA,
|
|
vol.Optional(ENTITY_DOMAINS): ENTITY_VALUES_SCHEMA,
|
|
vol.Optional(ENTITY_ENTITY_IDS): ENTITY_VALUES_SCHEMA,
|
|
}
|
|
),
|
|
)
|
|
|
|
|
|
def _lookup_domain(
|
|
perm_lookup: PermissionLookup, domains_dict: SubCategoryDict, entity_id: str
|
|
) -> ValueType | None:
|
|
"""Look up entity permissions by domain."""
|
|
return domains_dict.get(entity_id.split(".", 1)[0])
|
|
|
|
|
|
def _lookup_area(
|
|
perm_lookup: PermissionLookup, area_dict: SubCategoryDict, entity_id: str
|
|
) -> ValueType | None:
|
|
"""Look up entity permissions by area."""
|
|
entity_entry = perm_lookup.entity_registry.async_get(entity_id)
|
|
|
|
if entity_entry is None or entity_entry.device_id is None:
|
|
return None
|
|
|
|
device_entry = perm_lookup.device_registry.async_get(entity_entry.device_id)
|
|
|
|
if device_entry is None or device_entry.area_id is None:
|
|
return None
|
|
|
|
return area_dict.get(device_entry.area_id)
|
|
|
|
|
|
def _lookup_device(
|
|
perm_lookup: PermissionLookup, devices_dict: SubCategoryDict, entity_id: str
|
|
) -> ValueType | None:
|
|
"""Look up entity permissions by device."""
|
|
entity_entry = perm_lookup.entity_registry.async_get(entity_id)
|
|
|
|
if entity_entry is None or entity_entry.device_id is None:
|
|
return None
|
|
|
|
return devices_dict.get(entity_entry.device_id)
|
|
|
|
|
|
def _lookup_entity_id(
|
|
perm_lookup: PermissionLookup, entities_dict: SubCategoryDict, entity_id: str
|
|
) -> ValueType | None:
|
|
"""Look up entity permission by entity id."""
|
|
return entities_dict.get(entity_id)
|
|
|
|
|
|
def compile_entities(
|
|
policy: CategoryType, perm_lookup: PermissionLookup
|
|
) -> Callable[[str, str], bool]:
|
|
"""Compile policy into a function that tests policy."""
|
|
subcategories: SubCatLookupType = OrderedDict()
|
|
subcategories[ENTITY_ENTITY_IDS] = _lookup_entity_id
|
|
subcategories[ENTITY_DEVICE_IDS] = _lookup_device
|
|
subcategories[ENTITY_AREAS] = _lookup_area
|
|
subcategories[ENTITY_DOMAINS] = _lookup_domain
|
|
subcategories[SUBCAT_ALL] = lookup_all
|
|
|
|
return compile_policy(policy, subcategories, perm_lookup)
|