Add limited template to at field for time triggers (#126584)

* Add limited template to at field for time triggers

* fix mypy

* Fix comments

* fix-tests

---------

Co-authored-by: Erik Montnemery <erik@montnemery.com>
pull/128991/head^2
Petro31 2024-10-22 15:06:19 -04:00 committed by GitHub
parent 1254667b2c
commit 810bf06e16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 108 additions and 5 deletions

View File

@ -3,7 +3,7 @@
from collections.abc import Callable
from datetime import datetime, timedelta
from functools import partial
from typing import NamedTuple
from typing import Any, NamedTuple
import voluptuous as vol
@ -26,7 +26,8 @@ from homeassistant.core import (
State,
callback,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, template
from homeassistant.helpers.event import (
async_track_point_in_time,
async_track_state_change_event,
@ -37,6 +38,7 @@ from homeassistant.helpers.typing import ConfigType
import homeassistant.util.dt as dt_util
_TIME_TRIGGER_ENTITY = vol.All(str, cv.entity_domain(["input_datetime", "sensor"]))
_TIME_AT_SCHEMA = vol.Any(cv.time, _TIME_TRIGGER_ENTITY)
_TIME_TRIGGER_ENTITY_WITH_OFFSET = vol.Schema(
{
@ -45,16 +47,29 @@ _TIME_TRIGGER_ENTITY_WITH_OFFSET = vol.Schema(
}
)
def valid_at_template(value: Any) -> template.Template:
"""Validate either a jinja2 template, valid time, or valid trigger entity."""
tpl = cv.template(value)
if tpl.is_static:
_TIME_AT_SCHEMA(value)
return tpl
_TIME_TRIGGER_SCHEMA = vol.Any(
cv.time,
_TIME_TRIGGER_ENTITY,
_TIME_TRIGGER_ENTITY_WITH_OFFSET,
valid_at_template,
msg=(
"Expected HH:MM, HH:MM:SS, an Entity ID with domain 'input_datetime' or "
"'sensor', or a combination of a timestamp sensor entity and an offset."
"'sensor', a combination of a timestamp sensor entity and an offset, or Limited Template"
),
)
TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
{
vol.Required(CONF_PLATFORM): "time",
@ -78,6 +93,7 @@ async def async_attach_trigger(
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
trigger_data = trigger_info["trigger_data"]
variables = trigger_info["variables"] or {}
entities: dict[tuple[str, timedelta], CALLBACK_TYPE] = {}
removes: list[CALLBACK_TYPE] = []
job = HassJob(action, f"time trigger {trigger_info}")
@ -202,6 +218,16 @@ async def async_attach_trigger(
to_track: list[TrackEntity] = []
for at_time in config[CONF_AT]:
if isinstance(at_time, template.Template):
render = template.render_complex(at_time, variables, limited=True)
try:
at_time = _TIME_AT_SCHEMA(render)
except vol.Invalid as exc:
raise HomeAssistantError(
f"Limited Template for 'at' rendered a unexpected value '{render}', expected HH:MM, "
f"HH:MM:SS or Entity ID with domain 'input_datetime' or 'sensor'"
) from exc
if isinstance(at_time, str):
# entity
update_entity_trigger(at_time, new_state=hass.states.get(at_time))

View File

@ -159,7 +159,10 @@ async def test_if_fires_using_at_input_datetime(
@pytest.mark.parametrize(
("conf_at", "trigger_deltas"),
[
(["5:00:00", "6:00:00"], [timedelta(0), timedelta(hours=1)]),
(
["5:00:00", "6:00:00", "{{ '7:00:00' }}"],
[timedelta(0), timedelta(hours=1), timedelta(hours=2)],
),
(
[
"5:00:05",
@ -435,10 +438,14 @@ async def test_untrack_time_change(hass: HomeAssistant) -> None:
assert len(mock_track_time_change.mock_calls) == 3
@pytest.mark.parametrize(
("at_sensor"), ["sensor.next_alarm", "{{ 'sensor.next_alarm' }}"]
)
async def test_if_fires_using_at_sensor(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
service_calls: list[ServiceCall],
at_sensor: str,
) -> None:
"""Test for firing at sensor time."""
now = dt_util.now()
@ -461,7 +468,7 @@ async def test_if_fires_using_at_sensor(
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {"platform": "time", "at": "sensor.next_alarm"},
"trigger": {"platform": "time", "at": at_sensor},
"action": {
"service": "test.automation",
"data_template": {"some": some_data},
@ -626,6 +633,9 @@ async def test_if_fires_using_at_sensor_with_offset(
{"platform": "time", "at": "input_datetime.bla"},
{"platform": "time", "at": "sensor.bla"},
{"platform": "time", "at": "12:34"},
{"platform": "time", "at": "{{ '12:34' }}"},
{"platform": "time", "at": "{{ 'input_datetime.bla' }}"},
{"platform": "time", "at": "{{ 'sensor.bla' }}"},
{"platform": "time", "at": {"entity_id": "sensor.bla", "offset": "-00:01"}},
{
"platform": "time",
@ -724,3 +734,70 @@ async def test_datetime_in_past_on_load(
service_calls[2].data["some"]
== f"time-{future.day}-{future.hour}-input_datetime.my_trigger"
)
@pytest.mark.parametrize(
"trigger",
[
{"platform": "time", "at": "{{ 'hello world' }}"},
{"platform": "time", "at": "{{ 74 }}"},
{"platform": "time", "at": "{{ true }}"},
{"platform": "time", "at": "{{ 7.5465 }}"},
],
)
async def test_if_at_template_renders_bad_value(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
trigger: dict[str, str],
) -> None:
"""Test for invalid templates."""
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": trigger,
"action": {
"service": "test.automation",
},
}
},
)
await hass.async_block_till_done()
assert (
"expected HH:MM, HH:MM:SS or Entity ID with domain 'input_datetime' or 'sensor'"
in caplog.text
)
@pytest.mark.parametrize(
"trigger",
[
{"platform": "time", "at": "{{ now().strftime('%H:%M') }}"},
{"platform": "time", "at": "{{ states('sensor.blah') | int(0) }}"},
],
)
async def test_if_at_template_limited_template(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
trigger: dict[str, str],
) -> None:
"""Test for invalid templates."""
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": trigger,
"action": {
"service": "test.automation",
},
}
},
)
await hass.async_block_till_done()
assert "is not supported in limited templates" in caplog.text