264 lines
8.3 KiB
Python
264 lines
8.3 KiB
Python
"""TemplateEntity utility class."""
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import logging
|
|
from typing import Any
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.sensor import (
|
|
CONF_STATE_CLASS,
|
|
DEVICE_CLASSES_SCHEMA,
|
|
STATE_CLASSES_SCHEMA,
|
|
SensorEntity,
|
|
)
|
|
from homeassistant.const import (
|
|
ATTR_ENTITY_PICTURE,
|
|
ATTR_FRIENDLY_NAME,
|
|
ATTR_ICON,
|
|
CONF_DEVICE_CLASS,
|
|
CONF_ICON,
|
|
CONF_NAME,
|
|
CONF_UNIQUE_ID,
|
|
CONF_UNIT_OF_MEASUREMENT,
|
|
)
|
|
from homeassistant.core import HomeAssistant, State, callback
|
|
from homeassistant.exceptions import TemplateError
|
|
from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
|
|
|
|
from . import config_validation as cv
|
|
from .entity import Entity
|
|
from .template import attach as template_attach, render_complex
|
|
from .typing import ConfigType
|
|
|
|
# Config keys used by the trigger/template entities in this module.
CONF_AVAILABILITY = "availability"
CONF_ATTRIBUTES = "attributes"
CONF_PICTURE = "picture"

# Config key -> state attribute name. Consulted by
# ``TriggerBaseEntity.restore_attributes`` to copy values from a previous
# state back into the rendered values.
CONF_TO_ATTRIBUTE = {
    CONF_ICON: ATTR_ICON,
    CONF_NAME: ATTR_FRIENDLY_NAME,
    CONF_PICTURE: ATTR_ENTITY_PICTURE,
}
|
|
|
|
# Base schema shared by template entities: icon, name and picture are
# validated as templates, the unique id as a plain string. All keys optional.
TEMPLATE_ENTITY_BASE_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_ICON): cv.template,
        vol.Optional(CONF_NAME): cv.template,
        vol.Optional(CONF_PICTURE): cv.template,
        vol.Optional(CONF_UNIQUE_ID): cv.string,
    }
)
|
|
|
|
|
|
def make_template_entity_base_schema(default_name: str) -> vol.Schema:
    """Build the base template-entity schema with a default for the name.

    The returned schema has the same keys as ``TEMPLATE_ENTITY_BASE_SCHEMA``;
    the only difference is that ``CONF_NAME`` defaults to ``default_name``
    when it is absent from the validated config.
    """
    base_fields = {
        vol.Optional(CONF_ICON): cv.template,
        vol.Optional(CONF_NAME, default=default_name): cv.template,
        vol.Optional(CONF_PICTURE): cv.template,
        vol.Optional(CONF_UNIQUE_ID): cv.string,
    }
    return vol.Schema(base_fields)
