# Source: core/homeassistant/helpers/template.py (1007 lines, 31 KiB, Python)

"""Template helper methods for rendering strings with Home Assistant data."""
import base64
from datetime import datetime
from functools import wraps
2015-12-11 05:38:35 +00:00
import json
import logging
import math
import random
import re
from typing import Any, Dict, Iterable, List, Optional, Union
2016-02-19 05:27:50 +00:00
import jinja2
from jinja2 import contextfilter, contextfunction
from jinja2.sandbox import ImmutableSandboxedEnvironment
from jinja2.utils import Namespace # type: ignore
2016-02-19 05:27:50 +00:00
from homeassistant.const import (
2019-07-31 19:25:30 +00:00
ATTR_ENTITY_ID,
ATTR_LATITUDE,
ATTR_LONGITUDE,
ATTR_UNIT_OF_MEASUREMENT,
MATCH_ALL,
STATE_UNKNOWN,
)
from homeassistant.core import State, callback, split_entity_id, valid_entity_id
from homeassistant.exceptions import TemplateError
2016-02-21 19:13:40 +00:00
from homeassistant.helpers import location as loc_helper
2019-09-20 15:23:34 +00:00
from homeassistant.helpers.typing import HomeAssistantType, TemplateVarsType
from homeassistant.loader import bind_hass
from homeassistant.util import convert, dt as dt_util, location as loc_util
from homeassistant.util.async_ import run_callback_threadsafe
2019-09-20 15:23:34 +00:00
# mypy: allow-untyped-calls, allow-untyped-defs
# mypy: no-check-untyped-defs, no-warn-return-any
_LOGGER = logging.getLogger(__name__)

# Unique marker used as a default so that None stays a usable argument value.
_SENTINEL = object()

# strftime format used by the timestamp filters in this module.
DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S"

# Keys under which render bookkeeping and the environment live in hass.data.
_RENDER_INFO = "template.render_info"
_ENVIRONMENT = "template.environment"

# Templates calling distance()/closest() are treated as matching all entities.
_RE_NONE_ENTITIES = re.compile(
    r"distance\(|closest\(", re.IGNORECASE | re.MULTILINE
)

# Group 1 captures "domain.object_id" references; group 2 captures bare words
# that may name a variable.
_RE_GET_ENTITIES = re.compile(
    r"(?:(?:states\.|(?:is_state|is_state_attr|state_attr|states)"
    r"\((?:[\ \'\"]?))([\w]+\.[\w]+)|([\w]+))",
    re.IGNORECASE | re.MULTILINE,
)

# A string without any of these delimiters contains nothing to render.
_RE_JINJA_DELIMITERS = re.compile(r"\{%|\{\{")
2015-12-10 00:20:09 +00:00
2015-12-11 05:38:35 +00:00
@bind_hass
def attach(hass, obj):
    """Set ``hass`` on every Template found inside nested lists and dicts.

    Lists are walked element by element, dicts by their values; any other
    non-Template object is left untouched.
    """
    if isinstance(obj, list):
        children = obj
    elif isinstance(obj, dict):
        children = obj.values()
    else:
        if isinstance(obj, Template):
            obj.hass = hass
        return

    for child in children:
        attach(hass, child)
2015-12-11 05:38:35 +00:00
2016-05-12 05:44:44 +00:00
def render_complex(value, variables=None):
    """Render every Template nested inside lists and dicts.

    Returns a new list/dict of the same shape with each Template replaced by
    its ``async_render(variables)`` result; other values pass through as is.
    """
    if isinstance(value, list):
        rendered_items = []
        for element in value:
            rendered_items.append(render_complex(element, variables))
        return rendered_items

    if isinstance(value, dict):
        rendered_map = {}
        for key, element in value.items():
            rendered_map[key] = render_complex(element, variables)
        return rendered_map

    if isinstance(value, Template):
        return value.async_render(variables)

    return value
