"""The tests for the Script component."""
import asyncio
from datetime import timedelta
from unittest.mock import Mock, patch

import pytest

from homeassistant.components import script
from homeassistant.components.script import DOMAIN, EVENT_SCRIPT_STARTED, ScriptEntity
from homeassistant.const import (
    ATTR_ENTITY_ID,
    ATTR_NAME,
    SERVICE_RELOAD,
    SERVICE_TOGGLE,
    SERVICE_TURN_OFF,
    SERVICE_TURN_ON,
    STATE_OFF,
)
from homeassistant.core import (
    Context,
    CoreState,
    HomeAssistant,
    State,
    callback,
    split_entity_id,
)
from homeassistant.exceptions import ServiceNotFound
from homeassistant.helpers import entity_registry as er, template
from homeassistant.helpers.event import async_track_state_change
from homeassistant.helpers.script import (
    SCRIPT_MODE_CHOICES,
    SCRIPT_MODE_PARALLEL,
    SCRIPT_MODE_QUEUED,
    SCRIPT_MODE_RESTART,
    SCRIPT_MODE_SINGLE,
    _async_stop_scripts_at_shutdown,
)
from homeassistant.helpers.service import async_get_all_descriptions
from homeassistant.setup import async_setup_component
from homeassistant.util import yaml
import homeassistant.util.dt as dt_util

from tests.common import async_fire_time_changed, async_mock_service, mock_restore_cache
from tests.components.logbook.common import MockRow, mock_humanify
from tests.typing import WebSocketGenerator

ENTITY_ID = "script.test"


@pytest.fixture
def calls(hass):
    """Track calls to a mock service."""
    return async_mock_service(hass, "test", "script")


async def test_passing_variables(hass: HomeAssistant) -> None:
    """Test different ways of passing in variables.

    The script is called twice through its own service, each time with a
    different ``greeting``; the templated ``hello`` value and the caller's
    context must both reach the ``test.script`` service.
    """
    mock_restore_cache(hass, ())
    calls = []
    context = Context()

    @callback
    def record_call(service):
        """Append the received service call to the list."""
        calls.append(service)

    hass.services.async_register("test", "script", record_call)

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test": {
                    "sequence": {
                        "service": "test.script",
                        "data_template": {"hello": "{{ greeting }}"},
                    }
                }
            }
        },
    )

    await hass.services.async_call(
        DOMAIN, "test", {"greeting": "world"}, context=context
    )

    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].context is context
    assert calls[0].data["hello"] == "world"

    await hass.services.async_call(
        DOMAIN, "test", {"greeting": "universe"}, context=context
    )

    await hass.async_block_till_done()

    assert len(calls) == 2
    assert calls[1].context is context
    assert calls[1].data["hello"] == "universe"


@pytest.mark.parametrize("toggle", [False, True])
async def test_turn_on_off_toggle(hass: HomeAssistant, toggle) -> None:
    """Verify turn_on, turn_off & toggle services.

    The script fires an event, then stops itself (via ``script.toggle`` or
    ``script.turn_off`` depending on ``toggle``), then would fire the event
    again. Only the first event is expected to be observed.
    """
    event = "test_event"
    event_mock = Mock()

    hass.bus.async_listen(event, event_mock)

    was_on = False

    @callback
    def state_listener(entity_id, old_state, new_state):
        """Remember that the script entity reached the "on" state."""
        nonlocal was_on
        was_on = True

    async_track_state_change(hass, ENTITY_ID, state_listener, to_state="on")

    if toggle:
        turn_off_step = {"service": "script.toggle", "entity_id": ENTITY_ID}
    else:
        turn_off_step = {"service": "script.turn_off", "entity_id": ENTITY_ID}
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test": {
                    "sequence": [{"event": event}, turn_off_step, {"event": event}]
                }
            }
        },
    )

    assert not script.is_on(hass, ENTITY_ID)

    # Start the script, either through the generic toggle service or through
    # the script's own service (named after its object id).
    if toggle:
        await hass.services.async_call(
            DOMAIN, SERVICE_TOGGLE, {ATTR_ENTITY_ID: ENTITY_ID}
        )
    else:
        await hass.services.async_call(DOMAIN, split_entity_id(ENTITY_ID)[1])
    await hass.async_block_till_done()

    assert not script.is_on(hass, ENTITY_ID)
    assert was_on
    # The step after the turn-off step must never have run.
    assert event_mock.call_count == 1


invalid_configs = [
    # No "sequence" key
    {"test": {}},
    # Object id contains spaces
    {"test hello world": {"sequence": [{"event": "bla"}]}},
    # A single action mixing the "event" and "service" keys
    {"test": {"sequence": {"event": "test_event", "service": "homeassistant.turn_on"}}},
]


