core/homeassistant/components/template/trigger_entity.py

192 lines
5.9 KiB
Python

"""Trigger entity."""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.const import (
ATTR_ENTITY_PICTURE,
ATTR_FRIENDLY_NAME,
ATTR_ICON,
CONF_DEVICE_CLASS,
CONF_ICON,
CONF_NAME,
CONF_UNIQUE_ID,
)
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.exceptions import TemplateError
from homeassistant.helpers import template
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import TriggerUpdateCoordinator
from .const import CONF_ATTRIBUTES, CONF_AVAILABILITY, CONF_PICTURE
# Maps a template config key to the state attribute that carries its rendered
# value. TriggerEntity.restore_attributes iterates this mapping to copy values
# from a previous state's attributes back into the rendered results.
CONF_TO_ATTRIBUTE = {
    CONF_ICON: ATTR_ICON,
    CONF_NAME: ATTR_FRIENDLY_NAME,
    CONF_PICTURE: ATTR_ENTITY_PICTURE,
}
class TriggerEntity(CoordinatorEntity[TriggerUpdateCoordinator]):
    """Template entity based on trigger data.

    Templates found in the entity config are rendered each time the
    coordinator delivers new data. Static templates are resolved once in
    ``__init__``; the remaining ones are re-rendered in ``_process_data``.

    Subclasses may set ``extra_template_keys`` (rendered with
    ``Template.async_render``) and ``extra_template_keys_complex`` (rendered
    with ``template.render_complex``) to have additional config keys rendered.
    """

    domain: str
    extra_template_keys: tuple | None = None
    extra_template_keys_complex: tuple | None = None

    def __init__(
        self,
        hass: HomeAssistant,
        coordinator: TriggerUpdateCoordinator,
        config: dict,
    ) -> None:
        """Initialize the entity.

        The unique ID is prefixed with the coordinator's unique ID when both
        the entity and the coordinator define one; otherwise the entity's own
        value (possibly ``None``) is used as-is.
        """
        super().__init__(coordinator)

        entity_unique_id = config.get(CONF_UNIQUE_ID)

        self._unique_id: str | None

        if entity_unique_id and coordinator.unique_id:
            self._unique_id = f"{coordinator.unique_id}-{entity_unique_id}"
        else:
            self._unique_id = entity_unique_id

        self._config = config

        # Values of static templates, keyed by config key; computed once here.
        self._static_rendered: dict[str, Any] = {}
        # Config keys rendered via Template.async_render on every update.
        self._to_render_simple: list[str] = []
        # Config keys rendered via template.render_complex on every update.
        self._to_render_complex: list[str] = []

        for itm in (
            CONF_AVAILABILITY,
            CONF_ICON,
            CONF_NAME,
            CONF_PICTURE,
        ):
            if itm not in config:
                continue

            if config[itm].is_static:
                self._static_rendered[itm] = config[itm].template
            else:
                self._to_render_simple.append(itm)

        if self.extra_template_keys is not None:
            self._to_render_simple.extend(self.extra_template_keys)

        if self.extra_template_keys_complex is not None:
            self._to_render_complex.extend(self.extra_template_keys_complex)

        # We make a copy so our initial render is 'unknown' and not 'unavailable'
        # (``available`` treats ``_rendered is _static_rendered`` as a failure).
        self._rendered = dict(self._static_rendered)
        # Keys whose rendered value is requested with parse_result=True.
        self._parse_result = {CONF_AVAILABILITY}

    @property
    def name(self) -> str | None:
        """Name of the entity."""
        return self._rendered.get(CONF_NAME)

    @property
    def unique_id(self) -> str | None:
        """Return unique ID of the entity."""
        return self._unique_id

    @property
    def device_class(self) -> str | None:
        """Return device class of the entity."""
        return self._config.get(CONF_DEVICE_CLASS)

    @property
    def icon(self) -> str | None:
        """Return icon."""
        return self._rendered.get(CONF_ICON)

    @property
    def entity_picture(self) -> str | None:
        """Return entity picture."""
        return self._rendered.get(CONF_PICTURE)

    @property
    def available(self) -> bool:
        """Return availability of the entity.

        The entity is unavailable when the last render failed (in which case
        ``_rendered`` is the ``_static_rendered`` object itself) or when the
        availability template rendered exactly ``False``.
        """
        return (
            self._rendered is not self._static_rendered
            and
            # Check against False so `None` is ok
            self._rendered.get(CONF_AVAILABILITY) is not False
        )

    @property
    def extra_state_attributes(self) -> dict[str, Any] | None:
        """Return extra attributes."""
        return self._rendered.get(CONF_ATTRIBUTES)

    async def async_added_to_hass(self) -> None:
        """Handle being added to Home Assistant.

        Attaches hass to the templates in the config and, if the coordinator
        already holds data, renders it immediately.
        """
        template.attach(self.hass, self._config)
        await super().async_added_to_hass()
        if self.coordinator.data is not None:
            self._process_data()

    def restore_attributes(self, last_state: State) -> None:
        """Restore rendered values from the attributes of a previous state.

        Only keys that are present in the entity config and whose attribute
        exists on ``last_state`` are restored.
        """
        for conf_key, attr in CONF_TO_ATTRIBUTE.items():
            if conf_key not in self._config or attr not in last_state.attributes:
                continue
            self._rendered[conf_key] = last_state.attributes[attr]

        if CONF_ATTRIBUTES in self._config:
            self._rendered[CONF_ATTRIBUTES] = {
                attr: last_state.attributes[attr]
                for attr in self._config[CONF_ATTRIBUTES]
                if attr in last_state.attributes
            }

    @callback
    def _process_data(self) -> None:
        """Render all non-static templates from the coordinator's data.

        On a ``TemplateError`` the failure is logged and ``_rendered`` is set
        to the ``_static_rendered`` object, which ``available`` reports as
        unavailable. The context from the coordinator data is applied in
        either case.
        """
        this = None
        if state := self.hass.states.get(self.entity_id):
            this = state.as_dict()
        run_variables = self.coordinator.data["run_variables"]
        variables = {"this": this, **(run_variables or {})}

        try:
            rendered = dict(self._static_rendered)

            for key in self._to_render_simple:
                rendered[key] = self._config[key].async_render(
                    variables,
                    parse_result=key in self._parse_result,
                )

            for key in self._to_render_complex:
                rendered[key] = template.render_complex(
                    self._config[key],
                    variables,
                )

            if CONF_ATTRIBUTES in self._config:
                # Bind ``key`` so the error handler below reports the right
                # template; without this it would name the last loop key, or
                # be unbound when both loops above were empty.
                key = CONF_ATTRIBUTES
                rendered[key] = template.render_complex(
                    self._config[key],
                    variables,
                )

            self._rendered = rendered
        except TemplateError as err:
            logging.getLogger(f"{__package__}.{self.entity_id.split('.')[0]}").error(
                "Error rendering %s template for %s: %s", key, self.entity_id, err
            )
            self._rendered = self._static_rendered

        self.async_set_context(self.coordinator.data["context"])

    @callback
    def _handle_coordinator_update(self) -> None:
        """Handle updated data from the coordinator."""
        self._process_data()
        self.async_write_ha_state()