"""Template helper methods for rendering strings with Home Assistant data."""
import base64
import json
import logging
import math
import random
import re
from datetime import datetime
from functools import wraps
from typing import Any, Iterable

import jinja2
from jinja2 import contextfilter, contextfunction
from jinja2.sandbox import ImmutableSandboxedEnvironment
from jinja2.utils import Namespace  # type: ignore

from homeassistant.const import (
    ATTR_ENTITY_ID,
    ATTR_LATITUDE,
    ATTR_LONGITUDE,
    ATTR_UNIT_OF_MEASUREMENT,
    MATCH_ALL,
    STATE_UNKNOWN,
)
from homeassistant.core import State, callback, split_entity_id, valid_entity_id
from homeassistant.exceptions import TemplateError
from homeassistant.helpers import location as loc_helper
from homeassistant.helpers.typing import HomeAssistantType, TemplateVarsType
from homeassistant.loader import bind_hass
from homeassistant.util import convert, dt as dt_util, location as loc_util
from homeassistant.util.async_ import run_callback_threadsafe

# mypy: allow-untyped-calls, allow-untyped-defs
# mypy: no-check-untyped-defs, no-warn-return-any

_LOGGER = logging.getLogger(__name__)
_SENTINEL = object()
DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S"

_RENDER_INFO = "template.render_info"
_ENVIRONMENT = "template.environment"

_RE_NONE_ENTITIES = re.compile(r"distance\(|closest\(", re.I | re.M)
_RE_GET_ENTITIES = re.compile(
    r"(?:(?:states\.|(?:is_state|is_state_attr|state_attr|states)"
    r"\((?:[\ \'\"]?))([\w]+\.[\w]+)|([\w]+))",
    re.I | re.M,
)
_RE_JINJA_DELIMITERS = re.compile(r"\{%|\{\{")


@bind_hass
def attach(hass, obj):
    """Set ``hass`` on every Template found in ``obj``.

    ``obj`` may be a Template, or a list / dict (nested to any depth) whose
    items or values are walked recursively. Anything else is ignored.
    """
    if isinstance(obj, list):
        children = obj
    elif isinstance(obj, dict):
        # Only the values are inspected; dict keys are never touched.
        children = obj.values()
    else:
        if isinstance(obj, Template):
            obj.hass = hass
        return

    for child in children:
        attach(hass, child)
def render_complex(value, variables=None):
    """Render a template, or every template nested in lists and dicts.

    Lists are rendered element by element and dicts value by value (keys are
    kept as they are). Any other object is expected to provide
    ``async_render(variables)``, whose result is returned.
    """
    if isinstance(value, list):
        return [render_complex(item, variables) for item in value]
    if isinstance(value, dict):
        return {key: render_complex(item, variables) for key, item in value.items()}
    return value.async_render(variables)


def extract_entities(template, variables=None):
    """Extract all entities for state_changed listener from template string.

    Returns an empty list when ``template`` is None or contains no Jinja
    delimiter (``{%`` or ``{{``), ``MATCH_ALL`` when it uses ``distance(`` or
    ``closest(`` or when no entity could be identified, and otherwise a
    de-duplicated list of entity ids (in no particular order).

    ``variables`` is optional. When given, a ``trigger.entity_id`` reference
    is resolved through ``variables["trigger"]["entity_id"]``, and a bare
    name whose variable value is a valid entity id string is included too.
    """
    if template is None or _RE_JINJA_DELIMITERS.search(template) is None:
        return []

    if _RE_NONE_ENTITIES.search(template):
        return MATCH_ALL

    # ``variables`` defaults to None; normalise it so the membership tests
    # below cannot raise TypeError for templates that mention
    # ``trigger.entity_id`` when no variables were supplied.
    variables = variables or {}

    found = []
    # Each match is a 2-tuple: (dotted entity id, bare identifier); the group
    # that did not take part in the match is an empty string.
    for entity_id, var_name in _RE_GET_ENTITIES.findall(template):
        if (
            entity_id == "trigger.entity_id"
            and "trigger" in variables
            and "entity_id" in variables["trigger"]
        ):
            found.append(variables["trigger"]["entity_id"])
        elif entity_id:
            found.append(entity_id)

        if (
            variables
            and var_name in variables
            and isinstance(variables[var_name], str)
            and valid_entity_id(variables[var_name])
        ):
            found.append(variables[var_name])

    if found:
        return list(set(found))

    return MATCH_ALL


