# core/tests/helpers/test_script.py
#
# 1081 lines
# 28 KiB
# Python

"""The tests for the Script component."""
# pylint: disable=protected-access
from datetime import timedelta
import functools as ft
from unittest import mock
import asynctest
import jinja2
import pytest
import voluptuous as vol
# Otherwise can't test just this file (import order issue)
from homeassistant import exceptions
import homeassistant.components.scene as scene
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_ON
from homeassistant.core import Context, callback
from homeassistant.helpers import config_validation as cv, script
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed
# Script entity id constant; not referenced by any test in the visible part
# of this file (presumably used elsewhere or kept for convenience - confirm).
ENTITY_ID = "script.test"
async def test_firing_event(hass):
    """Run a single event action and inspect the fired event.

    The event must be fired exactly once, carry the context handed to
    ``async_run`` and the static ``event_data``.  A script made of one
    event action must not report itself as cancellable.
    """
    event_type = "test_event"
    run_context = Context()
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    sequence = cv.SCRIPT_SCHEMA(
        {"event": event_type, "event_data": {"hello": "world"}}
    )
    runner = script.Script(hass, sequence)

    await runner.async_run(context=run_context)
    await hass.async_block_till_done()

    assert len(fired_events) == 1
    only_event = fired_events[0]
    assert only_event.context is run_context
    assert only_event.data.get("hello") == "world"
    assert not runner.can_cancel
async def test_firing_event_template(hass):
    """Run an event action whose data is rendered from templates.

    ``event_data_template`` holds templates nested inside a dict and a
    list; each one must be rendered with the ``is_world`` run variable
    before the event is fired with the given context.
    """
    event_type = "test_event"
    run_context = Context()
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    data_template = {
        "dict": {
            1: "{{ is_world }}",
            2: "{{ is_world }}{{ is_world }}",
            3: "{{ is_world }}{{ is_world }}{{ is_world }}",
        },
        "list": ["{{ is_world }}", "{{ is_world }}{{ is_world }}"],
    }
    sequence = cv.SCRIPT_SCHEMA(
        {"event": event_type, "event_data_template": data_template}
    )
    runner = script.Script(hass, sequence)

    await runner.async_run({"is_world": "yes"}, context=run_context)
    await hass.async_block_till_done()

    expected_data = {
        "dict": {1: "yes", 2: "yesyes", 3: "yesyesyes"},
        "list": ["yes", "yesyes"],
    }
    assert len(fired_events) == 1
    assert fired_events[0].context is run_context
    assert fired_events[0].data == expected_data
    assert not runner.can_cancel
async def test_calling_service(hass):
    """Call a registered service through ``script.call_from_config``.

    The ``test.script`` handler must be invoked once, with the context
    and the ``data`` given in the action config.
    """
    service_calls = []
    run_context = Context()

    @callback
    def _on_call(call):
        """Collect each service call that reaches the handler."""
        service_calls.append(call)

    hass.services.async_register("test", "script", _on_call)

    action = {"service": "test.script", "data": {"hello": "world"}}
    run_action = ft.partial(
        script.call_from_config, hass, action, context=run_context
    )
    hass.async_add_job(run_action)
    await hass.async_block_till_done()

    assert len(service_calls) == 1
    only_call = service_calls[0]
    assert only_call.context is run_context
    assert only_call.data.get("hello") == "world"
async def test_activating_scene(hass):
    """Activate a scene through ``script.call_from_config``.

    A ``{"scene": ...}`` action must end up as one call to the scene
    domain's turn-on service, with the scene as the entity id and the
    supplied context attached.
    """
    service_calls = []
    run_context = Context()

    @callback
    def _on_call(call):
        """Collect each service call that reaches the handler."""
        service_calls.append(call)

    hass.services.async_register(scene.DOMAIN, SERVICE_TURN_ON, _on_call)

    run_action = ft.partial(
        script.call_from_config, hass, {"scene": "scene.hello"}, context=run_context
    )
    hass.async_add_job(run_action)
    await hass.async_block_till_done()

    assert len(service_calls) == 1
    only_call = service_calls[0]
    assert only_call.context is run_context
    assert only_call.data.get(ATTR_ENTITY_ID) == "scene.hello"