|
|
|
|
|
|
# Sensor schema: the sensor-specific keys (device class, state class, unit of
# measurement) extended with every key of TEMPLATE_ENTITY_BASE_SCHEMA.
TEMPLATE_SENSOR_BASE_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
        vol.Optional(CONF_STATE_CLASS): STATE_CLASSES_SCHEMA,
        vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
    }
).extend(TEMPLATE_ENTITY_BASE_SCHEMA.schema)
|
|
|
|
|
|
class TriggerBaseEntity(Entity):
    """Template Base entity based on trigger data.

    At construction time the availability, icon, name and picture entries of
    the config are split into static values (stored as-is) and templates that
    are rendered on every call to ``_render_templates``.
    """

    domain: str
    # Extra config keys rendered with ``async_render`` by ``_render_templates``.
    extra_template_keys: tuple[str, ...] | None = None
    # Extra config keys rendered with ``render_complex`` by ``_render_templates``.
    extra_template_keys_complex: tuple[str, ...] | None = None
    _unique_id: str | None

    def __init__(
        self,
        hass: HomeAssistant,
        config: ConfigType,
    ) -> None:
        """Initialize the entity."""
        self.hass = hass

        self._set_unique_id(config.get(CONF_UNIQUE_ID))

        self._config = config

        self._static_rendered: dict[str, Any] = {}
        self._to_render_simple: list[str] = []
        self._to_render_complex: list[str] = []

        for itm in (
            CONF_AVAILABILITY,
            CONF_ICON,
            CONF_NAME,
            CONF_PICTURE,
        ):
            if itm not in config or config[itm] is None:
                continue
            if config[itm].is_static:
                # Static templates never change; keep their text directly.
                self._static_rendered[itm] = config[itm].template
            else:
                self._to_render_simple.append(itm)

        if self.extra_template_keys is not None:
            self._to_render_simple.extend(self.extra_template_keys)

        if self.extra_template_keys_complex is not None:
            self._to_render_complex.extend(self.extra_template_keys_complex)

        # We make a copy so our initial render is 'unknown' and not 'unavailable'
        # (``available`` checks identity against ``_static_rendered``).
        self._rendered: dict[str, Any] = dict(self._static_rendered)
        # Keys whose render is requested with ``parse_result=True``.
        self._parse_result: set[str] = {CONF_AVAILABILITY}
        self._attr_device_class = config.get(CONF_DEVICE_CLASS)

    @property
    def name(self) -> str | None:
        """Name of the entity."""
        return self._rendered.get(CONF_NAME)

    @property
    def unique_id(self) -> str | None:
        """Return unique ID of the entity."""
        return self._unique_id

    @property
    def icon(self) -> str | None:
        """Return icon."""
        return self._rendered.get(CONF_ICON)

    @property
    def entity_picture(self) -> str | None:
        """Return entity picture."""
        return self._rendered.get(CONF_PICTURE)

    @property
    def available(self) -> bool:
        """Return availability of the entity.

        The entity is unavailable when the last render failed (``_rendered``
        is then the ``_static_rendered`` object itself) or when the rendered
        availability is exactly ``False``.
        """
        return (
            self._rendered is not self._static_rendered
            and
            # Check against False so `None` is ok
            self._rendered.get(CONF_AVAILABILITY) is not False
        )

    @property
    def extra_state_attributes(self) -> dict[str, Any] | None:
        """Return extra attributes."""
        return self._rendered.get(CONF_ATTRIBUTES)

    async def async_added_to_hass(self) -> None:
        """Handle being added to Home Assistant."""
        await super().async_added_to_hass()
        template_attach(self.hass, self._config)

    def _set_unique_id(self, unique_id: str | None) -> None:
        """Set unique id."""
        self._unique_id = unique_id

    def restore_attributes(self, last_state: State) -> None:
        """Restore attributes.

        Copies icon/name/picture (for keys present in the config) and the
        configured extra attributes from ``last_state`` into ``_rendered``.
        Values missing from ``last_state`` are skipped.
        """
        for conf_key, attr in CONF_TO_ATTRIBUTE.items():
            if conf_key not in self._config or attr not in last_state.attributes:
                continue
            self._rendered[conf_key] = last_state.attributes[attr]

        if CONF_ATTRIBUTES in self._config:
            extra_state_attributes = {}
            for attr in self._config[CONF_ATTRIBUTES]:
                if attr not in last_state.attributes:
                    continue
                extra_state_attributes[attr] = last_state.attributes[attr]
            self._rendered[CONF_ATTRIBUTES] = extra_state_attributes

    def _render_templates(self, variables: dict[str, Any]) -> None:
        """Render templates.

        On success ``_rendered`` is replaced by a new dict holding the static
        values plus every freshly rendered template. If any render raises
        ``TemplateError`` the error is logged, naming the config key that was
        being rendered, and ``_rendered`` is set to the ``_static_rendered``
        object itself, which ``available`` reports as unavailable.
        """
        rendered = dict(self._static_rendered)

        try:
            # ``key`` is (re)assigned right before every render call so the
            # error handler below always reports the template that failed.
            for key in self._to_render_simple:
                rendered[key] = self._config[key].async_render(
                    variables,
                    parse_result=key in self._parse_result,
                )

            for key in self._to_render_complex:
                rendered[key] = render_complex(
                    self._config[key],
                    variables,
                )

            if CONF_ATTRIBUTES in self._config:
                # Without this assignment a failure here would be logged under
                # the last loop key, or hit an unbound ``key`` if both render
                # lists are empty.
                key = CONF_ATTRIBUTES
                rendered[CONF_ATTRIBUTES] = render_complex(
                    self._config[CONF_ATTRIBUTES],
                    variables,
                )
        except TemplateError as err:
            logging.getLogger(f"{__package__}.{self.entity_id.split('.')[0]}").error(
                "Error rendering %s template for %s: %s", key, self.entity_id, err
            )
            self._rendered = self._static_rendered
        else:
            self._rendered = rendered
|
|
|
|
|
|
class ManualTriggerEntity(TriggerBaseEntity):
    """Template entity based on manual trigger data."""

    def __init__(
        self,
        hass: HomeAssistant,
        config: ConfigType,
    ) -> None:
        """Initialize the entity and render its name once."""
        TriggerBaseEntity.__init__(self, hass, config)
        # The name needs an initial render because it influences the
        # `entity_id`; it is rendered here with an empty variable set.
        name_template = config[CONF_NAME]
        parse_name = CONF_NAME in self._parse_result
        self._rendered[CONF_NAME] = name_template.async_render(
            {},
            parse_result=parse_name,
        )

    @callback
    def _process_manual_data(self, value: Any | None = None) -> None:
        """Process new data manually.

        Implementing class should call this last in its update method to
        render templates.
        Ex: self._process_manual_data(payload)

        The state is written first, then the templates are rendered with
        `this` (the entity's current state as a dict, or None), `value`, and
        `value_json` when `value` decodes as JSON.
        """
        self.async_write_ha_state()

        current_state = self.hass.states.get(self.entity_id)
        this = current_state.as_dict() if current_state else None

        run_variables: dict[str, Any] = {"value": value}
        # Best effort: only add `value_json` when the value decodes as JSON;
        # decode failures are deliberately ignored.
        with contextlib.suppress(*JSON_DECODE_EXCEPTIONS):
            run_variables["value_json"] = json_loads(value)

        self._render_templates({"this": this, **run_variables})
|
|
|
|
|
|
class ManualTriggerSensorEntity(ManualTriggerEntity, SensorEntity):
    """Template entity based on manual trigger data for sensor."""

    def __init__(
        self,
        hass: HomeAssistant,
        config: ConfigType,
    ) -> None:
        """Initialize the sensor entity.

        Runs the ManualTriggerEntity initializer, then takes the unit of
        measurement and state class from the config (None when absent).
        """
        ManualTriggerEntity.__init__(self, hass, config)
        unit_of_measurement = config.get(CONF_UNIT_OF_MEASUREMENT)
        self._attr_native_unit_of_measurement = unit_of_measurement
        state_class = config.get(CONF_STATE_CLASS)
        self._attr_state_class = state_class
|