def extract_entities(
    template: Optional[str], variables: Optional[Dict[str, Any]] = None
) -> Union[str, List[str]]:
    """Work out which entities a template string refers to.

    Returns an empty list when there is no template or it has no Jinja
    delimiters, MATCH_ALL when it uses distance()/closest() or nothing could
    be extracted, and otherwise a de-duplicated list of entity ids.
    """
    if template is None or _RE_JINJA_DELIMITERS.search(template) is None:
        return []

    if _RE_NONE_ENTITIES.search(template):
        return MATCH_ALL

    collected = []
    for entity_match, variable_name in _RE_GET_ENTITIES.findall(template):
        if (
            entity_match == "trigger.entity_id"
            and variables
            and "trigger" in variables
            and "entity_id" in variables["trigger"]
        ):
            # Substitute the concrete entity supplied by the trigger.
            collected.append(variables["trigger"]["entity_id"])
        elif entity_match:
            collected.append(entity_match)

        # A bare word may be a variable whose value is an entity id.
        if variables and variable_name in variables:
            candidate = variables[variable_name]
            if isinstance(candidate, str) and valid_entity_id(candidate):
                collected.append(candidate)

    if not collected:
        return MATCH_ALL
    return list(set(collected))
2019-09-20 15:23:34 +00:00
def _true(arg: Any) -> bool:
return True
class RenderInfo:
    """Record the outcome of one template render and what it touched."""

    def __init__(self, template):
        """Start with empty bookkeeping for ``template``."""
        self.template = template
        # Accept-everything placeholder; _freeze() installs the real filter.
        self.filter_lifecycle = _true
        self._result = None
        self._exception = None
        self._all_states = False
        self._domains = []
        self._entities = []

    def filter(self, entity_id: str) -> bool:
        """Return True when ``entity_id`` was collected during the render."""
        return entity_id in self._entities

    def _filter_lifecycle(self, entity_id: str) -> bool:
        """Return True when the entity or its domain was collected."""
        domain = split_entity_id(entity_id)[0]
        return domain in self._domains or entity_id in self._entities

    @property
    def result(self) -> str:
        """Return the rendered text, re-raising any stored render exception."""
        if self._exception is not None:
            raise self._exception
        return self._result

    def _freeze(self) -> None:
        """Make the collected sets immutable and select the lifecycle filter."""
        self._entities = frozenset(self._entities)

        if not self._all_states and self._domains:
            self._domains = frozenset(self._domains)
            self.filter_lifecycle = self._filter_lifecycle
            return

        # Domains are not needed: either everything matches or none were seen.
        del self._domains
        if not self._all_states:
            self.filter_lifecycle = self.filter
        # With _all_states set, filter_lifecycle is left untouched.