def _true(arg: Any) -> bool:
    """Return True for any argument."""
    return True
class RenderInfo:
    """Holds information about a template render."""

    def __init__(self, template):
        """Initialise."""
        self.template = template
        self._result = None
        self._exception = None
        self._all_states = False
        self._domains = []
        self._entities = []
        # Will be set sensibly once frozen.
        self.filter_lifecycle = _true

    def filter(self, entity_id: str) -> bool:
        """Return True if ``entity_id`` is one of the collected entities."""
        return entity_id in self._entities

    def _filter_lifecycle(self, entity_id: str) -> bool:
        """Return True if the entity's domain or the entity itself was collected."""
        domain = split_entity_id(entity_id)[0]
        if domain in self._domains:
            return True
        return entity_id in self._entities

    @property
    def result(self) -> str:
        """Return the stored render result, or raise the stored exception."""
        error = self._exception
        if error is None:
            return self._result
        raise error  # pylint: disable=raising-bad-type

    def _freeze(self) -> None:
        """Turn the collected data into frozensets and pick ``filter_lifecycle``."""
        self._entities = frozenset(self._entities)

        if self._all_states:
            # filter_lifecycle is left as it was set in __init__.
            del self._domains
            return

        if self._domains:
            self._domains = frozenset(self._domains)
            self.filter_lifecycle = self._filter_lifecycle
            return

        # No domains collected: the entity-only filter is sufficient.
        del self._domains
        self.filter_lifecycle = self.filter
