427 lines
13 KiB
Python
427 lines
13 KiB
Python
"""Support for locks which integrates with other components."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Generator, Sequence
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.lock import (
|
|
DOMAIN as LOCK_DOMAIN,
|
|
ENTITY_ID_FORMAT,
|
|
PLATFORM_SCHEMA as LOCK_PLATFORM_SCHEMA,
|
|
LockEntity,
|
|
LockEntityFeature,
|
|
LockState,
|
|
)
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import (
|
|
ATTR_CODE,
|
|
CONF_NAME,
|
|
CONF_OPTIMISTIC,
|
|
CONF_STATE,
|
|
CONF_UNIQUE_ID,
|
|
CONF_VALUE_TEMPLATE,
|
|
)
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.exceptions import ServiceValidationError, TemplateError
|
|
from homeassistant.helpers import config_validation as cv, template
|
|
from homeassistant.helpers.entity_platform import (
|
|
AddConfigEntryEntitiesCallback,
|
|
AddEntitiesCallback,
|
|
)
|
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
|
|
|
from .const import DOMAIN
|
|
from .coordinator import TriggerUpdateCoordinator
|
|
from .entity import AbstractTemplateEntity
|
|
from .helpers import (
|
|
async_setup_template_entry,
|
|
async_setup_template_platform,
|
|
async_setup_template_preview,
|
|
)
|
|
from .template_entity import (
|
|
TEMPLATE_ENTITY_AVAILABILITY_SCHEMA_LEGACY,
|
|
TEMPLATE_ENTITY_COMMON_CONFIG_ENTRY_SCHEMA,
|
|
TEMPLATE_ENTITY_OPTIMISTIC_SCHEMA,
|
|
TemplateEntity,
|
|
make_template_entity_common_modern_schema,
|
|
)
|
|
from .trigger_entity import TriggerEntity
|
|
|
|
CONF_CODE_FORMAT_TEMPLATE = "code_format_template"
|
|
CONF_CODE_FORMAT = "code_format"
|
|
CONF_LOCK = "lock"
|
|
CONF_UNLOCK = "unlock"
|
|
CONF_OPEN = "open"
|
|
|
|
DEFAULT_NAME = "Template Lock"
|
|
DEFAULT_OPTIMISTIC = False
|
|
|
|
LEGACY_FIELDS = {
|
|
CONF_CODE_FORMAT_TEMPLATE: CONF_CODE_FORMAT,
|
|
CONF_VALUE_TEMPLATE: CONF_STATE,
|
|
}
|
|
|
|
LOCK_COMMON_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Optional(CONF_CODE_FORMAT): cv.template,
|
|
vol.Required(CONF_LOCK): cv.SCRIPT_SCHEMA,
|
|
vol.Optional(CONF_OPEN): cv.SCRIPT_SCHEMA,
|
|
vol.Optional(CONF_STATE): cv.template,
|
|
vol.Required(CONF_UNLOCK): cv.SCRIPT_SCHEMA,
|
|
}
|
|
)
|
|
|
|
LOCK_YAML_SCHEMA = LOCK_COMMON_SCHEMA.extend(TEMPLATE_ENTITY_OPTIMISTIC_SCHEMA).extend(
|
|
make_template_entity_common_modern_schema(DEFAULT_NAME).schema
|
|
)
|
|
|
|
PLATFORM_SCHEMA = LOCK_PLATFORM_SCHEMA.extend(
|
|
{
|
|
vol.Optional(CONF_CODE_FORMAT_TEMPLATE): cv.template,
|
|
vol.Required(CONF_LOCK): cv.SCRIPT_SCHEMA,
|
|
vol.Optional(CONF_NAME): cv.string,
|
|
vol.Optional(CONF_OPEN): cv.SCRIPT_SCHEMA,
|
|
vol.Optional(CONF_OPTIMISTIC, default=DEFAULT_OPTIMISTIC): cv.boolean,
|
|
vol.Optional(CONF_UNIQUE_ID): cv.string,
|
|
vol.Required(CONF_UNLOCK): cv.SCRIPT_SCHEMA,
|
|
vol.Required(CONF_VALUE_TEMPLATE): cv.template,
|
|
}
|
|
).extend(TEMPLATE_ENTITY_AVAILABILITY_SCHEMA_LEGACY.schema)
|
|
|
|
LOCK_CONFIG_ENTRY_SCHEMA = LOCK_COMMON_SCHEMA.extend(
|
|
TEMPLATE_ENTITY_COMMON_CONFIG_ENTRY_SCHEMA.schema
|
|
)
|
|
|
|
|
|
async def async_setup_platform(
|
|
hass: HomeAssistant,
|
|
config: ConfigType,
|
|
async_add_entities: AddEntitiesCallback,
|
|
discovery_info: DiscoveryInfoType | None = None,
|
|
) -> None:
|
|
"""Set up the template fans."""
|
|
await async_setup_template_platform(
|
|
hass,
|
|
LOCK_DOMAIN,
|
|
config,
|
|
StateLockEntity,
|
|
TriggerLockEntity,
|
|
async_add_entities,
|
|
discovery_info,
|
|
LEGACY_FIELDS,
|
|
)
|
|
|
|
|
|
async def async_setup_entry(
|
|
hass: HomeAssistant,
|
|
config_entry: ConfigEntry,
|
|
async_add_entities: AddConfigEntryEntitiesCallback,
|
|
) -> None:
|
|
"""Initialize config entry."""
|
|
await async_setup_template_entry(
|
|
hass,
|
|
config_entry,
|
|
async_add_entities,
|
|
StateLockEntity,
|
|
LOCK_CONFIG_ENTRY_SCHEMA,
|
|
)
|
|
|
|
|
|
@callback
|
|
def async_create_preview_lock(
|
|
hass: HomeAssistant, name: str, config: dict[str, Any]
|
|
) -> StateLockEntity:
|
|
"""Create a preview."""
|
|
return async_setup_template_preview(
|
|
hass,
|
|
name,
|
|
config,
|
|
StateLockEntity,
|
|
LOCK_CONFIG_ENTRY_SCHEMA,
|
|
)
|
|
|
|
|
|
class AbstractTemplateLock(AbstractTemplateEntity, LockEntity):
|
|
"""Representation of a template lock features."""
|
|
|
|
_entity_id_format = ENTITY_ID_FORMAT
|
|
_optimistic_entity = True
|
|
|
|
# The super init is not called because TemplateEntity and TriggerEntity will call AbstractTemplateEntity.__init__.
|
|
# This ensures that the __init__ on AbstractTemplateEntity is not called twice.
|
|
def __init__(self, config: dict[str, Any]) -> None: # pylint: disable=super-init-not-called
|
|
"""Initialize the features."""
|
|
|
|
self._state: LockState | None = None
|
|
self._code_format_template = config.get(CONF_CODE_FORMAT)
|
|
self._code_format: str | None = None
|
|
self._code_format_template_error: TemplateError | None = None
|
|
|
|
def _iterate_scripts(
|
|
self, config: dict[str, Any]
|
|
) -> Generator[tuple[str, Sequence[dict[str, Any]], LockEntityFeature | int]]:
|
|
for action_id, supported_feature in (
|
|
(CONF_LOCK, 0),
|
|
(CONF_UNLOCK, 0),
|
|
(CONF_OPEN, LockEntityFeature.OPEN),
|
|
):
|
|
if (action_config := config.get(action_id)) is not None:
|
|
yield (action_id, action_config, supported_feature)
|
|
|
|
@property
|
|
def is_locked(self) -> bool:
|
|
"""Return true if lock is locked."""
|
|
return self._state == LockState.LOCKED
|
|
|
|
@property
|
|
def is_jammed(self) -> bool:
|
|
"""Return true if lock is jammed."""
|
|
return self._state == LockState.JAMMED
|
|
|
|
@property
|
|
def is_unlocking(self) -> bool:
|
|
"""Return true if lock is unlocking."""
|
|
return self._state == LockState.UNLOCKING
|
|
|
|
@property
|
|
def is_locking(self) -> bool:
|
|
"""Return true if lock is locking."""
|
|
return self._state == LockState.LOCKING
|
|
|
|
@property
|
|
def is_open(self) -> bool:
|
|
"""Return true if lock is open."""
|
|
return self._state == LockState.OPEN
|
|
|
|
@property
|
|
def is_opening(self) -> bool:
|
|
"""Return true if lock is opening."""
|
|
return self._state == LockState.OPENING
|
|
|
|
@property
|
|
def code_format(self) -> str | None:
|
|
"""Regex for code format or None if no code is required."""
|
|
return self._code_format
|
|
|
|
def _handle_state(self, result: Any) -> None:
|
|
if isinstance(result, bool):
|
|
self._state = LockState.LOCKED if result else LockState.UNLOCKED
|
|
return
|
|
|
|
if isinstance(result, str):
|
|
if result.lower() in (
|
|
"true",
|
|
"on",
|
|
"locked",
|
|
):
|
|
self._state = LockState.LOCKED
|
|
elif result.lower() in (
|
|
"false",
|
|
"off",
|
|
"unlocked",
|
|
):
|
|
self._state = LockState.UNLOCKED
|
|
else:
|
|
try:
|
|
self._state = LockState(result.lower())
|
|
except ValueError:
|
|
self._state = None
|
|
return
|
|
|
|
self._state = None
|
|
|
|
@callback
|
|
def _update_code_format(self, render: str | TemplateError | None):
|
|
"""Update code format from the template."""
|
|
if isinstance(render, TemplateError):
|
|
self._code_format = None
|
|
self._code_format_template_error = render
|
|
elif render in (None, "None", ""):
|
|
self._code_format = None
|
|
self._code_format_template_error = None
|
|
else:
|
|
self._code_format = render
|
|
self._code_format_template_error = None
|
|
|
|
async def async_lock(self, **kwargs: Any) -> None:
|
|
"""Lock the device."""
|
|
# Check if we need to raise for incorrect code format
|
|
# template before processing the action.
|
|
self._raise_template_error_if_available()
|
|
|
|
if self._attr_assumed_state:
|
|
self._state = LockState.LOCKED
|
|
self.async_write_ha_state()
|
|
|
|
tpl_vars = {ATTR_CODE: kwargs.get(ATTR_CODE) if kwargs else None}
|
|
|
|
await self.async_run_script(
|
|
self._action_scripts[CONF_LOCK],
|
|
run_variables=tpl_vars,
|
|
context=self._context,
|
|
)
|
|
|
|
async def async_unlock(self, **kwargs: Any) -> None:
|
|
"""Unlock the device."""
|
|
# Check if we need to raise for incorrect code format
|
|
# template before processing the action.
|
|
self._raise_template_error_if_available()
|
|
|
|
if self._attr_assumed_state:
|
|
self._state = LockState.UNLOCKED
|
|
self.async_write_ha_state()
|
|
|
|
tpl_vars = {ATTR_CODE: kwargs.get(ATTR_CODE) if kwargs else None}
|
|
|
|
await self.async_run_script(
|
|
self._action_scripts[CONF_UNLOCK],
|
|
run_variables=tpl_vars,
|
|
context=self._context,
|
|
)
|
|
|
|
async def async_open(self, **kwargs: Any) -> None:
|
|
"""Open the device."""
|
|
# Check if we need to raise for incorrect code format
|
|
# template before processing the action.
|
|
self._raise_template_error_if_available()
|
|
|
|
if self._attr_assumed_state:
|
|
self._state = LockState.OPEN
|
|
self.async_write_ha_state()
|
|
|
|
tpl_vars = {ATTR_CODE: kwargs.get(ATTR_CODE) if kwargs else None}
|
|
|
|
await self.async_run_script(
|
|
self._action_scripts[CONF_OPEN],
|
|
run_variables=tpl_vars,
|
|
context=self._context,
|
|
)
|
|
|
|
def _raise_template_error_if_available(self):
|
|
"""Raise an error if the rendered code format is not valid."""
|
|
if self._code_format_template_error is not None:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="code_format_template_error",
|
|
translation_placeholders={
|
|
"entity_id": self.entity_id,
|
|
"code_format_template": self._code_format_template.template,
|
|
"cause": str(self._code_format_template_error),
|
|
},
|
|
)
|
|
|
|
|
|
class StateLockEntity(TemplateEntity, AbstractTemplateLock):
|
|
"""Representation of a template lock."""
|
|
|
|
_attr_should_poll = False
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
config: dict[str, Any],
|
|
unique_id: str | None,
|
|
) -> None:
|
|
"""Initialize the lock."""
|
|
TemplateEntity.__init__(self, hass, config, unique_id)
|
|
AbstractTemplateLock.__init__(self, config)
|
|
name = self._attr_name
|
|
if TYPE_CHECKING:
|
|
assert name is not None
|
|
|
|
for action_id, action_config, supported_feature in self._iterate_scripts(
|
|
config
|
|
):
|
|
self.add_script(action_id, action_config, name, DOMAIN)
|
|
self._attr_supported_features |= supported_feature
|
|
|
|
@callback
|
|
def _update_state(self, result: str | TemplateError) -> None:
|
|
"""Update the state from the template."""
|
|
super()._update_state(result)
|
|
if isinstance(result, TemplateError):
|
|
self._state = None
|
|
return
|
|
|
|
self._handle_state(result)
|
|
|
|
@callback
|
|
def _async_setup_templates(self) -> None:
|
|
"""Set up templates."""
|
|
if self._template is not None:
|
|
self.add_template_attribute(
|
|
"_state",
|
|
self._template,
|
|
None,
|
|
self._update_state,
|
|
)
|
|
if self._code_format_template:
|
|
self.add_template_attribute(
|
|
"_code_format_template",
|
|
self._code_format_template,
|
|
None,
|
|
self._update_code_format,
|
|
)
|
|
super()._async_setup_templates()
|
|
|
|
|
|
class TriggerLockEntity(TriggerEntity, AbstractTemplateLock):
|
|
"""Lock entity based on trigger data."""
|
|
|
|
domain = LOCK_DOMAIN
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
coordinator: TriggerUpdateCoordinator,
|
|
config: ConfigType,
|
|
) -> None:
|
|
"""Initialize the entity."""
|
|
TriggerEntity.__init__(self, hass, coordinator, config)
|
|
AbstractTemplateLock.__init__(self, config)
|
|
|
|
self._attr_name = name = self._rendered.get(CONF_NAME, DEFAULT_NAME)
|
|
|
|
if CONF_STATE in config:
|
|
self._to_render_simple.append(CONF_STATE)
|
|
|
|
if isinstance(config.get(CONF_CODE_FORMAT), template.Template):
|
|
self._to_render_simple.append(CONF_CODE_FORMAT)
|
|
self._parse_result.add(CONF_CODE_FORMAT)
|
|
|
|
for action_id, action_config, supported_feature in self._iterate_scripts(
|
|
config
|
|
):
|
|
self.add_script(action_id, action_config, name, DOMAIN)
|
|
self._attr_supported_features |= supported_feature
|
|
|
|
@callback
|
|
def _handle_coordinator_update(self) -> None:
|
|
"""Handle update of the data."""
|
|
self._process_data()
|
|
|
|
if not self.available:
|
|
self.async_write_ha_state()
|
|
return
|
|
|
|
write_ha_state = False
|
|
for key, updater in (
|
|
(CONF_STATE, self._handle_state),
|
|
(CONF_CODE_FORMAT, self._update_code_format),
|
|
):
|
|
if (rendered := self._rendered.get(key)) is not None:
|
|
updater(rendered)
|
|
write_ha_state = True
|
|
|
|
if not self._attr_assumed_state:
|
|
write_ha_state = True
|
|
elif self._attr_assumed_state and len(self._rendered) > 0:
|
|
# In case any non optimistic template
|
|
write_ha_state = True
|
|
|
|
if write_ha_state:
|
|
self.async_write_ha_state()
|