core/tests/helpers/test_script.py

1000 lines
25 KiB
Python

"""The tests for the Script component."""
# pylint: disable=protected-access
from datetime import timedelta
import functools as ft
from unittest import mock
import asynctest
import jinja2
import voluptuous as vol
import pytest
from homeassistant import exceptions
from homeassistant.core import Context, callback
# Otherwise can't test just this file (import order issue)
import homeassistant.util.dt as dt_util
from homeassistant.helpers import script, config_validation as cv
from tests.common import async_fire_time_changed
# Entity id string for a script entity.
# NOTE(review): none of the tests visible in this section reference this
# constant - confirm it is still used elsewhere before relying on it.
ENTITY_ID = "script.test"
async def test_firing_event(hass):
    """Run a one-step event script and verify the event that was fired."""
    event_type = "test_event"
    run_context = Context()
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    sequence = cv.SCRIPT_SCHEMA(
        {"event": event_type, "event_data": {"hello": "world"}}
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run(context=run_context)
    await hass.async_block_till_done()

    # Exactly one event, carrying the caller's context and configured payload.
    assert len(received) == 1
    fired_event = received[0]
    assert fired_event.context is run_context
    assert fired_event.data.get("hello") == "world"
    # This event-only script is expected to report that it cannot be cancelled.
    assert not script_obj.can_cancel
async def test_firing_event_template(hass):
    """Fire an event whose data is rendered from nested templates."""
    event_type = "test_event"
    run_context = Context()
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # Templates appear both as dict values and as list items.
    templated_data = {
        "dict": {
            1: "{{ is_world }}",
            2: "{{ is_world }}{{ is_world }}",
            3: "{{ is_world }}{{ is_world }}{{ is_world }}",
        },
        "list": ["{{ is_world }}", "{{ is_world }}{{ is_world }}"],
    }
    sequence = cv.SCRIPT_SCHEMA(
        {"event": event_type, "event_data_template": templated_data}
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run({"is_world": "yes"}, context=run_context)
    await hass.async_block_till_done()

    assert len(received) == 1
    fired_event = received[0]
    assert fired_event.context is run_context
    # Every template must have been rendered with the supplied variable.
    assert fired_event.data == {
        "dict": {1: "yes", 2: "yesyes", 3: "yesyesyes"},
        "list": ["yes", "yesyes"],
    }
    assert not script_obj.can_cancel
async def test_calling_service(hass):
    """Call a service through call_from_config and check what it received."""
    service_calls = []
    run_context = Context()

    @callback
    def capture(call):
        """Store every service call delivered to the handler."""
        service_calls.append(call)

    hass.services.async_register("test", "script", capture)

    action = {"service": "test.script", "data": {"hello": "world"}}
    job = ft.partial(script.call_from_config, hass, action, context=run_context)
    hass.async_add_job(job)
    await hass.async_block_till_done()

    # One call, carrying the caller's context and the configured data.
    assert len(service_calls) == 1
    received_call = service_calls[0]
    assert received_call.context is run_context
    assert received_call.data.get("hello") == "world"
async def test_calling_service_template(hass):
    """Call a service whose name and data are both rendered from templates."""
    service_calls = []
    run_context = Context()

    @callback
    def capture(call):
        """Store every service call delivered to the handler."""
        service_calls.append(call)

    hass.services.async_register("test", "script", capture)

    # The template literals are kept flush-left so their text stays exactly
    # as it was written.
    action = {
        "service_template": """
{% if True %}
test.script
{% else %}
test.not_script
{% endif %}""",
        "data_template": {
            "hello": """
{% if is_world == 'yes' %}
world
{% else %}
not world
{% endif %}
"""
        },
    }
    job = ft.partial(
        script.call_from_config,
        hass,
        action,
        {"is_world": "yes"},
        context=run_context,
    )
    hass.async_add_job(job)
    await hass.async_block_till_done()

    # The registered test.script service is expected to be the one invoked,
    # with "hello" rendered to "world".
    assert len(service_calls) == 1
    received_call = service_calls[0]
    assert received_call.context is run_context
    assert received_call.data.get("hello") == "world"
async def test_delay(hass):
    """Run a script that has a fixed five second delay between two events."""
    event_type = "test_event"
    received = []
    run_context = Context()
    delay_alias = "delay step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": {"seconds": 5}, "alias": delay_alias},
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run(context=run_context)
    await hass.async_block_till_done()

    # The first event has fired and the script is parked on the delay step.
    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == delay_alias
    assert len(received) == 1

    # Move time forward by the delay duration so the script can resume.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(received) == 2
    # Both events must carry the context the script was started with.
    first, second = received[0], received[1]
    assert first.context is run_context
    assert second.context is run_context
async def test_delay_template(hass):
    """Run a script whose delay is given as a templated time string."""
    event_type = "test_event"
    received = []
    delay_alias = "delay step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": "00:00:{{ 5 }}", "alias": delay_alias},
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    # The script should be waiting on the delay after the first event.
    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == delay_alias
    assert len(received) == 1

    # Advance the clock by five seconds to let the delay expire.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(received) == 2