class Template:
    """Hold a template string and manage its compilation and rendering."""

    def __init__(self, template, hass=None):
        """Store the template source; compilation is deferred.

        Raises TypeError when ``template`` is not a string.
        """
        if not isinstance(template, str):
            raise TypeError("Expected template to be a string")

        self.template: str = template
        self._compiled_code = None
        self._compiled = None
        self.hass = hass

    @property
    def _env(self):
        """Return the environment to compile with.

        Without hass the shared no-hass environment is used; otherwise the
        environment is created on first use and cached in ``hass.data``.
        """
        if self.hass is None:
            return _NO_HASS_ENV
        ret = self.hass.data.get(_ENVIRONMENT)
        if ret is None:
            ret = self.hass.data[_ENVIRONMENT] = TemplateEnvironment(self.hass)
        return ret

    def ensure_valid(self):
        """Compile the template source once.

        Raises TemplateError (chained to the Jinja error) on a syntax error.
        """
        if self._compiled_code is not None:
            return

        try:
            self._compiled_code = self._env.compile(self.template)
        except jinja2.exceptions.TemplateSyntaxError as err:
            raise TemplateError(err) from err

    def extract_entities(
        self, variables: Optional[Dict[str, Any]] = None
    ) -> Union[str, List[str]]:
        """Extract all entities for state_changed listener."""
        return extract_entities(self.template, variables)

    def render(self, variables: TemplateVarsType = None, **kwargs: Any) -> str:
        """Render given template by scheduling async_render on hass.loop."""
        if variables is not None:
            kwargs.update(variables)

        return run_callback_threadsafe(
            self.hass.loop, self.async_render, kwargs
        ).result()

    @callback
    def async_render(self, variables: TemplateVarsType = None, **kwargs: Any) -> str:
        """Render given template and return the stripped result.

        This method must be run in the event loop.
        Raises TemplateError (chained to the Jinja error) when rendering fails.
        """
        compiled = self._compiled or self._ensure_compiled()

        if variables is not None:
            kwargs.update(variables)

        try:
            return compiled.render(kwargs).strip()
        except jinja2.TemplateError as err:
            raise TemplateError(err) from err

    @callback
    def async_render_to_info(
        self, variables: TemplateVarsType = None, **kwargs: Any
    ) -> RenderInfo:
        """Render the template and collect an entity filter.

        A TemplateError raised by the render is stored on the returned
        RenderInfo instead of propagating.
        """
        assert self.hass and _RENDER_INFO not in self.hass.data
        render_info = self.hass.data[_RENDER_INFO] = RenderInfo(self)
        # pylint: disable=protected-access
        try:
            render_info._result = self.async_render(variables, **kwargs)
        except TemplateError as ex:
            render_info._exception = ex
        finally:
            # Always remove the collector so later renders start clean.
            del self.hass.data[_RENDER_INFO]
        render_info._freeze()
        return render_info

    def render_with_possible_json_value(self, value, error_value=_SENTINEL):
        """Render template with value exposed.

        If valid JSON will expose value_json too.
        """
        return run_callback_threadsafe(
            self.hass.loop,
            self.async_render_with_possible_json_value,
            value,
            error_value,
        ).result()

    @callback
    def async_render_with_possible_json_value(
        self, value, error_value=_SENTINEL, variables=None
    ):
        """Render template with value exposed.

        If valid JSON will expose value_json too.
        On a render error returns ``error_value`` when one was supplied;
        otherwise logs the error and returns ``value`` unchanged.

        This method must be run in the event loop.
        """
        if self._compiled is None:
            self._ensure_compiled()

        # Copy so the caller's mapping is never mutated.
        variables = dict(variables or {})
        variables["value"] = value

        try:
            variables["value_json"] = json.loads(value)
        except (ValueError, TypeError):
            # Not JSON: value_json is simply not exposed.
            pass

        try:
            return self._compiled.render(variables).strip()
        except jinja2.TemplateError as ex:
            if error_value is _SENTINEL:
                _LOGGER.error(
                    "Error parsing value: %s (value: %s, template: %s)",
                    ex,
                    value,
                    self.template,
                )
            return value if error_value is _SENTINEL else error_value

    def _ensure_compiled(self):
        """Build and cache the jinja2 template from the compiled code."""
        self.ensure_valid()

        assert self.hass is not None, "hass variable not set on template"

        env = self._env

        self._compiled = jinja2.Template.from_code(
            env, self._compiled_code, env.globals, None
        )

        return self._compiled

    def __eq__(self, other):
        """Compare template with another."""
        return (
            self.__class__ == other.__class__
            and self.template == other.template
            and self.hass == other.hass
        )

    def __hash__(self):
        """Hash code for template."""
        return hash(self.template)

    def __repr__(self):
        """Representation of Template."""
        return f'Template("{self.template}")'
2015-12-10 00:20:09 +00:00
class AllStates:
    """Expose every Home Assistant state through attribute access."""

    def __init__(self, hass):
        """Keep a reference to hass."""
        self._hass = hass

    def __getattr__(self, name):
        """Return a single state for a dotted name, else a domain view.

        Raises TemplateError for an invalid entity id or domain name.
        """
        if "." not in name:
            if not valid_entity_id(f"{name}.entity"):
                raise TemplateError(f"Invalid domain name '{name}'")
            return DomainStates(self._hass, name)

        if not valid_entity_id(name):
            raise TemplateError(f"Invalid entity ID '{name}'")
        return _get_state(self._hass, name)

    def _collect_all(self):
        """Flag the active render, if any, as depending on all states."""
        render_info = self._hass.data.get(_RENDER_INFO)
        if render_info is None:
            return
        # pylint: disable=protected-access
        render_info._all_states = True

    def __iter__(self):
        """Iterate over all states, wrapped, ordered by entity id."""
        self._collect_all()
        ordered = sorted(
            self._hass.states.async_all(), key=lambda state: state.entity_id
        )
        return (_wrap_state(self._hass, state) for state in ordered)

    def __len__(self):
        """Return the number of entity ids known to hass."""
        self._collect_all()
        entity_ids = self._hass.states.async_entity_ids()
        return len(entity_ids)

    def __call__(self, entity_id):
        """Return the entity's state value, or STATE_UNKNOWN if missing."""
        state = _get_state(self._hass, entity_id)
        if state is None:
            return STATE_UNKNOWN
        return state.state

    def __repr__(self):
        """Representation of All States."""
        return "<template AllStates>"