@pytest.mark.parametrize("value", invalid_configs)
async def test_setup_with_invalid_configs(hass: HomeAssistant, value) -> None:
    """Test setup with invalid configs.

    Setup itself must still succeed, but no script entity may be created.
    """
    assert await async_setup_component(
        hass, "script", {"script": value}
    ), f"Script loaded with wrong config {value}"

    assert len(hass.states.async_entity_ids("script")) == 0
@pytest.mark.parametrize(
    ("object_id", "broken_config", "problem", "details"),
    (
        (
            "Bad Script",
            {},
            "has invalid object id",
            "invalid slug Bad Script",
        ),
        (
            "bad_script",
            {},
            "could not be validated",
            "required key not provided @ data['sequence']",
        ),
        (
            "bad_script",
            {
                "sequence": {
                    "condition": "state",
                    # The UUID will fail being resolved to an entity_id
                    "entity_id": "abcdabcdabcdabcdabcdabcdabcdabcd",
                    "state": "blah",
                },
            },
            "failed to setup actions",
            "Unknown entity registry entry abcdabcdabcdabcdabcdabcdabcdabcd.",
        ),
    ),
)
async def test_bad_config_validation(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    object_id,
    broken_config,
    problem,
    details,
) -> None:
    """Test bad script configuration which can be detected during validation.

    A broken script is configured next to a valid one; the broken one must be
    reported in the log and the valid one must still be set up.
    """
    assert await async_setup_component(
        hass,
        script.DOMAIN,
        {
            script.DOMAIN: {
                object_id: {"alias": "bad_script", **broken_config},
                "good_script": {
                    "alias": "good_script",
                    "sequence": {
                        "service": "test.automation",
                        "entity_id": "hello.world",
                    },
                },
            }
        },
    )

    # Check we get the expected error message
    assert (
        f"Script with alias 'bad_script' {problem} and has been disabled: {details}"
        in caplog.text
    )

    # Make sure one bad script does not prevent other scripts from setting up
    assert hass.states.async_entity_ids("script") == ["script.good_script"]


@pytest.mark.parametrize("running", ["no", "same", "different"])
async def test_reload_service(hass: HomeAssistant, running) -> None:
    """Verify the reload service.

    ``running`` selects the scenario: the script is not running during the
    reload ("no"), it is running and the reloaded config defines the same
    script ("same"), or it is running and the reloaded config defines a
    different script ("different").
    """
    event = "test_event"
    event_flag = asyncio.Event()

    @callback
    def event_handler(_event):
        """Signal that the script has started executing."""
        event_flag.set()

    hass.bus.async_listen_once(event, event_handler)
    hass.states.async_set("test.script", "off")

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test": {
                    "sequence": [
                        {"event": event},
                        # Keeps the script running until test.script turns on
                        {"wait_template": "{{ is_state('test.script', 'on') }}"},
                    ]
                }
            }
        },
    )

    assert hass.states.get(ENTITY_ID) is not None
    assert hass.services.has_service(script.DOMAIN, "test")

    if running != "no":
        _, object_id = split_entity_id(ENTITY_ID)
        await hass.services.async_call(DOMAIN, object_id)
        await asyncio.wait_for(event_flag.wait(), 1)

        assert script.is_on(hass, ENTITY_ID)

    object_id = "test" if running == "same" else "test2"
    with patch(
        "homeassistant.config.load_yaml_config_file",
        return_value={"script": {object_id: {"sequence": [{"delay": {"seconds": 5}}]}}},
    ):
        await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)
        await hass.async_block_till_done()

    if running != "same":
        # The old script is gone: only a restored state is left behind and
        # its service is no longer registered.
        state = hass.states.get(ENTITY_ID)
        assert state.attributes["restored"] is True
        assert not hass.services.has_service(script.DOMAIN, "test")

        assert hass.states.get("script.test2") is not None
        assert hass.services.has_service(script.DOMAIN, "test2")

    else:
        assert hass.states.get(ENTITY_ID) is not None
        assert hass.services.has_service(script.DOMAIN, "test")


async def test_reload_unchanged_does_not_stop(hass: HomeAssistant, calls) -> None:
    """Test that reloading an unchanged script does not stop a running run.

    The script is started and parked on a wait_template, the same config is
    reloaded, and only then is the wait released; the final service call of
    the sequence must still be made.
    """
    test_entity = "test.entity"

    config = {
        script.DOMAIN: {
            "test": {
                "sequence": [
                    {"event": "running"},
                    {"wait_template": "{{ is_state('test.entity', 'goodbye') }}"},
                    {"service": "test.script"},
                ],
            }
        }
    }
    assert await async_setup_component(hass, script.DOMAIN, config)

    assert hass.states.get(ENTITY_ID) is not None
    assert hass.services.has_service(script.DOMAIN, "test")

    running = asyncio.Event()

    @callback
    def running_cb(event):
        """Signal that the script has started executing."""
        running.set()

    hass.bus.async_listen_once("running", running_cb)
    hass.states.async_set(test_entity, "hello")

    # Start the script and wait for it to start
    _, object_id = split_entity_id(ENTITY_ID)
    await hass.services.async_call(DOMAIN, object_id)
    # Bounded wait so a script that never starts fails the test instead of
    # hanging the whole test run.
    await asyncio.wait_for(running.wait(), 1)
    assert len(calls) == 0

    with patch(
        "homeassistant.config.load_yaml_config_file",
        autospec=True,
        return_value=config,
    ):
        await hass.services.async_call(script.DOMAIN, SERVICE_RELOAD, blocking=True)

        # Release the wait_template; the run that was started before the
        # reload must continue to its last step.
        hass.states.async_set(test_entity, "goodbye")
        await hass.async_block_till_done()

        assert len(calls) == 1