async def test_delay_invalid_template(hass):
    """Check that a failing delay template logs an error and ends the run."""
    event_type = "test_event"
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # The first delay uses a variable that is never passed to async_run.
    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": "{{ invalid_delay }}"},
            {"delay": {"seconds": 5}},
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    with mock.patch.object(script, "_LOGGER") as patched_logger:
        await script_obj.async_run()
        await hass.async_block_till_done()
        assert patched_logger.error.called

    # Only the event ahead of the bad delay is expected to have fired.
    assert not script_obj.is_running
    assert len(received) == 1
async def test_delay_complex_template(hass):
    """Run a script whose delay is a dict with a templated seconds value."""
    event_type = "test_event"
    received = []
    delay_alias = "delay step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": {"seconds": "{{ 5 }}"}, "alias": delay_alias},
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    # The script should be waiting on the delay after the first event.
    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == delay_alias
    assert len(received) == 1

    # Advance the clock by five seconds to let the delay expire.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(received) == 2
async def test_delay_complex_invalid_template(hass):
    """Check that a failing templated seconds value logs an error and stops."""
    event_type = "test_event"
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # The first delay uses a variable that is never passed to async_run.
    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": {"seconds": "{{ invalid_delay }}"}},
            {"delay": {"seconds": "{{ 5 }}"}},
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    with mock.patch.object(script, "_LOGGER") as patched_logger:
        await script_obj.async_run()
        await hass.async_block_till_done()
        assert patched_logger.error.called

    # Only the event ahead of the bad delay is expected to have fired.
    assert not script_obj.is_running
    assert len(received) == 1
async def test_cancel_while_delay(hass):
    """Stop a script during its delay and verify it never resumes."""
    event_type = "test_event"
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    sequence = cv.SCRIPT_SCHEMA([{"delay": {"seconds": 5}}, {"event": event_type}])
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    # The delay comes first, so nothing has fired yet.
    assert script_obj.is_running
    assert not received

    script_obj.async_stop()
    assert not script_obj.is_running

    # Let the original delay period elapse; a stopped script must stay stopped.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert not received
async def test_wait_template(hass):
    """Pause on a wait template until the watched state makes it true."""
    event_type = "test_event"
    received = []
    run_context = Context()
    wait_alias = "wait step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # Start with the switch on so the wait condition is not yet met.
    hass.states.async_set("switch.test", "on")

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {
                "wait_template": "{{states.switch.test.state == 'off'}}",
                "alias": wait_alias,
            },
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run(context=run_context)
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(received) == 1

    # Turning the switch off satisfies the template.
    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(received) == 2
    # Both events must carry the context the script was started with.
    first, second = received[0], received[1]
    assert first.context is run_context
    assert second.context is run_context
async def test_wait_template_cancel(hass):
    """Stop a script during a wait template and verify it never resumes."""
    event_type = "test_event"
    received = []
    wait_alias = "wait step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # Start with the switch on so the wait condition is not yet met.
    hass.states.async_set("switch.test", "on")

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {
                "wait_template": "{{states.switch.test.state == 'off'}}",
                "alias": wait_alias,
            },
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(received) == 1

    script_obj.async_stop()

    assert not script_obj.is_running
    assert len(received) == 1

    # Satisfying the template after the stop must not fire the second event.
    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(received) == 1
async def test_wait_template_not_schedule(hass):
    """Run straight through a wait template that is already satisfied."""
    event_type = "test_event"
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # The switch is already in the state the template waits for.
    hass.states.async_set("switch.test", "on")

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"wait_template": "{{states.switch.test.state == 'on'}}"},
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    # Both events fired in a single pass, with no pause on the wait step.
    assert not script_obj.is_running
    assert script_obj.can_cancel
    assert len(received) == 2
async def test_wait_template_timeout_halt(hass):
    """End the script when a wait times out with continue_on_timeout off."""
    event_type = "test_event"
    received = []
    wait_alias = "wait step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # Start with the switch on so the wait condition is not yet met.
    hass.states.async_set("switch.test", "on")

    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "continue_on_timeout": False,
        "timeout": 5,
        "alias": wait_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(received) == 1

    # Let the timeout elapse without ever satisfying the template.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    # The script ends without firing the second event.
    assert not script_obj.is_running
    assert len(received) == 1