async def test_calling_service_template(hass):
    """Call a service whose name and data both come from templates.

    ``service_template`` has to resolve to ``test.script`` and the
    ``data_template`` entry to ``world`` for the supplied ``is_world``
    variable; the resulting call must carry the given context.
    """
    service_calls = []
    run_context = Context()

    @callback
    def _on_call(call):
        """Collect each service call that reaches the handler."""
        service_calls.append(call)

    hass.services.async_register("test", "script", _on_call)

    # The template bodies are kept verbatim, including their line breaks.
    action = {
        "service_template": """
{% if True %}
test.script
{% else %}
test.not_script
{% endif %}""",
        "data_template": {
            "hello": """
{% if is_world == 'yes' %}
world
{% else %}
not world
{% endif %}
"""
        },
    }
    run_action = ft.partial(
        script.call_from_config,
        hass,
        action,
        {"is_world": "yes"},
        context=run_context,
    )
    hass.async_add_job(run_action)
    await hass.async_block_till_done()

    assert len(service_calls) == 1
    only_call = service_calls[0]
    assert only_call.context is run_context
    assert only_call.data.get("hello") == "world"
async def test_delay(hass):
    """Check that a fixed delay pauses the script until time advances.

    The sequence is event, 5 second delay, event.  After the initial run
    the script must still be running on the aliased delay step with one
    event fired.  Firing a time change 5 seconds ahead must finish the
    script and fire the second event; both events carry the run context.
    """
    event_type = "test_event"
    step_alias = "delay step"
    run_context = Context()
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": {"seconds": 5}, "alias": step_alias},
            {"event": event_type},
        ]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run(context=run_context)
    await hass.async_block_till_done()

    # Only the first event has fired; the delay step is the last action.
    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(fired_events) == 2
    for evt in fired_events:
        assert evt.context is run_context
async def test_delay_template(hass):
    """Check a delay given as a template string (``00:00:{{ 5 }}``).

    The script must wait on the aliased delay step after the first event
    and complete, firing the second event, once a time change 5 seconds
    ahead is fired.
    """
    event_type = "test_event"
    step_alias = "delay step"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": "00:00:{{ 5 }}", "alias": step_alias},
            {"event": event_type},
        ]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(fired_events) == 2
async def test_delay_invalid_template(hass):
    """Check that a delay template that fails stops the script.

    The second action uses the undefined variable ``invalid_delay``.
    With the script module's logger patched, an error must be logged,
    the script must not be left running and only the first event fires.
    """
    event_type = "test_event"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": "{{ invalid_delay }}"},
            {"delay": {"seconds": 5}},
            {"event": event_type},
        ]
    )
    runner = script.Script(hass, sequence)

    with mock.patch.object(script, "_LOGGER") as patched_logger:
        await runner.async_run()
        await hass.async_block_till_done()
        assert patched_logger.error.called

    assert not runner.is_running
    assert len(fired_events) == 1
async def test_delay_complex_template(hass):
    """Check a delay dict whose ``seconds`` value is a template.

    The script must wait on the aliased delay step after the first event
    and complete, firing the second event, once a time change 5 seconds
    ahead is fired.
    """
    event_type = "test_event"
    step_alias = "delay step"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": {"seconds": "{{ 5 }}"}, "alias": step_alias},
            {"event": event_type},
        ]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(fired_events) == 2
async def test_delay_complex_invalid_template(hass):
    """Check that a failing template inside a delay dict stops the script.

    The ``seconds`` value of the second action uses the undefined
    variable ``invalid_delay``.  With the script module's logger patched,
    an error must be logged, the script must not be left running and
    only the first event fires.
    """
    event_type = "test_event"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"delay": {"seconds": "{{ invalid_delay }}"}},
            {"delay": {"seconds": "{{ 5 }}"}},
            {"event": event_type},
        ]
    )
    runner = script.Script(hass, sequence)

    with mock.patch.object(script, "_LOGGER") as patched_logger:
        await runner.async_run()
        await hass.async_block_till_done()
        assert patched_logger.error.called

    assert not runner.is_running
    assert len(fired_events) == 1
async def test_cancel_while_delay(hass):
    """Stop a script while it is waiting in a delay.

    After ``async_stop`` the script must no longer be running, and firing
    a time change past the delay must neither restart it nor fire the
    event that follows the delay.
    """
    event_type = "test_event"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)

    sequence = cv.SCRIPT_SCHEMA([{"delay": {"seconds": 5}}, {"event": event_type}])
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert runner.is_running
    assert not fired_events

    runner.async_stop()
    assert not runner.is_running

    # Advance past the delay to prove nothing is still scheduled.
    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not runner.is_running
    assert not fired_events
async def test_wait_template(hass):
    """Check that ``wait_template`` blocks until the template is true.

    ``switch.test`` starts as ``on`` so the script must wait on the
    aliased step after the first event.  Setting the switch to ``off``
    must let the script finish and fire the second event; both events
    carry the run context.
    """
    event_type = "test_event"
    step_alias = "wait step"
    run_context = Context()
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("switch.test", "on")

    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "alias": step_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run(context=run_context)
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(fired_events) == 2
    for evt in fired_events:
        assert evt.context is run_context