@pytest.mark.parametrize(
    "script_config",
    (
        {
            "test": {
                "sequence": [{"service": "test.script"}],
            }
        },
        # A script using templates
        {
            "test": {
                "sequence": [{"service": "{{ 'test.script' }}"}],
            }
        },
        # A script using blueprint
        {
            "test": {
                "use_blueprint": {
                    "path": "test_service.yaml",
                    "input": {
                        "service_to_call": "test.script",
                    },
                }
            }
        },
        # A script using blueprint with templated input
        {
            "test": {
                "use_blueprint": {
                    "path": "test_service.yaml",
                    "input": {
                        "service_to_call": "{{ 'test.script' }}",
                    },
                }
            }
        },
    ),
)
async def test_reload_unchanged_script(
    hass: HomeAssistant, calls, script_config
) -> None:
    """Test an unmodified script is not reloaded.

    ``ScriptEntity`` is wrapped so that the number of entity constructions
    can be counted: one on initial setup, none on a reload with the same
    configuration.
    """
    with patch(
        "homeassistant.components.script.ScriptEntity", wraps=ScriptEntity
    ) as script_entity_init:
        config = {script.DOMAIN: [script_config]}
        assert await async_setup_component(hass, script.DOMAIN, config)
        assert hass.states.get(ENTITY_ID) is not None
        assert hass.services.has_service(script.DOMAIN, "test")

        assert script_entity_init.call_count == 1
        script_entity_init.reset_mock()

        # Start the script and wait for it to finish
        _, object_id = split_entity_id(ENTITY_ID)
        await hass.services.async_call(DOMAIN, object_id)
        await hass.async_block_till_done()
        assert len(calls) == 1

        # Reload the scripts without any change
        with patch(
            "homeassistant.config.load_yaml_config_file",
            autospec=True,
            return_value=config,
        ):
            await hass.services.async_call(script.DOMAIN, SERVICE_RELOAD, blocking=True)

        assert script_entity_init.call_count == 0
        script_entity_init.reset_mock()

        # Start the script again and wait for it to finish
        _, object_id = split_entity_id(ENTITY_ID)
        await hass.services.async_call(DOMAIN, object_id)
        await hass.async_block_till_done()
        assert len(calls) == 2


async def test_service_descriptions(hass: HomeAssistant) -> None:
    """Test that service descriptions are loaded and reloaded correctly."""
    # Test 1: has "description" but no "fields"
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test": {
                    "description": "test description",
                    "sequence": [{"delay": {"seconds": 5}}],
                }
            }
        },
    )

    descriptions = await async_get_all_descriptions(hass)

    assert descriptions[DOMAIN]["test"]["name"] == "test"
    assert descriptions[DOMAIN]["test"]["description"] == "test description"
    assert not descriptions[DOMAIN]["test"]["fields"]

    # Test 2: has "fields" but no "description"
    with patch(
        "homeassistant.config.load_yaml_config_file",
        return_value={
            "script": {
                "test": {
                    "fields": {
                        "test_param": {
                            "description": "test_param description",
                            "example": "test_param example",
                        }
                    },
                    "sequence": [{"delay": {"seconds": 5}}],
                }
            }
        },
    ):
        await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)

    descriptions = await async_get_all_descriptions(hass)

    assert descriptions[script.DOMAIN]["test"]["description"] == ""
    assert (
        descriptions[script.DOMAIN]["test"]["fields"]["test_param"]["description"]
        == "test_param description"
    )
    assert (
        descriptions[script.DOMAIN]["test"]["fields"]["test_param"]["example"]
        == "test_param example"
    )

    # Test 3: has "alias" that will be used as "name"
    with patch(
        "homeassistant.config.load_yaml_config_file",
        return_value={
            "script": {
                "test_name": {
                    "alias": "ABC",
                    "sequence": [{"delay": {"seconds": 5}}],
                }
            }
        },
    ):
        await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)

    descriptions = await async_get_all_descriptions(hass)

    assert descriptions[DOMAIN]["test_name"]["name"] == "ABC"

    # Test 4: verify that names from YAML are taken into account as well
    assert descriptions[DOMAIN]["turn_on"]["name"] == "Turn on"