async def test_wait_template_timeout_continue(hass):
    """Carry on after a wait times out with continue_on_timeout enabled."""
    event_type = "test_event"
    received = []
    wait_alias = "wait step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # Start with the switch on so the wait condition is not yet met.
    hass.states.async_set("switch.test", "on")

    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "timeout": 5,
        "continue_on_timeout": True,
        "alias": wait_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(received) == 1

    # Let the timeout elapse without ever satisfying the template.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    # The script proceeds to the second event after the timeout.
    assert not script_obj.is_running
    assert len(received) == 2
async def test_wait_template_timeout_default(hass):
    """Carry on after a wait times out when continue_on_timeout is omitted."""
    event_type = "test_event"
    received = []
    wait_alias = "wait step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # Start with the switch on so the wait condition is not yet met.
    hass.states.async_set("switch.test", "on")

    # No continue_on_timeout key here: the default behaviour is under test.
    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "timeout": 5,
        "alias": wait_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(received) == 1

    # Let the timeout elapse without ever satisfying the template.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    # The script proceeds to the second event after the timeout.
    assert not script_obj.is_running
    assert len(received) == 2
async def test_wait_template_variables(hass):
    """Use a run variable inside a wait template."""
    event_type = "test_event"
    received = []
    wait_alias = "wait step"

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    # Start with the switch on so the wait condition is not yet met.
    hass.states.async_set("switch.test", "on")

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"wait_template": "{{is_state(data, 'off')}}", "alias": wait_alias},
            {"event": event_type},
        ]
    )
    script_obj = script.Script(hass, sequence)

    # "data" is the variable the template passes to is_state.
    await script_obj.async_run({"data": "switch.test"})
    await hass.async_block_till_done()

    assert script_obj.is_running
    assert script_obj.can_cancel
    assert script_obj.last_action == wait_alias
    assert len(received) == 1

    # Turning the switch off satisfies the template.
    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(received) == 2
async def test_passing_variables_to_script(hass):
    """Pass run variables into service data templates and a delay template."""
    service_calls = []

    @callback
    def capture(call):
        """Store every service call delivered to the handler."""
        service_calls.append(call)

    hass.services.async_register("test", "script", capture)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {
                "service": "test.script",
                "data_template": {"hello": "{{ greeting }}"},
            },
            {"delay": "{{ delay_period }}"},
            {
                "service": "test.script",
                "data_template": {"hello": "{{ greeting2 }}"},
            },
        ]
    )
    script_obj = script.Script(hass, sequence)

    run_variables = {
        "greeting": "world",
        "greeting2": "universe",
        "delay_period": "00:00:05",
    }
    await script_obj.async_run(run_variables)
    await hass.async_block_till_done()

    # First call is done and the script is waiting on the templated delay.
    assert script_obj.is_running
    assert len(service_calls) == 1
    assert service_calls[-1].data["hello"] == "world"

    # Advance the clock by five seconds to let the delay expire.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not script_obj.is_running
    assert len(service_calls) == 2
    assert service_calls[-1].data["hello"] == "universe"
async def test_condition(hass):
    """Gate the second event of a script behind a template condition."""
    event_type = "test_event"
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    hass.states.async_set("test.entity", "hello")

    condition_step = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state == "hello" }}',
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, condition_step, {"event": event_type}]
    )
    script_obj = script.Script(hass, sequence)

    # The state matches, so both events fire.
    await script_obj.async_run()
    await hass.async_block_till_done()
    assert len(received) == 2

    # With the state changed, the second run adds only one more event.
    hass.states.async_set("test.entity", "goodbye")

    await script_obj.async_run()
    await hass.async_block_till_done()
    assert len(received) == 3
@asynctest.patch("homeassistant.helpers.script.condition.async_from_config")
async def test_condition_created_once(async_from_config, hass):
    """Run a script twice and verify its condition is only built once."""
    event_type = "test_event"
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    hass.states.async_set("test.entity", "hello")

    condition_step = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state == "hello" }}',
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, condition_step, {"event": event_type}]
    )
    script_obj = script.Script(hass, sequence)

    # Two runs of the same script object.
    await script_obj.async_run()
    await script_obj.async_run()
    await hass.async_block_till_done()

    # The patched factory was called once and a single entry was cached.
    assert async_from_config.call_count == 1
    assert len(script_obj._config_cache) == 1
async def test_all_conditions_cached(hass):
    """Verify each distinct condition in a script gets its own cache entry."""
    event_type = "test_event"
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen(event_type, capture)

    hass.states.async_set("test.entity", "hello")

    # Two conditions that differ only in their comparison operator.
    equals_hello = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state == "hello" }}',
    }
    not_hello = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state != "hello" }}',
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, equals_hello, not_hello, {"event": event_type}]
    )
    script_obj = script.Script(hass, sequence)

    await script_obj.async_run()
    await hass.async_block_till_done()

    assert len(script_obj._config_cache) == 2