2015-12-10 00:20:09 +00:00
class DomainStates:
    """Expose the states of one domain through attribute access."""

    def __init__(self, hass, domain):
        """Remember hass and the domain this view covers."""
        self._hass = hass
        self._domain = domain

    def __getattr__(self, name):
        """Return the state of ``<domain>.<name>``.

        Raises TemplateError when the resulting entity id is invalid.
        """
        entity_id = f"{self._domain}.{name}"
        if valid_entity_id(entity_id):
            return _get_state(self._hass, entity_id)
        raise TemplateError(f"Invalid entity ID '{entity_id}'")

    def _collect_domain(self) -> None:
        """Record this domain on the active render, if any."""
        entity_collect = self._hass.data.get(_RENDER_INFO)
        if entity_collect is None:
            return
        # pylint: disable=protected-access
        entity_collect._domains.append(self._domain)

    def __iter__(self):
        """Iterate over this domain's states, wrapped, ordered by entity id."""
        self._collect_domain()
        wrapped = [
            _wrap_state(self._hass, state)
            for state in self._hass.states.async_all()
            if state.domain == self._domain
        ]
        wrapped.sort(key=lambda state: state.entity_id)
        return iter(wrapped)

    def __len__(self) -> int:
        """Return the number of entity ids in this domain."""
        self._collect_domain()
        entity_ids = self._hass.states.async_entity_ids(self._domain)
        return len(entity_ids)

    def __repr__(self) -> str:
        """Representation of Domain States."""
        return f"<template DomainStates('{self._domain}')>"
class TemplateState(State):
    """State wrapper that records accesses made while rendering a template."""

    # Inheritance is done so functions that check against State keep working
    # pylint: disable=super-init-not-called
    def __init__(self, hass, state):
        """Wrap ``state``; the parent initialiser is intentionally skipped."""
        self._hass = hass
        self._state = state

    def _access_state(self):
        """Record an access to the wrapped state and return it."""
        wrapped = object.__getattribute__(self, "_state")
        _collect_state(object.__getattribute__(self, "_hass"), wrapped.entity_id)
        return wrapped

    @property
    def state_with_unit(self) -> str:
        """Return the state, followed by its unit when one is set."""
        state = object.__getattribute__(self, "_access_state")()
        unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
        if unit is not None:
            return f"{state.state} {unit}"
        return state.state

    def __getattribute__(self, name):
        """Return an attribute, recording an access where it counts."""
        # entity_id and the base ``object`` attributes don't count as an
        # access of the state: we either found it by looking direct for
        # the ID or got it off an iterator.
        if name == "entity_id" or name in object.__dict__:
            return getattr(object.__getattribute__(self, "_state"), name)

        # Members defined on this wrapper class are resolved on the wrapper.
        if name in TemplateState.__dict__:
            return object.__getattribute__(self, name)

        accessed = object.__getattribute__(self, "_access_state")()
        return getattr(accessed, name)

    def __repr__(self) -> str:
        """Representation of Template State."""
        state = object.__getattribute__(self, "_access_state")()
        rep = state.__repr__()
        # Drop the first character of the wrapped repr and prefix ours.
        return f"<template {rep[1:]}"
def _collect_state(hass, entity_id):
    """Record ``entity_id`` on the active render info, if there is one."""
    entity_collect = hass.data.get(_RENDER_INFO)
    if entity_collect is None:
        return
    # pylint: disable=protected-access
    entity_collect._entities.append(entity_id)
def _wrap_state(hass, state):
"""Wrap a state."""
return None if state is None else TemplateState(hass, state)
def _get_state(hass, entity_id):
    """Look up an entity's state and return it wrapped, or None if missing."""
    state = hass.states.get(entity_id)
    if state is not None:
        # Collection happens on the first actual access to the state
        # properties inside the wrapper.
        return _wrap_state(hass, state)

    # Nothing to wrap, so the lookup itself has to be collected here.
    _collect_state(hass, entity_id)
    return None
def _resolve_state(hass, entity_id_or_state):
    """Return a State for a State or entity-id string, else None."""
    if isinstance(entity_id_or_state, State):
        return entity_id_or_state
    if not isinstance(entity_id_or_state, str):
        return None
    return _get_state(hass, entity_id_or_state)