async def test_shared_context(hass: HomeAssistant) -> None:
    """Test that the shared context is passed down the chain.

    The context given to ``script.turn_on`` must show up on the script
    started event, on the event fired by the script, and on the script state.
    """
    event = "test_event"
    context = Context()

    event_mock = Mock()
    run_mock = Mock()

    hass.bus.async_listen(event, event_mock)
    hass.bus.async_listen(EVENT_SCRIPT_STARTED, run_mock)

    assert await async_setup_component(
        hass, "script", {"script": {"test": {"sequence": [{"event": event}]}}}
    )

    await hass.services.async_call(
        DOMAIN, SERVICE_TURN_ON, {ATTR_ENTITY_ID: ENTITY_ID}, context=context
    )
    await hass.async_block_till_done()

    assert event_mock.call_count == 1
    assert run_mock.call_count == 1

    args, _ = run_mock.call_args
    assert args[0].context == context
    # Ensure event data has all attributes set
    assert args[0].data.get(ATTR_NAME) == "test"
    assert args[0].data.get(ATTR_ENTITY_ID) == "script.test"

    # Ensure context carries through the event
    args, _ = event_mock.call_args
    assert args[0].context == context

    # Ensure the script state shares the same context
    state = hass.states.get("script.test")
    assert state is not None
    assert state.context == context