async def test_last_triggered(hass):
    """Verify last_triggered is unset before a run and set by running."""
    event_type = "test_event"

    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, {"delay": {"seconds": 5}}, {"event": event_type}]
    )
    script_obj = script.Script(hass, sequence)

    assert script_obj.last_triggered is None

    # Pin the clock seen by the script helper so the timestamp is comparable.
    frozen_now = dt_util.utcnow()
    utcnow_target = "homeassistant.helpers.script.date_util.utcnow"
    with mock.patch(utcnow_target, return_value=frozen_now):
        await script_obj.async_run()
        await hass.async_block_till_done()

    assert script_obj.last_triggered == frozen_now
async def test_propagate_error_service_not_found(hass):
    """Verify ServiceNotFound propagates and the rest of the script is skipped."""
    received = []

    @callback
    def capture(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen("test_event", capture)

    # No "test.script" service is registered in this test.
    sequence = cv.SCRIPT_SCHEMA(
        [{"service": "test.script"}, {"event": "test_event"}]
    )
    script_obj = script.Script(hass, sequence)

    with pytest.raises(exceptions.ServiceNotFound):
        await script_obj.async_run()

    # The following event step never ran and the step cursor reads -1.
    assert not received
    assert script_obj._cur == -1
async def test_propagate_error_invalid_service_data(hass):
    """Verify invalid service data raises and the rest of the script is skipped."""
    received = []

    @callback
    def capture_event(fired):
        """Store every event delivered to the listener."""
        received.append(fired)

    hass.bus.async_listen("test_event", capture_event)

    service_calls = []

    @callback
    def capture_call(call):
        """Store every service call delivered to the handler."""
        service_calls.append(call)

    # The service schema requires "text" to be a string.
    text_schema = vol.Schema({"text": str})
    hass.services.async_register("test", "script", capture_call, schema=text_schema)

    # The script passes an integer for "text".
    sequence = cv.SCRIPT_SCHEMA(
        [{"service": "test.script", "data": {"text": 1}}, {"event": "test_event"}]
    )
    script_obj = script.Script(hass, sequence)

    with pytest.raises(vol.Invalid):
        await script_obj.async_run()

    # Neither the handler nor the following event step ran.
    assert not received
    assert not service_calls
    assert script_obj._cur == -1
async def test_propagate_error_service_exception(hass):
    """Test that a script aborts when a service throws an exception.

    The registered service handler always raises ValueError. The error must
    propagate out of ``async_run``, the following event step must not run, and
    the script's step cursor must read -1 afterwards.
    """
    events = []

    @callback
    def record_event(event):
        """Store every event delivered to the listener."""
        events.append(event)

    hass.bus.async_listen("test_event", record_event)

    @callback
    def raise_error(service):
        """Fail unconditionally to simulate a broken service handler."""
        raise ValueError("BROKEN")

    hass.services.async_register("test", "script", raise_error)

    script_obj = script.Script(
        hass, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    )

    with pytest.raises(ValueError):
        await script_obj.async_run()

    # The handler never records anything, so there is no call list to check;
    # what proves the abort is that the event step did not run.
    assert len(events) == 0
    assert script_obj._cur == -1
def test_log_exception():
    """Check the arguments async_log_exception passes to the logger."""
    sequence = cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    script_obj = script.Script(None, sequence)
    # Mark step index 1 as the failing step; the logged step is asserted as 2.
    script_obj._exception_step = 1

    # Pairs of (exception to log, expected error description).
    cases = (
        (vol.Invalid("Invalid number"), "Invalid data"),
        (
            exceptions.TemplateError(jinja2.TemplateError("Unclosed bracket")),
            "Error rendering template",
        ),
        (exceptions.Unauthorized(), "Unauthorized"),
        (exceptions.ServiceNotFound("light", "turn_on"), "Service not found"),
        (ValueError("Cannot parse JSON"), "Unknown error"),
    )

    for exc, msg in cases:
        logger = mock.Mock()
        script_obj.async_log_exception(logger, "Test error", exc)

        assert len(logger.mock_calls) == 1
        # Positional args of the single logger call; the first two are ignored.
        call_args = logger.mock_calls[0][1]
        _, _, p_error_desc, p_action_type, p_step, p_error = call_args

        assert p_error_desc == msg
        assert p_action_type == script.ACTION_FIRE_EVENT
        assert p_step == 2
        # For the ValueError case the logged error text is expected to be empty.
        expected_error = "" if isinstance(exc, ValueError) else str(exc)
        assert p_error == expected_error