class Template:
    """Class to hold a template and manage caching and rendering."""

    def __init__(self, template, hass=None):
        """Instantiate a template.

        Raises TypeError when ``template`` is not a string.
        """
        if not isinstance(template, str):
            raise TypeError("Expected template to be a string")

        self.template = template
        self._compiled_code = None
        self._compiled = None
        self.hass = hass

    @property
    def _env(self):
        """Return the environment used to compile this template.

        Without a hass instance the module-level ``_NO_HASS_ENV`` is used;
        otherwise the environment is kept in ``hass.data`` under
        ``_ENVIRONMENT`` and created on first use.
        """
        if self.hass is None:
            return _NO_HASS_ENV
        ret = self.hass.data.get(_ENVIRONMENT)
        if ret is None:
            ret = self.hass.data[_ENVIRONMENT] = TemplateEnvironment(self.hass)
        return ret

    def ensure_valid(self):
        """Compile the template source if that has not been done yet.

        Returns nothing. Raises TemplateError when the source has a Jinja
        syntax error.
        """
        if self._compiled_code is not None:
            return

        try:
            self._compiled_code = self._env.compile(self.template)
        except jinja2.exceptions.TemplateSyntaxError as err:
            # Chain explicitly so the original Jinja error stays attached.
            raise TemplateError(err) from err

    def extract_entities(self, variables=None):
        """Extract all entities for state_changed listener."""
        return extract_entities(self.template, variables)

    def render(self, variables: TemplateVarsType = None, **kwargs: Any) -> str:
        """Render given template.

        Schedules ``async_render`` on ``hass.loop`` and waits for its result.
        Entries in ``variables`` override keyword arguments of the same name.
        """
        if variables is not None:
            kwargs.update(variables)

        return run_callback_threadsafe(
            self.hass.loop, self.async_render, kwargs
        ).result()

    @callback
    def async_render(self, variables: TemplateVarsType = None, **kwargs: Any) -> str:
        """Render given template.

        This method must be run in the event loop.

        Returns the rendered text with surrounding whitespace stripped.
        Raises TemplateError when Jinja fails to render.
        """
        compiled = self._compiled or self._ensure_compiled()

        if variables is not None:
            kwargs.update(variables)

        try:
            return compiled.render(kwargs).strip()
        except jinja2.TemplateError as err:
            # Chain explicitly so the original Jinja error stays attached.
            raise TemplateError(err) from err

    @callback
    def async_render_to_info(
        self, variables: TemplateVarsType = None, **kwargs: Any
    ) -> RenderInfo:
        """Render the template and collect an entity filter.

        A RenderInfo is stored in ``hass.data`` under ``_RENDER_INFO`` for the
        duration of the render and removed afterwards. A TemplateError raised
        while rendering is stored on the returned object, not propagated.
        """
        assert self.hass and _RENDER_INFO not in self.hass.data
        render_info = self.hass.data[_RENDER_INFO] = RenderInfo(self)
        # pylint: disable=protected-access
        try:
            render_info._result = self.async_render(variables, **kwargs)
        except TemplateError as ex:
            render_info._exception = ex
        finally:
            del self.hass.data[_RENDER_INFO]
        render_info._freeze()
        return render_info

    def render_with_possible_json_value(self, value, error_value=_SENTINEL):
        """Render template with value exposed.

        If valid JSON will expose value_json too.

        Schedules ``async_render_with_possible_json_value`` on ``hass.loop``
        and waits for its result.
        """
        return run_callback_threadsafe(
            self.hass.loop,
            self.async_render_with_possible_json_value,
            value,
            error_value,
        ).result()

    @callback
    def async_render_with_possible_json_value(
        self, value, error_value=_SENTINEL, variables=None
    ):
        """Render template with value exposed.

        If valid JSON will expose value_json too.

        This method must be run in the event loop.

        On a Jinja rendering error, ``error_value`` is returned when one was
        given; otherwise the error is logged and ``value`` is returned as is.
        """
        if self._compiled is None:
            self._ensure_compiled()

        # Work on a copy so the caller's mapping is never modified.
        variables = dict(variables or {})
        variables["value"] = value

        try:
            variables["value_json"] = json.loads(value)
        except (ValueError, TypeError):
            # Not JSON (or not a string at all): value_json is simply not set.
            pass

        try:
            return self._compiled.render(variables).strip()
        except jinja2.TemplateError as ex:
            if error_value is _SENTINEL:
                _LOGGER.error(
                    "Error parsing value: %s (value: %s, template: %s)",
                    ex,
                    value,
                    self.template,
                )
            return value if error_value is _SENTINEL else error_value

    def _ensure_compiled(self):
        """Bind a template to a specific hass instance.

        Compiles the source if needed, builds the Jinja template object from
        the compiled code, caches it on the instance and returns it.
        """
        self.ensure_valid()

        assert self.hass is not None, "hass variable not set on template"

        env = self._env

        self._compiled = jinja2.Template.from_code(
            env, self._compiled_code, env.globals, None
        )

        return self._compiled

    def __eq__(self, other):
        """Compare template with another."""
        return (
            self.__class__ == other.__class__
            and self.template == other.template
            and self.hass == other.hass
        )

    def __hash__(self):
        """Hash code for template."""
        return hash(self.template)

    def __repr__(self):
        """Representation of Template."""
        return f'Template("{self.template}")'
in name: if not valid_entity_id(name): raise TemplateError(f"Invalid entity ID '{name}'") return _get_state(self._hass, name) if not valid_entity_id(name + ".entity"): raise TemplateError(f"Invalid domain name '{name}'") return DomainStates(self._hass, name) def _collect_all(self): render_info = self._hass.data.get(_RENDER_INFO) if render_info is not None: # pylint: disable=protected-access render_info._all_states = True def __iter__(self): """Return all states.""" self._collect_all() return iter( _wrap_state(self._hass, state) for state in sorted( self._hass.states.async_all(), key=lambda state: state.entity_id ) ) def __len__(self): """Return number of states.""" self._collect_all() return len(self._hass.states.async_entity_ids()) def __call__(self, entity_id): """Return the states.""" state = _get_state(self._hass, entity_id) return STATE_UNKNOWN if state is None else state.state def __repr__(self): """Representation of All States.""" return "