2019-09-20 15:23:34 +00:00
def expand(hass: HomeAssistantType, *args: Any) -> Iterable[State]:
    """Flatten entity ids, states and iterables into a sorted list of states.

    Group entities are replaced by their members; unknown entity ids and
    unsupported argument types are skipped. The result is ordered by
    entity id and holds each entity once.
    """
    pending = list(args)
    resolved = {}

    while pending:
        candidate = pending.pop()

        if isinstance(candidate, str):
            entity_id = candidate
            candidate = _get_state(hass, candidate)
            if candidate is None:
                continue
        elif isinstance(candidate, State):
            entity_id = candidate.entity_id
        elif isinstance(candidate, Iterable):
            # Queue the members for processing.
            pending += candidate
            continue
        else:
            # ignore other types
            continue

        # NOTE(review): imported at use site, presumably to avoid an import
        # cycle with the group component - confirm.
        from homeassistant.components import group

        if split_entity_id(entity_id)[0] != group.DOMAIN:
            resolved[entity_id] = candidate
            continue

        # Collect state will be called in here since it's wrapped
        members = candidate.attributes.get(ATTR_ENTITY_ID)
        if members:
            pending += members

    return sorted(resolved.values(), key=lambda a: a.entity_id)
2016-02-21 05:58:53 +00:00
def closest(hass, *args):
    """Find closest entity.

    Closest to home:
        closest(states)
        closest(states.device_tracker)
        closest('group.children')
        closest(states.group.children)

    Closest to a point:
        closest(23.456, 23.456, 'group.children')
        closest('zone.school', 'group.children')
        closest(states.zone.school, 'group.children')

    As a filter:
        states | closest
        states.device_tracker | closest
        ['group.children', states.device_tracker] | closest
        'group.children' | closest(23.456, 23.456)
        states.device_tracker | closest('zone.school')
        'group.children' | closest(states.zone.school)

    Returns None (after logging a warning) when the reference point cannot
    be resolved.
    """
    arg_count = len(args)

    if arg_count == 1:
        # Only entities given: measure from the configured home location.
        latitude = hass.config.latitude
        longitude = hass.config.longitude
        entities = args[0]

    elif arg_count == 2:
        # Reference point given as a state or entity id.
        point_state = _resolve_state(hass, args[0])

        if point_state is None:
            _LOGGER.warning("Closest:Unable to find state %s", args[0])
            return None
        if not loc_helper.has_location(point_state):
            _LOGGER.warning(
                "Closest:State does not contain valid location: %s", point_state
            )
            return None

        latitude = point_state.attributes.get(ATTR_LATITUDE)
        longitude = point_state.attributes.get(ATTR_LONGITUDE)
        entities = args[1]

    else:
        # Reference point given as explicit coordinates.
        latitude = convert(args[0], float)
        longitude = convert(args[1], float)

        if latitude is None or longitude is None:
            _LOGGER.warning(
                "Closest:Received invalid coordinates: %s, %s", args[0], args[1]
            )
            return None

        entities = args[2]

    # state will already be wrapped here
    candidates = expand(hass, entities)
    return loc_helper.closest(latitude, longitude, candidates)
def closest_filter(hass, *args):
    """Call closest as a filter.

    The piped value arrives first, but closest() expects the entities last,
    so the first argument is moved to the end.
    """
    piped_value, *remaining = args
    return closest(hass, *remaining, piped_value)
def distance(hass, *args):
    """Calculate distance.

    Will calculate distance from home to a point or between points.
    Points can be passed in using state objects or lat/lng coordinates.
    Returns None (after logging a warning) when a point cannot be resolved.
    """
    locations = []
    to_process = list(args)

    while to_process:
        value = to_process.pop(0)
        point_state = _resolve_state(hass, value)

        if point_state is not None:
            if not loc_helper.has_location(point_state):
                _LOGGER.warning(
                    "distance:State does not contain valid location: %s", point_state
                )
                return None

            latitude = point_state.attributes.get(ATTR_LATITUDE)
            longitude = point_state.attributes.get(ATTR_LONGITUDE)
        else:
            # We expect this and next value to be lat&lng
            if not to_process:
                _LOGGER.warning(
                    "Distance:Expected latitude and longitude, got %s", value
                )
                return None

            value_2 = to_process.pop(0)
            latitude = convert(value, float)
            longitude = convert(value_2, float)

            if latitude is None or longitude is None:
                _LOGGER.warning(
                    "Distance:Unable to process latitude and longitude: %s, %s",
                    value,
                    value_2,
                )
                return None

        locations.append((latitude, longitude))

    if len(locations) == 1:
        # A single point is measured via hass.config.distance.
        return hass.config.distance(*locations[0])

    return hass.config.units.length(
        loc_util.distance(*locations[0] + locations[1]), "m"
    )