async def test_wait_template_cancel(hass):
    """Stop a script while it is waiting on a ``wait_template``.

    After ``async_stop`` the script must not be running, and a later
    state change that satisfies the template must not resume it or fire
    the second event.
    """
    event_type = "test_event"
    step_alias = "wait step"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("switch.test", "on")

    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "alias": step_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    runner.async_stop()

    assert not runner.is_running
    assert len(fired_events) == 1

    # The template would now be true, but the script has been stopped.
    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(fired_events) == 1
async def test_wait_template_not_schedule(hass):
    """Check a ``wait_template`` that is already true when reached.

    ``switch.test`` is ``on`` and the template tests for ``on``, so the
    script must run straight through: not running afterwards, both
    events fired, and still reported as cancellable.
    """
    event_type = "test_event"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("switch.test", "on")

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"wait_template": "{{states.switch.test.state == 'on'}}"},
            {"event": event_type},
        ]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert not runner.is_running
    assert runner.can_cancel
    assert len(fired_events) == 2
async def test_wait_template_timeout_halt(hass):
    """Check ``wait_template`` with ``continue_on_timeout`` set to False.

    The template never becomes true.  Once a time change 5 seconds ahead
    is fired the script must stop without firing the second event.
    """
    event_type = "test_event"
    step_alias = "wait step"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("switch.test", "on")

    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "continue_on_timeout": False,
        "timeout": 5,
        "alias": step_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    # Timed out and halted: the trailing event action never ran.
    assert not runner.is_running
    assert len(fired_events) == 1
async def test_wait_template_timeout_continue(hass):
    """Check ``wait_template`` with ``continue_on_timeout`` set to True.

    The template never becomes true.  Once a time change 5 seconds ahead
    is fired the script must carry on and fire the second event.
    """
    event_type = "test_event"
    step_alias = "wait step"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("switch.test", "on")

    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "timeout": 5,
        "continue_on_timeout": True,
        "alias": step_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    # Timed out but continued: the trailing event action ran.
    assert not runner.is_running
    assert len(fired_events) == 2
async def test_wait_template_timeout_default(hass):
    """Check ``wait_template`` timeout with the default continue setting.

    ``continue_on_timeout`` is not given.  Once a time change 5 seconds
    ahead is fired the script is expected to carry on and fire the
    second event.
    """
    event_type = "test_event"
    step_alias = "wait step"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("switch.test", "on")

    wait_step = {
        "wait_template": "{{states.switch.test.state == 'off'}}",
        "timeout": 5,
        "alias": step_alias,
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, wait_step, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(fired_events) == 2
async def test_wait_template_variables(hass):
    """Check that ``wait_template`` can use run variables.

    The entity id is passed in as the ``data`` variable and used inside
    ``is_state``.  The script must wait while ``switch.test`` is ``on``
    and finish, firing the second event, once it is set to ``off``.
    """
    event_type = "test_event"
    step_alias = "wait step"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("switch.test", "on")

    sequence = cv.SCRIPT_SCHEMA(
        [
            {"event": event_type},
            {"wait_template": "{{is_state(data, 'off')}}", "alias": step_alias},
            {"event": event_type},
        ]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run({"data": "switch.test"})
    await hass.async_block_till_done()

    assert runner.is_running
    assert runner.can_cancel
    assert runner.last_action == step_alias
    assert len(fired_events) == 1

    hass.states.async_set("switch.test", "off")
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(fired_events) == 2
async def test_passing_variables_to_script(hass):
    """Check that run variables reach service data and delay templates.

    The sequence is service call, templated delay, service call.  The
    first call must receive ``greeting``; after a time change 5 seconds
    ahead (the ``delay_period`` value) the second call must receive
    ``greeting2``.
    """
    service_calls = []

    @callback
    def _on_call(call):
        """Collect each service call that reaches the handler."""
        service_calls.append(call)

    hass.services.async_register("test", "script", _on_call)

    sequence = cv.SCRIPT_SCHEMA(
        [
            {
                "service": "test.script",
                "data_template": {"hello": "{{ greeting }}"},
            },
            {"delay": "{{ delay_period }}"},
            {
                "service": "test.script",
                "data_template": {"hello": "{{ greeting2 }}"},
            },
        ]
    )
    runner = script.Script(hass, sequence)

    run_variables = {
        "greeting": "world",
        "greeting2": "universe",
        "delay_period": "00:00:05",
    }
    await runner.async_run(run_variables)
    await hass.async_block_till_done()

    assert runner.is_running
    assert len(service_calls) == 1
    assert service_calls[-1].data["hello"] == "world"

    async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()

    assert not runner.is_running
    assert len(service_calls) == 2
    assert service_calls[-1].data["hello"] == "universe"
async def test_condition(hass):
    """Check that a condition action gates the rest of the script.

    While ``test.entity`` is ``hello`` the template condition passes and
    both events fire.  After the state changes to ``goodbye`` a second
    run fires only the event that precedes the condition.
    """
    event_type = "test_event"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("test.entity", "hello")

    condition_step = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state == "hello" }}',
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, condition_step, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()
    assert len(fired_events) == 2

    hass.states.async_set("test.entity", "goodbye")

    await runner.async_run()
    await hass.async_block_till_done()
    # Two events from the first run plus one from the second.
    assert len(fired_events) == 3