async def test_logging_script_error(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test logging script error.

    Calling a script whose only step is an unknown service must raise
    ``ServiceNotFound`` to the caller and log the failure.
    """
    assert await async_setup_component(
        hass,
        "script",
        {"script": {"hello": {"sequence": [{"service": "non.existing"}]}}},
    )
    with pytest.raises(ServiceNotFound) as err:
        await hass.services.async_call("script", "hello", blocking=True)

    assert err.value.domain == "non"
    assert err.value.service == "existing"
    assert "Error executing script" in caplog.text


async def test_turning_no_scripts_off(hass: HomeAssistant) -> None:
    """Test that turning off an empty list of scripts does not raise."""
    assert await async_setup_component(hass, "script", {})

    # Testing it doesn't raise
    await hass.services.async_call(
        DOMAIN, SERVICE_TURN_OFF, {"entity_id": []}, blocking=True
    )


async def test_async_get_descriptions_script(hass: HomeAssistant) -> None:
    """Test async_set_service_schema for the script integration.

    One script has neither description nor fields, the other has both; the
    service descriptions must reflect that.
    """
    script_config = {
        DOMAIN: {
            "test1": {"sequence": [{"service": "homeassistant.restart"}]},
            "test2": {
                "description": "test2",
                "fields": {
                    "param": {
                        "description": "param_description",
                        "example": "param_example",
                    }
                },
                "sequence": [{"service": "homeassistant.restart"}],
            },
        }
    }

    # Fail early with a clear message if setup itself did not succeed.
    assert await async_setup_component(hass, DOMAIN, script_config)
    descriptions = await async_get_all_descriptions(hass)

    assert descriptions[DOMAIN]["test1"]["description"] == ""
    assert not descriptions[DOMAIN]["test1"]["fields"]

    assert descriptions[DOMAIN]["test2"]["description"] == "test2"
    assert (
        descriptions[DOMAIN]["test2"]["fields"]["param"]["description"]
        == "param_description"
    )
    assert (
        descriptions[DOMAIN]["test2"]["fields"]["param"]["example"] == "param_example"
    )


async def test_extraction_functions(hass: HomeAssistant) -> None:
    """Test extraction functions.

    Two scripts reference overlapping entities and devices; the lookup
    helpers must report them in both directions.
    """
    assert await async_setup_component(
        hass,
        DOMAIN,
        {
            DOMAIN: {
                "test1": {
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {"entity_id": "light.in_both"},
                        },
                        {
                            "service": "test.script",
                            "data": {"entity_id": "light.in_first"},
                        },
                        {
                            "entity_id": "light.device_in_both",
                            "domain": "light",
                            "type": "turn_on",
                            "device_id": "device-in-both",
                        },
                    ]
                },
                "test2": {
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {"entity_id": "light.in_both"},
                        },
                        {
                            "condition": "state",
                            "entity_id": "sensor.condition",
                            "state": "100",
                        },
                        {"scene": "scene.hello"},
                        {
                            "entity_id": "light.device_in_both",
                            "domain": "light",
                            "type": "turn_on",
                            "device_id": "device-in-both",
                        },
                        {
                            "entity_id": "light.device_in_last",
                            "domain": "light",
                            "type": "turn_on",
                            "device_id": "device-in-last",
                        },
                    ],
                },
            }
        },
    )

    assert set(script.scripts_with_entity(hass, "light.in_both")) == {
        "script.test1",
        "script.test2",
    }
    assert set(script.entities_in_script(hass, "script.test1")) == {
        "light.in_both",
        "light.in_first",
    }
    assert set(script.scripts_with_device(hass, "device-in-both")) == {
        "script.test1",
        "script.test2",
    }
    assert set(script.devices_in_script(hass, "script.test2")) == {
        "device-in-both",
        "device-in-last",
    }


async def test_config_basic(hass: HomeAssistant) -> None:
    """Test passing info in config.

    Alias and icon must end up on the state, and the object id must be used
    as the unique id in the entity registry.
    """
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test_script": {
                    "alias": "Script Name",
                    "icon": "mdi:party",
                    "sequence": [],
                }
            }
        },
    )

    test_script = hass.states.get("script.test_script")
    assert test_script.name == "Script Name"
    assert test_script.attributes["icon"] == "mdi:party"

    registry = er.async_get(hass)
    entry = registry.async_get("script.test_script")
    assert entry
    assert entry.unique_id == "test_script"


async def test_config_multiple_domains(hass: HomeAssistant) -> None:
    """Test splitting configuration over multiple domains."""
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "first_script": {
                    "alias": "Main domain",
                    "sequence": [],
                }
            },
            "script second": {
                "second_script": {
                    "alias": "Secondary domain",
                    "sequence": [],
                }
            },
        },
    )

    test_script = hass.states.get("script.first_script")
    assert test_script
    assert test_script.name == "Main domain"

    test_script = hass.states.get("script.second_script")
    assert test_script
    assert test_script.name == "Secondary domain"


async def test_logbook_humanify_script_started_event(hass: HomeAssistant) -> None:
    """Test humanifying script started event."""
    hass.config.components.add("recorder")
    await async_setup_component(hass, DOMAIN, {})
    await async_setup_component(hass, "logbook", {})

    event1, event2 = mock_humanify(
        hass,
        [
            MockRow(
                EVENT_SCRIPT_STARTED,
                {ATTR_ENTITY_ID: "script.hello", ATTR_NAME: "Hello Script"},
            ),
            MockRow(
                EVENT_SCRIPT_STARTED,
                {ATTR_ENTITY_ID: "script.bye", ATTR_NAME: "Bye Script"},
            ),
        ],
    )

    assert event1["name"] == "Hello Script"
    assert event1["domain"] == "script"
    assert event1["message"] == "started"
    assert event1["entity_id"] == "script.hello"

    assert event2["name"] == "Bye Script"
    assert event2["domain"] == "script"
    assert event2["message"] == "started"
    assert event2["entity_id"] == "script.bye"


@pytest.mark.parametrize("concurrently", [False, True])
async def test_concurrent_script(hass: HomeAssistant, concurrently) -> None:
    """Test calling script concurrently or not.

    script1 starts script2 either through ``script.turn_on`` (concurrently)
    or through script2's own service (not concurrently); the order in which
    the ``test.script`` calls arrive is checked for both cases.
    """
    if concurrently:
        call_script_2 = {
            "service": "script.turn_on",
            "data": {"entity_id": "script.script2"},
        }
    else:
        call_script_2 = {"service": "script.script2"}
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "mode": "parallel",
                    "sequence": [
                        call_script_2,
                        {
                            "wait_template": "{{ is_state('input_boolean.test1', 'on') }}"
                        },
                        {"service": "test.script", "data": {"value": "script1"}},
                    ],
                },
                "script2": {
                    "mode": "parallel",
                    "sequence": [
                        {"service": "test.script", "data": {"value": "script2a"}},
                        {
                            "wait_template": "{{ is_state('input_boolean.test2', 'on') }}"
                        },
                        {"service": "test.script", "data": {"value": "script2b"}},
                    ],
                },
            }
        },
    )

    service_called = asyncio.Event()
    service_values = []

    async def async_service_handler(service):
        """Record the value of the call and wake up the test."""
        service_values.append(service.data.get("value"))
        service_called.set()

    hass.services.async_register("test", "script", async_service_handler)
    hass.states.async_set("input_boolean.test1", "off")
    hass.states.async_set("input_boolean.test2", "off")

    await hass.services.async_call("script", "script1")
    await asyncio.wait_for(service_called.wait(), 1)
    service_called.clear()

    assert service_values[-1] == "script2a"
    assert script.is_on(hass, "script.script1")
    assert script.is_on(hass, "script.script2")

    if not concurrently:
        # script2 has to be released first before script1 can continue.
        hass.states.async_set("input_boolean.test2", "on")
        await asyncio.wait_for(service_called.wait(), 1)
        service_called.clear()

        assert service_values[-1] == "script2b"

    hass.states.async_set("input_boolean.test1", "on")
    await asyncio.wait_for(service_called.wait(), 1)
    service_called.clear()

    assert service_values[-1] == "script1"
    assert concurrently == script.is_on(hass, "script.script2")

    if concurrently:
        hass.states.async_set("input_boolean.test2", "on")
        await asyncio.wait_for(service_called.wait(), 1)
        service_called.clear()

        assert service_values[-1] == "script2b"

    await hass.async_block_till_done()

    assert not script.is_on(hass, "script.script1")
    assert not script.is_on(hass, "script.script2")


async def test_script_variables(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test script variables defined in config and passed in by the caller.

    Covers static and templated config variables, service data overriding
    config variables, the ``this`` variable, and a variables template that
    fails to render.
    """
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "variables": {
                        "this_variable": "{{this.entity_id}}",
                        "test_var": "from_config",
                        "templated_config_var": "{{ var_from_service | default('config-default') }}",
                    },
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {
                                "value": "{{ test_var }}",
                                "templated_config_var": "{{ templated_config_var }}",
                                "this_template": "{{this.entity_id}}",
                                "this_variable": "{{this_variable}}",
                            },
                        },
                    ],
                },
                "script2": {
                    "variables": {
                        "test_var": "from_config",
                    },
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {
                                "value": "{{ test_var }}",
                            },
                        },
                    ],
                },
                "script3": {
                    "variables": {
                        "test_var": "{{ break + 1 }}",
                    },
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {
                                "value": "{{ test_var }}",
                            },
                        },
                    ],
                },
            }
        },
    )

    mock_calls = async_mock_service(hass, "test", "script")

    await hass.services.async_call(
        "script", "script1", {"var_from_service": "hello"}, blocking=True
    )

    assert len(mock_calls) == 1
    assert mock_calls[0].data["value"] == "from_config"
    assert mock_calls[0].data["templated_config_var"] == "hello"
    # Verify `this` is available to all templates
    assert mock_calls[0].data.get("this_template") == "script.script1"
    # Verify `this` is available while the config variables are rendered
    assert mock_calls[0].data.get("this_variable") == "script.script1"

    await hass.services.async_call(
        "script", "script1", {"test_var": "from_service"}, blocking=True
    )

    assert len(mock_calls) == 2
    assert mock_calls[1].data["value"] == "from_service"
    assert mock_calls[1].data["templated_config_var"] == "config-default"

    # Call script with vars but no templates in it
    await hass.services.async_call(
        "script", "script2", {"test_var": "from_service"}, blocking=True
    )

    assert len(mock_calls) == 3
    assert mock_calls[2].data["value"] == "from_service"

    assert "Error rendering variables" not in caplog.text
    # "break" is undefined here, so rendering the variables must fail
    with pytest.raises(template.TemplateError):
        await hass.services.async_call("script", "script3", blocking=True)
    assert "Error rendering variables" in caplog.text
    assert len(mock_calls) == 3

    await hass.services.async_call("script", "script3", {"break": 0}, blocking=True)

    assert len(mock_calls) == 4
    assert mock_calls[3].data["value"] == 1


