"""The tests for the Script component.""" # pylint: disable=protected-access from datetime import timedelta import functools as ft from unittest import mock import asynctest import jinja2 import voluptuous as vol import pytest from homeassistant import exceptions from homeassistant.core import Context, callback # Otherwise can't test just this file (import order issue) import homeassistant.util.dt as dt_util from homeassistant.helpers import script, config_validation as cv from tests.common import async_fire_time_changed ENTITY_ID = "script.test" async def test_firing_event(hass): """Test the firing of events.""" event = "test_event" context = Context() calls = [] @callback def record_event(event): """Add recorded event to set.""" calls.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA({"event": event, "event_data": {"hello": "world"}}) ) await script_obj.async_run(context=context) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get("hello") == "world" assert not script_obj.can_cancel async def test_firing_event_template(hass): """Test the firing of events.""" event = "test_event" context = Context() calls = [] @callback def record_event(event): """Add recorded event to set.""" calls.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( { "event": event, "event_data_template": { "dict": { 1: "{{ is_world }}", 2: "{{ is_world }}{{ is_world }}", 3: "{{ is_world }}{{ is_world }}{{ is_world }}", }, "list": ["{{ is_world }}", "{{ is_world }}{{ is_world }}"], }, } ), ) await script_obj.async_run({"is_world": "yes"}, context=context) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data == { "dict": {1: "yes", 2: "yesyes", 3: "yesyesyes"}, "list": ["yes", "yesyes"], } assert not script_obj.can_cancel async def test_calling_service(hass): """Test the calling of a service.""" calls = [] context = Context() @callback def record_call(service): """Add recorded event to set.""" calls.append(service) hass.services.async_register("test", "script", record_call) hass.async_add_job( ft.partial( script.call_from_config, hass, {"service": "test.script", "data": {"hello": "world"}}, context=context, ) ) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get("hello") == "world" async def test_calling_service_template(hass): """Test the calling of a service.""" calls = [] context = Context() @callback def record_call(service): """Add recorded event to set.""" calls.append(service) hass.services.async_register("test", "script", record_call) hass.async_add_job( ft.partial( script.call_from_config, hass, { "service_template": """ {% if True %} test.script {% else %} test.not_script {% endif %}""", "data_template": { "hello": """ {% if is_world == 'yes' %} world {% else %} not world {% endif %} """ }, }, {"is_world": "yes"}, context=context, ) ) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get("hello") == "world" async def test_delay(hass): """Test the delay.""" event = "test_event" events = [] context = Context() delay_alias = "delay step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, {"delay": {"seconds": 5}, "alias": delay_alias}, {"event": event}, ] ), ) await script_obj.async_run(context=context) await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == delay_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 assert events[0].context is context assert events[1].context is context async def test_delay_template(hass): """Test the delay as a template.""" event = "test_event" events = [] delay_alias = "delay step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, {"delay": "00:00:{{ 5 }}", "alias": delay_alias}, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == delay_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 async def test_delay_invalid_template(hass): """Test the delay as a template that fails.""" event = "test_event" events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, {"delay": "{{ invalid_delay }}"}, {"delay": {"seconds": 5}}, {"event": event}, ] ), ) with mock.patch.object(script, "_LOGGER") as mock_logger: await script_obj.async_run() await hass.async_block_till_done() assert mock_logger.error.called assert not script_obj.is_running assert len(events) == 1 async def test_delay_complex_template(hass): """Test the delay with a working complex template.""" event = "test_event" events = [] delay_alias = "delay step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, {"delay": {"seconds": "{{ 5 }}"}, "alias": delay_alias}, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == delay_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 async def test_delay_complex_invalid_template(hass): """Test the delay with a complex template that fails.""" event = "test_event" events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, {"delay": {"seconds": "{{ invalid_delay }}"}}, {"delay": {"seconds": "{{ 5 }}"}}, {"event": event}, ] ), ) with mock.patch.object(script, "_LOGGER") as mock_logger: await script_obj.async_run() await hass.async_block_till_done() assert mock_logger.error.called assert not script_obj.is_running assert len(events) == 1 async def test_cancel_while_delay(hass): """Test the cancelling while the delay is present.""" event = "test_event" events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA([{"delay": {"seconds": 5}}, {"event": event}]) ) await script_obj.async_run() await hass.async_block_till_done() assert script_obj.is_running assert len(events) == 0 script_obj.async_stop() assert not script_obj.is_running # Make sure the script is really stopped. future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 0 async def test_wait_template(hass): """Test the wait template.""" event = "test_event" events = [] context = Context() wait_alias = "wait step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("switch.test", "on") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "wait_template": "{{states.switch.test.state == 'off'}}", "alias": wait_alias, }, {"event": event}, ] ), ) await script_obj.async_run(context=context) await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 assert events[0].context is context assert events[1].context is context async def test_wait_template_cancel(hass): """Test the wait template cancel action.""" event = "test_event" events = [] wait_alias = "wait step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("switch.test", "on") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "wait_template": "{{states.switch.test.state == 'off'}}", "alias": wait_alias, }, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 script_obj.async_stop() assert not script_obj.is_running assert len(events) == 1 hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 1 async def test_wait_template_not_schedule(hass): """Test the wait template with correct condition.""" event = "test_event" events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("switch.test", "on") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, {"wait_template": "{{states.switch.test.state == 'on'}}"}, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert not script_obj.is_running assert script_obj.can_cancel assert len(events) == 2 async def test_wait_template_timeout_halt(hass): """Test the wait template, halt on timeout.""" event = "test_event" events = [] wait_alias = "wait step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("switch.test", "on") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "wait_template": "{{states.switch.test.state == 'off'}}", "continue_on_timeout": False, "timeout": 5, "alias": wait_alias, }, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 1 async def test_wait_template_timeout_continue(hass): """Test the wait template with continuing the script.""" event = "test_event" events = [] wait_alias = "wait step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("switch.test", "on") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "wait_template": "{{states.switch.test.state == 'off'}}", "timeout": 5, "continue_on_timeout": True, "alias": wait_alias, }, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 async def test_wait_template_timeout_default(hass): """Test the wait template with default contiune.""" event = "test_event" events = [] wait_alias = "wait step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("switch.test", "on") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "wait_template": "{{states.switch.test.state == 'off'}}", "timeout": 5, "alias": wait_alias, }, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 async def test_wait_template_variables(hass): """Test the wait template with variables.""" event = "test_event" events = [] wait_alias = "wait step" @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("switch.test", "on") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, {"wait_template": "{{is_state(data, 'off')}}", "alias": wait_alias}, {"event": event}, ] ), ) await script_obj.async_run({"data": "switch.test"}) await hass.async_block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert not script_obj.is_running assert len(events) == 2 async def test_passing_variables_to_script(hass): """Test if we can pass variables to script.""" calls = [] @callback def record_call(service): """Add recorded event to set.""" calls.append(service) hass.services.async_register("test", "script", record_call) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ { "service": "test.script", "data_template": {"hello": "{{ greeting }}"}, }, {"delay": "{{ delay_period }}"}, { "service": "test.script", "data_template": {"hello": "{{ greeting2 }}"}, }, ] ), ) await script_obj.async_run( {"greeting": "world", "greeting2": "universe", "delay_period": "00:00:05"} ) await hass.async_block_till_done() assert script_obj.is_running assert len(calls) == 1 assert calls[-1].data["hello"] == "world" future = dt_util.utcnow() + timedelta(seconds=5) async_fire_time_changed(hass, future) await hass.async_block_till_done() assert not script_obj.is_running assert len(calls) == 2 assert calls[-1].data["hello"] == "universe" async def test_condition(hass): """Test if we can use conditions in a script.""" event = "test_event" events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("test.entity", "hello") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "condition": "template", "value_template": '{{ states.test.entity.state == "hello" }}', }, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert len(events) == 2 hass.states.async_set("test.entity", "goodbye") await script_obj.async_run() await hass.async_block_till_done() assert len(events) == 3 @asynctest.patch("homeassistant.helpers.script.condition.async_from_config") async def test_condition_created_once(async_from_config, hass): """Test that the conditions do not get created multiple times.""" event = "test_event" events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("test.entity", "hello") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "condition": "template", "value_template": '{{ states.test.entity.state == "hello" }}', }, {"event": event}, ] ), ) await script_obj.async_run() await script_obj.async_run() await hass.async_block_till_done() assert async_from_config.call_count == 1 assert len(script_obj._config_cache) == 1 async def test_all_conditions_cached(hass): """Test that multiple conditions get cached.""" event = "test_event" events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) hass.bus.async_listen(event, record_event) hass.states.async_set("test.entity", "hello") script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [ {"event": event}, { "condition": "template", "value_template": '{{ states.test.entity.state == "hello" }}', }, { "condition": "template", "value_template": '{{ states.test.entity.state != "hello" }}', }, {"event": event}, ] ), ) await script_obj.async_run() await hass.async_block_till_done() assert len(script_obj._config_cache) == 2 async def test_last_triggered(hass): """Test the last_triggered.""" event = "test_event" script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [{"event": event}, {"delay": {"seconds": 5}}, {"event": event}] ), ) assert script_obj.last_triggered is None time = dt_util.utcnow() with mock.patch("homeassistant.helpers.script.date_util.utcnow", return_value=time): await script_obj.async_run() await hass.async_block_till_done() assert script_obj.last_triggered == time async def test_propagate_error_service_not_found(hass): """Test that a script aborts when a service is not found.""" events = [] @callback def record_event(event): events.append(event) hass.bus.async_listen("test_event", record_event) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}]) ) with pytest.raises(exceptions.ServiceNotFound): await script_obj.async_run() assert len(events) == 0 assert script_obj._cur == -1 async def test_propagate_error_invalid_service_data(hass): """Test that a script aborts when we send invalid service data.""" events = [] @callback def record_event(event): events.append(event) hass.bus.async_listen("test_event", record_event) calls = [] @callback def record_call(service): """Add recorded event to set.""" calls.append(service) hass.services.async_register( "test", "script", record_call, schema=vol.Schema({"text": str}) ) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA( [{"service": "test.script", "data": {"text": 1}}, {"event": "test_event"}] ), ) with pytest.raises(vol.Invalid): await script_obj.async_run() assert len(events) == 0 assert len(calls) == 0 assert script_obj._cur == -1 async def test_propagate_error_service_exception(hass): """Test that a script aborts when a service throws an exception.""" events = [] @callback def record_event(event): events.append(event) hass.bus.async_listen("test_event", record_event) calls = [] @callback def record_call(service): """Add recorded event to set.""" raise ValueError("BROKEN") hass.services.async_register("test", "script", record_call) script_obj = script.Script( hass, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}]) ) with pytest.raises(ValueError): await script_obj.async_run() assert len(events) == 0 assert len(calls) == 0 assert script_obj._cur == -1 def test_log_exception(): """Test logged output.""" script_obj = script.Script( None, cv.SCRIPT_SCHEMA([{"service": "test.script"}, {"event": "test_event"}]) ) script_obj._exception_step = 1 for exc, msg in ( (vol.Invalid("Invalid number"), "Invalid data"), ( exceptions.TemplateError(jinja2.TemplateError("Unclosed bracket")), "Error rendering template", ), (exceptions.Unauthorized(), "Unauthorized"), (exceptions.ServiceNotFound("light", "turn_on"), "Service not found"), (ValueError("Cannot parse JSON"), "Unknown error"), ): logger = mock.Mock() script_obj.async_log_exception(logger, "Test error", exc) assert len(logger.mock_calls) == 1 _, _, p_error_desc, p_action_type, p_step, p_error = logger.mock_calls[0][1] assert p_error_desc == msg assert p_action_type == script.ACTION_FIRE_EVENT assert p_step == 2 if isinstance(exc, ValueError): assert p_error == "" else: assert p_error == str(exc)