@asynctest.patch("homeassistant.helpers.script.condition.async_from_config")
async def test_condition_created_once(async_from_config, hass):
    """Check that a condition is built only once across runs.

    With the condition factory patched, running the same script twice
    must call the factory a single time and leave exactly one entry in
    the script's ``_config_cache``.
    """
    event_type = "test_event"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("test.entity", "hello")

    condition_step = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state == "hello" }}',
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, condition_step, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await runner.async_run()
    await hass.async_block_till_done()

    assert async_from_config.call_count == 1
    assert len(runner._config_cache) == 1
async def test_all_conditions_cached(hass):
    """Check that each distinct condition gets its own cache entry.

    The script holds two different template conditions; after one run
    the script's ``_config_cache`` must contain two entries.
    """
    event_type = "test_event"
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen(event_type, _on_event)
    hass.states.async_set("test.entity", "hello")

    is_hello = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state == "hello" }}',
    }
    is_not_hello = {
        "condition": "template",
        "value_template": '{{ states.test.entity.state != "hello" }}',
    }
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, is_hello, is_not_hello, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    await runner.async_run()
    await hass.async_block_till_done()

    assert len(runner._config_cache) == 2
async def test_last_triggered(hass):
    """Check that running a script records ``last_triggered``.

    ``last_triggered`` starts as ``None``.  With the script module's
    ``utcnow`` patched to a fixed value, it must equal that value after
    the script has been run.
    """
    event_type = "test_event"
    sequence = cv.SCRIPT_SCHEMA(
        [{"event": event_type}, {"delay": {"seconds": 5}}, {"event": event_type}]
    )
    runner = script.Script(hass, sequence)

    assert runner.last_triggered is None

    frozen_now = dt_util.utcnow()
    utcnow_patch = mock.patch(
        "homeassistant.helpers.script.date_util.utcnow", return_value=frozen_now
    )
    with utcnow_patch:
        await runner.async_run()
        await hass.async_block_till_done()

    assert runner.last_triggered == frozen_now
async def test_propagate_error_service_not_found(hass):
    """Check that calling an unregistered service aborts the script.

    ``test.script`` is never registered, so ``async_run`` must raise
    ``ServiceNotFound``; the following event action must not fire and
    the script's ``_cur`` must be ``-1`` afterwards.
    """
    fired_events = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    hass.bus.async_listen("test_event", _on_event)

    sequence = cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    runner = script.Script(hass, sequence)

    with pytest.raises(exceptions.ServiceNotFound):
        await runner.async_run()

    assert not fired_events
    assert runner._cur == -1
async def test_propagate_error_invalid_service_data(hass):
    """Check that invalid service data aborts the script.

    The service is registered with a schema requiring ``text`` to be a
    ``str`` but the script sends an int, so ``async_run`` must raise
    ``vol.Invalid``.  Neither the handler nor the following event action
    may run, and the script's ``_cur`` must be ``-1`` afterwards.
    """
    fired_events = []
    service_calls = []

    @callback
    def _on_event(evt):
        """Collect each event delivered by the bus listener."""
        fired_events.append(evt)

    @callback
    def _on_call(call):
        """Collect each service call that reaches the handler."""
        service_calls.append(call)

    hass.bus.async_listen("test_event", _on_event)
    hass.services.async_register(
        "test", "script", _on_call, schema=vol.Schema({"text": str})
    )

    sequence = cv.SCRIPT_SCHEMA(
        [{"service": "test.script", "data": {"text": 1}}, {"event": "test_event"}]
    )
    runner = script.Script(hass, sequence)

    with pytest.raises(vol.Invalid):
        await runner.async_run()

    assert not fired_events
    assert not service_calls
    assert runner._cur == -1