2016-02-21 19:13:40 +00:00
2019-09-20 15:23:34 +00:00
def is_state(hass: HomeAssistantType, entity_id: str, state: State) -> bool:
    """Return True when the entity exists and its state equals ``state``."""
    # NOTE(review): ``state`` is compared against ``state_obj.state``, so
    # callers presumably pass the state value rather than a State object -
    # confirm the annotation.
    state_obj = _get_state(hass, entity_id)
    if state_obj is None:
        return False
    return state_obj.state == state
2016-02-21 19:13:40 +00:00
def is_state_attr(hass, entity_id, name, value):
    """Return True when the entity's attribute is set and equals ``value``."""
    attr = state_attr(hass, entity_id, name)
    if attr is None:
        return False
    return attr == value
2016-02-21 05:58:53 +00:00
def state_attr(hass, entity_id, name):
    """Return one attribute of an entity's state, or None if unavailable."""
    state_obj = _get_state(hass, entity_id)
    if state_obj is None:
        return None
    return state_obj.attributes.get(name)
2016-02-21 05:58:53 +00:00
def forgiving_round(value, precision=0, method="common"):
    """Round a number or numeric string, leaving unconvertible input alone.

    ``method`` may be "ceil", "floor" or "half" (nearest 0.5); anything else
    uses the built-in round(). With ``precision`` 0 the result is an int.
    """
    try:
        # support rounding methods like jinja
        multiplier = float(10 ** precision)
        number = float(value)

        if method == "ceil":
            value = math.ceil(number * multiplier) / multiplier
        elif method == "floor":
            value = math.floor(number * multiplier) / multiplier
        elif method == "half":
            value = round(number * 2) / 2
        else:
            # if method is common or something else, use common rounding
            value = round(number, precision)

        if precision == 0:
            return int(value)
        return value
    except (ValueError, TypeError):
        # If value can't be converted to float
        return value
def multiply(value, amount):
    """Filter to multiply ``value`` (converted to float) by ``amount``.

    Returns ``value`` unchanged when the conversion or multiplication fails.
    """
    try:
        number = float(value)
        return number * amount
    except (ValueError, TypeError):
        # If value can't be converted to float
        return value
2015-12-13 06:19:37 +00:00
def logarithm(value, base=math.e):
    """Filter to get logarithm of the value with a specific base.

    Returns ``value`` unchanged when it or ``base`` cannot be converted to
    float, or when the logarithm is undefined for them.
    """
    try:
        return math.log(float(value), float(base))
    except (ValueError, TypeError, ZeroDivisionError):
        # ZeroDivisionError: math.log divides by log(base), which is 0 for
        # base 1.
        return value
def sine(value):
    """Filter returning the sine of ``value``, or ``value`` if unusable."""
    try:
        angle = float(value)
        return math.sin(angle)
    except (ValueError, TypeError):
        return value
def cosine(value):
    """Filter returning the cosine of ``value``, or ``value`` if unusable."""
    try:
        angle = float(value)
        return math.cos(angle)
    except (ValueError, TypeError):
        return value
def tangent(value):
    """Filter returning the tangent of ``value``, or ``value`` if unusable."""
    try:
        angle = float(value)
        return math.tan(angle)
    except (ValueError, TypeError):
        return value
def arc_sine(value):
    """Filter returning the arc sine of ``value``, or ``value`` if unusable."""
    try:
        ratio = float(value)
        return math.asin(ratio)
    except (ValueError, TypeError):
        return value
def arc_cosine(value):
    """Filter returning the arc cosine of ``value``, or ``value`` if unusable."""
    try:
        ratio = float(value)
        return math.acos(ratio)
    except (ValueError, TypeError):
        return value
def arc_tangent(value):
    """Filter returning the arc tangent of ``value``, or ``value`` if unusable."""
    try:
        ratio = float(value)
        return math.atan(ratio)
    except (ValueError, TypeError):
        return value
def arc_tangent2(*args):
    """Filter to calculate four quadrant arc tangent of y / x.

    Accepts either ``y, x`` as two arguments or a single list/tuple holding
    them. Returns the arguments unchanged when they cannot be converted to
    float or when fewer than two operands are supplied.
    """
    try:
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = args[0]

        return math.atan2(float(args[0]), float(args[1]))
    except (ValueError, TypeError, IndexError):
        # IndexError: a missing operand is treated like any other bad input.
        return args
def square_root(value):
    """Filter returning the square root of ``value``, or ``value`` if unusable."""
    try:
        number = float(value)
        return math.sqrt(number)
    except (ValueError, TypeError):
        return value