async def test_script_this_var_always(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test script always has reference to this, even when no variables are configured."""
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {
                                "this_template": "{{this.entity_id}}",
                            },
                        },
                    ],
                },
            },
        },
    )
    mock_calls = async_mock_service(hass, "test", "script")

    await hass.services.async_call("script", "script1", blocking=True)

    assert len(mock_calls) == 1
    # Verify `this` is available to all templates
    assert mock_calls[0].data.get("this_template") == "script.script1"
    assert "Error rendering variables" not in caplog.text


async def test_script_restore_last_triggered(hass: HomeAssistant) -> None:
    """Test if last triggered is restored on start."""
    time = dt_util.utcnow()
    mock_restore_cache(
        hass,
        (
            State("script.no_last_triggered", STATE_OFF),
            State("script.last_triggered", STATE_OFF, {"last_triggered": time}),
        ),
    )
    hass.state = CoreState.starting

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "no_last_triggered": {
                    "sequence": [{"delay": {"seconds": 5}}],
                },
                "last_triggered": {
                    "sequence": [{"delay": {"seconds": 5}}],
                },
            },
        },
    )

    state = hass.states.get("script.no_last_triggered")
    assert state
    assert state.attributes["last_triggered"] is None

    state = hass.states.get("script.last_triggered")
    assert state
    assert state.attributes["last_triggered"] == time


@pytest.mark.parametrize(
    ("script_mode", "warning_msg"),
    (
        (SCRIPT_MODE_PARALLEL, "Maximum number of runs exceeded"),
        (SCRIPT_MODE_QUEUED, "Disallowed recursion detected"),
        (SCRIPT_MODE_RESTART, "Disallowed recursion detected"),
        (SCRIPT_MODE_SINGLE, "Already running"),
    ),
)
async def test_recursive_script(
    hass: HomeAssistant, script_mode, warning_msg, caplog: pytest.LogCaptureFixture
) -> None:
    """Test recursive script calls do not deadlock.

    The script calls itself directly; the step after the recursive call must
    still run and the mode-specific warning must be logged.
    """
    # Make sure we cover all script modes
    assert [
        SCRIPT_MODE_PARALLEL,
        SCRIPT_MODE_QUEUED,
        SCRIPT_MODE_RESTART,
        SCRIPT_MODE_SINGLE,
    ] == SCRIPT_MODE_CHOICES

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script1"},
                        {"service": "test.script"},
                    ],
                },
            }
        },
    )

    service_called = asyncio.Event()

    async def async_service_handler(service):
        """Wake up the test once test.script has been called."""
        service_called.set()

    hass.services.async_register("test", "script", async_service_handler)

    await hass.services.async_call("script", "script1")
    await asyncio.wait_for(service_called.wait(), 1)

    assert warning_msg in caplog.text


@pytest.mark.parametrize(
    ("script_mode", "warning_msg"),
    (
        (SCRIPT_MODE_PARALLEL, "Maximum number of runs exceeded"),
        (SCRIPT_MODE_QUEUED, "Disallowed recursion detected"),
        (SCRIPT_MODE_RESTART, "Disallowed recursion detected"),
        (SCRIPT_MODE_SINGLE, "Already running"),
    ),
)
async def test_recursive_script_indirect(
    hass: HomeAssistant, script_mode, warning_msg, caplog: pytest.LogCaptureFixture
) -> None:
    """Test indirectly recursive script calls do not deadlock.

    The recursion goes script1 -> script2 -> script3 -> script4 -> script1.
    """
    # Make sure we cover all script modes
    assert [
        SCRIPT_MODE_PARALLEL,
        SCRIPT_MODE_QUEUED,
        SCRIPT_MODE_RESTART,
        SCRIPT_MODE_SINGLE,
    ] == SCRIPT_MODE_CHOICES

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script2"},
                    ],
                },
                "script2": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script3"},
                    ],
                },
                "script3": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script4"},
                    ],
                },
                "script4": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script1"},
                        {"service": "test.script"},
                    ],
                },
            }
        },
    )

    service_called = asyncio.Event()

    async def async_service_handler(service):
        """Wake up the test once test.script has been called."""
        service_called.set()

    hass.services.async_register("test", "script", async_service_handler)

    await hass.services.async_call("script", "script1")
    await asyncio.wait_for(service_called.wait(), 1)

    assert warning_msg in caplog.text


@pytest.mark.parametrize(
    "script_mode", [SCRIPT_MODE_PARALLEL, SCRIPT_MODE_QUEUED, SCRIPT_MODE_RESTART]
)
async def test_recursive_script_turn_on(
    hass: HomeAssistant, script_mode, caplog: pytest.LogCaptureFixture
) -> None:
    """Test script turning itself on.

    - Illegal recursion detection should not be triggered
    - Home Assistant should not hang on shut down
    - SCRIPT_MODE_SINGLE is not relevant because such a script can't turn itself on
    """
    # Make sure we cover all script modes
    assert [
        SCRIPT_MODE_PARALLEL,
        SCRIPT_MODE_QUEUED,
        SCRIPT_MODE_RESTART,
        SCRIPT_MODE_SINGLE,
    ] == SCRIPT_MODE_CHOICES
    stop_scripts_at_shutdown_called = asyncio.Event()
    real_stop_scripts_at_shutdown = _async_stop_scripts_at_shutdown

    async def stop_scripts_at_shutdown(*args):
        """Run the real shutdown handler, then signal that it has finished."""
        await real_stop_scripts_at_shutdown(*args)
        stop_scripts_at_shutdown_called.set()

    with patch(
        "homeassistant.helpers.script._async_stop_scripts_at_shutdown",
        wraps=stop_scripts_at_shutdown,
    ):
        assert await async_setup_component(
            hass,
            script.DOMAIN,
            {
                script.DOMAIN: {
                    "script1": {
                        "mode": script_mode,
                        "sequence": [
                            {
                                "choose": {
                                    "conditions": {
                                        "condition": "template",
                                        "value_template": "{{ request == 'step_2' }}",
                                    },
                                    "sequence": {"service": "test.script_done"},
                                },
                                "default": {
                                    "service": "script.turn_on",
                                    "data": {
                                        "entity_id": "script.script1",
                                        "variables": {"request": "step_2"},
                                    },
                                },
                            },
                            {
                                "service": "script.turn_on",
                                "data": {"entity_id": "script.script1"},
                            },
                        ],
                    }
                }
            },
        )

        service_called = asyncio.Event()

        async def async_service_handler(service):
            """Wake up the test once test.script_done has been called."""
            if service.service == "script_done":
                service_called.set()

        hass.services.async_register("test", "script_done", async_service_handler)

        await hass.services.async_call("script", "script1")
        await asyncio.wait_for(service_called.wait(), 1)

        # Trigger 1st stage script shutdown
        hass.state = CoreState.stopping
        hass.bus.async_fire("homeassistant_stop")
        await asyncio.wait_for(stop_scripts_at_shutdown_called.wait(), 1)

        # Trigger 2nd stage script shutdown
        async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=90))
        await hass.async_block_till_done()

        assert "Disallowed recursion detected" not in caplog.text


async def test_setup_with_duplicate_scripts(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test setup with duplicate configs.

    The same object id is defined under two config sections; a warning must
    be logged and only one entity created.
    """
    assert await async_setup_component(
        hass,
        "script",
        {
            "script one": {
                "duplicate": {
                    "sequence": [],
                },
            },
            "script two": {
                "duplicate": {
                    "sequence": [],
                },
            },
        },
    )
    assert "Duplicate script detected with name: 'duplicate'" in caplog.text
    assert len(hass.states.async_entity_ids("script")) == 1


async def test_websocket_config(
    hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
    """Test config command.

    ``script/config`` must return the configuration of a known script and a
    ``not_found`` error for an unknown entity id.
    """
    config = {
        "alias": "hello",
        "sequence": [{"service": "light.turn_on"}],
    }
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "hello": config,
            },
        },
    )
    client = await hass_ws_client(hass)
    await client.send_json(
        {
            "id": 5,
            "type": "script/config",
            "entity_id": "script.hello",
        }
    )

    msg = await client.receive_json()
    assert msg["success"]
    assert msg["result"] == {"config": config}

    await client.send_json(
        {
            "id": 6,
            "type": "script/config",
            "entity_id": "script.not_exist",
        }
    )

    msg = await client.receive_json()
    assert not msg["success"]
    assert msg["error"]["code"] == "not_found"