async def test_propagate_error_service_exception(hass):
    """Test that a script aborts when a service throws an exception.

    The first action calls a service whose handler raises
    ``ValueError("BROKEN")``.  That error must propagate out of
    ``async_run``, the handler must have been invoked exactly once, the
    following event action must not fire and the script's ``_cur`` must
    be ``-1`` afterwards.
    """
    events = []

    @callback
    def record_event(event):
        """Record events fired by the script (none are expected)."""
        events.append(event)

    hass.bus.async_listen("test_event", record_event)

    calls = []

    @callback
    def record_call(service):
        """Record the service call, then fail like a broken service."""
        # Record before raising so the assertion on `calls` below proves
        # the error really came from this handler being executed.
        calls.append(service)
        raise ValueError("BROKEN")

    hass.services.async_register("test", "script", record_call)

    script_obj = script.Script(
        hass, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    )

    # Match on the message so an unrelated ValueError cannot satisfy the test.
    with pytest.raises(ValueError, match="BROKEN"):
        await script_obj.async_run()

    assert len(events) == 0
    assert len(calls) == 1
    assert script_obj._cur == -1
def test_log_exception():
    """Check what ``async_log_exception`` hands to the logger.

    For every exception in the table exactly one logger call is
    expected.  Its positional arguments must carry the matching error
    description, the fire-event action type, step number 2 and the
    error text, which is empty for the plain ``ValueError`` case.
    """
    sequence = cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}])
    runner = script.Script(None, sequence)
    # Index 1 is the event action in the sequence above; the logged step
    # number asserted below is 2.
    runner._exception_step = 1

    cases = (
        (vol.Invalid("Invalid number"), "Invalid data"),
        (
            exceptions.TemplateError(jinja2.TemplateError("Unclosed bracket")),
            "Error rendering template",
        ),
        (exceptions.Unauthorized(), "Unauthorized"),
        (exceptions.ServiceNotFound("light", "turn_on"), "Service not found"),
        (ValueError("Cannot parse JSON"), "Unknown error"),
    )

    for exc, expected_desc in cases:
        logger = mock.Mock()
        runner.async_log_exception(logger, "Test error", exc)

        assert len(logger.mock_calls) == 1
        # Element 1 of a recorded call is its tuple of positional args.
        positional = logger.mock_calls[0][1]
        _, _, p_error_desc, p_action_type, p_step, p_error = positional

        assert p_error_desc == expected_desc
        assert p_action_type == script.ACTION_FIRE_EVENT
        assert p_step == 2

        expected_error = "" if isinstance(exc, ValueError) else str(exc)
        assert p_error == expected_error
async def test_referenced_entities():
    """Check the entity ids reported by ``Script.referenced_entities``.

    Entity ids given as a string or a list in service data, the entity
    of a state condition and a scene action must all be reported.  The
    actions without an entity id must add nothing, and repeated access
    must return the very same object.
    """
    sequence = cv.SCRIPT_SCHEMA(
        [
            {
                "service": "test.script",
                "data": {"entity_id": "light.service_not_list"},
            },
            {
                "service": "test.script",
                "data": {"entity_id": ["light.service_list"]},
            },
            {
                "condition": "state",
                "entity_id": "sensor.condition",
                "state": "100",
            },
            {"service": "test.script", "data": {"without": "entity_id"}},
            {"scene": "scene.hello"},
            {"event": "test_event"},
            {"delay": "{{ delay_period }}"},
        ]
    )
    runner = script.Script(None, sequence)

    expected_entities = {
        "light.service_not_list",
        "light.service_list",
        "sensor.condition",
        "scene.hello",
    }
    assert runner.referenced_entities == expected_entities

    # Identity, not equality: the result must be cached between accesses.
    assert runner.referenced_entities is runner.referenced_entities
async def test_referenced_devices():
    """Check the device ids reported by ``Script.referenced_devices``.

    The device id of a device action and of a device condition must
    both be reported, and repeated access must return the very same
    object.
    """
    sequence = cv.SCRIPT_SCHEMA(
        [
            {"domain": "light", "device_id": "script-dev-id"},
            {
                "condition": "device",
                "device_id": "condition-dev-id",
                "domain": "switch",
            },
        ]
    )
    runner = script.Script(None, sequence)

    expected_devices = {"script-dev-id", "condition-dev-id"}
    assert runner.referenced_devices == expected_devices

    # Identity, not equality: the result must be cached between accesses.
    assert runner.referenced_devices is runner.referenced_devices