def timestamp_custom(value, date_format=DATE_STR_FORMAT, local=True):
    """Filter to format a timestamp with ``date_format``.

    The timestamp is converted to local time first unless ``local`` is
    falsy. Returns ``value`` unchanged when it cannot be converted.
    """
    try:
        moment = dt_util.utc_from_timestamp(value)
        if local:
            moment = dt_util.as_local(moment)
        return moment.strftime(date_format)
    except (ValueError, TypeError):
        # If timestamp can't be converted
        return value
2016-07-23 02:47:43 +00:00
def timestamp_local(value):
    """Filter to format a timestamp as a local date/time string.

    Returns ``value`` unchanged when it cannot be converted.
    """
    try:
        moment = dt_util.utc_from_timestamp(value)
        return dt_util.as_local(moment).strftime(DATE_STR_FORMAT)
    except (ValueError, TypeError):
        # If timestamp can't be converted
        return value
def timestamp_utc(value):
    """Filter to format a timestamp as a UTC date/time string.

    Returns ``value`` unchanged when it cannot be converted.
    """
    try:
        moment = dt_util.utc_from_timestamp(value)
        return moment.strftime(DATE_STR_FORMAT)
    except (ValueError, TypeError):
        # If timestamp can't be converted
        return value
def forgiving_as_timestamp(value):
    """Convert ``value`` to a timestamp, returning None when that fails."""
    try:
        stamp = dt_util.as_timestamp(value)
    except (ValueError, TypeError):
        return None
    return stamp
2016-11-11 06:57:44 +00:00
def strptime(string, fmt):
    """Parse a time string to datetime.

    Returns ``string`` unchanged when it does not match ``fmt`` or when
    either argument is not a string.
    """
    try:
        return datetime.strptime(string, fmt)
    except (ValueError, AttributeError, TypeError):
        # TypeError: datetime.strptime rejects non-string arguments.
        return string
def fail_when_undefined(value):
    """Filter that passes defined values through and fails on undefined ones."""
    if not isinstance(value, jinja2.Undefined):
        return value
    # The undefined object is called here to force the failure.
    value()
    return value
2016-02-24 18:41:49 +00:00
def forgiving_float(value):
    """Return ``value`` as a float, or unchanged when it can't be converted."""
    try:
        number = float(value)
    except (ValueError, TypeError):
        return value
    return number
2019-07-31 19:25:30 +00:00
def regex_match(value, find="", ignorecase=False):
    """Return True when ``find`` matches at the start of ``value``.

    Non-string values are converted with str() first.
    """
    text = value if isinstance(value, str) else str(value)
    flags = re.I if ignorecase else 0
    found = re.match(find, text, flags)
    return bool(found)
2019-07-31 19:25:30 +00:00
def regex_replace(value="", find="", replace="", ignorecase=False):
    """Replace every match of ``find`` in ``value`` with ``replace``.

    Non-string values are converted with str() first.
    """
    text = value if isinstance(value, str) else str(value)
    flags = re.I if ignorecase else 0
    pattern = re.compile(find, flags)
    return pattern.sub(replace, text)
2019-07-31 19:25:30 +00:00
def regex_search(value, find="", ignorecase=False):
    """Return True when ``find`` matches anywhere in ``value``.

    Non-string values are converted with str() first.
    """
    text = value if isinstance(value, str) else str(value)
    flags = re.I if ignorecase else 0
    found = re.search(find, text, flags)
    return bool(found)
2019-07-31 19:25:30 +00:00
def regex_findall_index(value, find="", index=0, ignorecase=False):
    """Return the ``index``-th result of re.findall for ``find`` in ``value``.

    Non-string values are converted with str() first. Raises IndexError
    when there is no match at ``index``.
    """
    text = value if isinstance(value, str) else str(value)
    flags = re.I if ignorecase else 0
    matches = re.findall(find, text, flags)
    return matches[index]
def bitwise_and(first_value, second_value):
    """Return ``first_value & second_value``."""
    result = first_value & second_value
    return result
def bitwise_or(first_value, second_value):
    """Return ``first_value | second_value``."""
    result = first_value | second_value
    return result
def base64_encode(value):
    """Encode a string as UTF-8 and return its base64 form as a string."""
    raw = value.encode("utf-8")
    encoded = base64.b64encode(raw)
    return encoded.decode("utf-8")
def base64_decode(value):
    """Decode base64 input and return the result as a UTF-8 string."""
    raw = base64.b64decode(value)
    return raw.decode("utf-8")