async def test_script_service_changed_entity_id(hass: HomeAssistant) -> None:
    """Test the script service works for scripts with overridden entity_id."""
    entity_reg = er.async_get(hass)
    entry = entity_reg.async_get_or_create("script", "script", "test")
    entry = entity_reg.async_update_entity(
        entry.entity_id, new_entity_id="script.custom_entity_id"
    )
    assert entry.entity_id == "script.custom_entity_id"

    calls = []

    @callback
    def record_call(service):
        """Append the received service call to the list."""
        calls.append(service)

    hass.services.async_register("test", "script", record_call)

    # Make sure the service of a script with overridden entity_id works
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test": {
                    "sequence": {
                        "service": "test.script",
                        "data_template": {"entity_id": "{{ this.entity_id }}"},
                    }
                }
            }
        },
    )

    await hass.services.async_call(DOMAIN, "test", {"greeting": "world"})

    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].data["entity_id"] == "script.custom_entity_id"

    # Change entity while the script entity is loaded, and make sure the service still works
    entry = entity_reg.async_update_entity(
        entry.entity_id, new_entity_id="script.custom_entity_id_2"
    )
    assert entry.entity_id == "script.custom_entity_id_2"
    await hass.async_block_till_done()

    await hass.services.async_call(DOMAIN, "test", {"greeting": "world"})
    await hass.async_block_till_done()

    assert len(calls) == 2
    assert calls[1].data["entity_id"] == "script.custom_entity_id_2"


@pytest.mark.parametrize(
    ("blueprint_inputs", "problem", "details"),
    (
        (
            # No input
            {},
            "Failed to generate script from blueprint",
            "Missing input service_to_call",
        ),
        (
            # Missing input
            {"a_number": 5},
            "Failed to generate script from blueprint",
            "Missing input service_to_call",
        ),
        (
            # Wrong input
            {
                "trigger_event": "blueprint_event",
                "service_to_call": {"dict": "not allowed"},
                "a_number": 5,
            },
            "Blueprint 'Call service' generated invalid script",
            "value should be a string for dictionary value @ data['sequence'][0]['service']",
        ),
    ),
)
async def test_blueprint_script_bad_config(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    blueprint_inputs,
    problem,
    details,
) -> None:
    """Test blueprint script with bad inputs."""
    assert await async_setup_component(
        hass,
        script.DOMAIN,
        {
            script.DOMAIN: {
                "test_script": {
                    "use_blueprint": {
                        "path": "test_service.yaml",
                        "input": blueprint_inputs,
                    }
                }
            }
        },
    )
    assert problem in caplog.text
    assert details in caplog.text


async def test_blueprint_script_fails_substitution(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test blueprint script whose input substitution raises an error."""
    with patch(
        "homeassistant.components.blueprint.models.BlueprintInputs.async_substitute",
        side_effect=yaml.UndefinedSubstitution("blah"),
    ):
        assert await async_setup_component(
            hass,
            script.DOMAIN,
            {
                script.DOMAIN: {
                    "test_script": {
                        "use_blueprint": {
                            "path": "test_service.yaml",
                            "input": {
                                "service_to_call": "test.automation",
                            },
                        }
                    }
                }
            },
        )
    assert (
        "Blueprint 'Call service' failed to generate script with inputs "
        "{'service_to_call': 'test.automation'}: No substitution found for input blah"
        in caplog.text
    )