def ordinal(value):
    """Return ``value`` as a string with its English ordinal suffix."""
    text = str(value)

    if int(text[-2:]) % 100 in range(11, 14):
        # 11, 12 and 13 always take "th".
        suffix = "th"
    else:
        suffixes = ["th", "st", "nd", "rd"] + ["th"] * 6
        suffix = suffixes[int(text[-1]) % 10]

    return text + suffix
def from_json(value):
    """Parse a JSON string and return the resulting object."""
    parsed = json.loads(value)
    return parsed
def to_json(value):
    """Serialise an object to a JSON string."""
    serialised = json.dumps(value)
    return serialised
@contextfilter
def random_every_time(context, values):
    """Pick a random element of ``values``.

    Unlike Jinja's random filter,
    this is context-dependent to avoid caching the chosen value.
    """
    chosen = random.choice(values)
    return chosen
2015-12-13 06:19:37 +00:00
class TemplateEnvironment(ImmutableSandboxedEnvironment):
    """The Home Assistant template environment."""

    def __init__(self, hass):
        """Register the filters and globals available to templates.

        The hass-dependent helpers are only registered when ``hass`` is given.
        """
        super().__init__()
        self.hass = hass

        self.filters.update(
            {
                "round": forgiving_round,
                "multiply": multiply,
                "log": logarithm,
                "sin": sine,
                "cos": cosine,
                "tan": tangent,
                "asin": arc_sine,
                "acos": arc_cosine,
                "atan": arc_tangent,
                "atan2": arc_tangent2,
                "sqrt": square_root,
                "as_timestamp": forgiving_as_timestamp,
                "timestamp_custom": timestamp_custom,
                "timestamp_local": timestamp_local,
                "timestamp_utc": timestamp_utc,
                "to_json": to_json,
                "from_json": from_json,
                "is_defined": fail_when_undefined,
                "max": max,
                "min": min,
                "random": random_every_time,
                "base64_encode": base64_encode,
                "base64_decode": base64_decode,
                "ordinal": ordinal,
                "regex_match": regex_match,
                "regex_replace": regex_replace,
                "regex_search": regex_search,
                "regex_findall_index": regex_findall_index,
                "bitwise_and": bitwise_and,
                "bitwise_or": bitwise_or,
                "ord": ord,
            }
        )

        self.globals.update(
            {
                "log": logarithm,
                "sin": sine,
                "cos": cosine,
                "tan": tangent,
                "sqrt": square_root,
                "pi": math.pi,
                "tau": math.pi * 2,
                "e": math.e,
                "asin": arc_sine,
                "acos": arc_cosine,
                "atan": arc_tangent,
                "atan2": arc_tangent2,
                "float": forgiving_float,
                "now": dt_util.now,
                "utcnow": dt_util.utcnow,
                "as_timestamp": forgiving_as_timestamp,
                "relative_time": dt_util.get_age,
                "strptime": strptime,
            }
        )

        if hass is None:
            return

        # We mark these as a context functions to ensure they get
        # evaluated fresh with every execution, rather than executed
        # at compile time and the value stored. The context itself
        # can be discarded, we only need to get at the hass object.
        def hassfunction(func):
            """Wrap function that depend on hass."""

            @wraps(func)
            def wrapper(*args, **kwargs):
                # args[0] is the Jinja context; replace it with hass.
                return func(hass, *args[1:], **kwargs)

            return contextfunction(wrapper)

        expand_global = hassfunction(expand)
        self.globals["expand"] = expand_global
        self.filters["expand"] = contextfilter(expand_global)
        self.globals["closest"] = hassfunction(closest)
        self.filters["closest"] = contextfilter(hassfunction(closest_filter))
        for helper in (distance, is_state, is_state_attr, state_attr):
            self.globals[helper.__name__] = hassfunction(helper)
        self.globals["states"] = AllStates(hass)

    def is_safe_callable(self, obj):
        """Treat AllStates as callable, else defer to the parent class."""
        if isinstance(obj, AllStates):
            return True
        return super().is_safe_callable(obj)

    def is_safe_attribute(self, obj, attr, value):
        """Allow any attribute on a Namespace, else defer to the parent class."""
        if isinstance(obj, Namespace):
            return True
        return super().is_safe_attribute(obj, attr, value)
2016-11-19 05:47:59 +00:00
# Shared environment for templates with no hass attached; Template._env
# returns it when hass is None.
_NO_HASS_ENV = TemplateEnvironment(None)