core/tests/components/script/test_init.py

1418 lines
43 KiB
Python
Raw Normal View History

2016-03-09 09:25:50 +00:00
"""The tests for the Script component."""
import asyncio
from datetime import timedelta
2021-01-01 21:31:56 +00:00
from unittest.mock import Mock, patch
2015-10-15 06:09:52 +00:00
import pytest
from homeassistant.components import script
from homeassistant.components.script import DOMAIN, EVENT_SCRIPT_STARTED, ScriptEntity
from homeassistant.const import (
2019-07-31 19:25:30 +00:00
ATTR_ENTITY_ID,
ATTR_NAME,
SERVICE_RELOAD,
SERVICE_TOGGLE,
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
STATE_OFF,
)
from homeassistant.core import (
Context,
CoreState,
HomeAssistant,
State,
callback,
split_entity_id,
2019-07-31 19:25:30 +00:00
)
from homeassistant.exceptions import ServiceNotFound
2022-04-21 18:04:02 +00:00
from homeassistant.helpers import entity_registry as er, template
from homeassistant.helpers.event import async_track_state_change
from homeassistant.helpers.script import (
SCRIPT_MODE_CHOICES,
SCRIPT_MODE_PARALLEL,
SCRIPT_MODE_QUEUED,
SCRIPT_MODE_RESTART,
SCRIPT_MODE_SINGLE,
_async_stop_scripts_at_shutdown,
)
from homeassistant.helpers.service import async_get_all_descriptions
from homeassistant.setup import async_setup_component
from homeassistant.util import yaml
import homeassistant.util.dt as dt_util
2015-10-15 06:09:52 +00:00
from tests.common import async_fire_time_changed, async_mock_service, mock_restore_cache
from tests.components.logbook.common import MockRow, mock_humanify
from tests.typing import WebSocketGenerator
2015-10-15 06:09:52 +00:00
2019-07-31 19:25:30 +00:00
# Entity id of the "test" script that many tests in this module configure.
ENTITY_ID = "script.test"
2015-10-15 06:09:52 +00:00
@pytest.fixture
def calls(hass):
    """Register a mock ``test.script`` service and hand back its call tracker."""
    tracked_calls = async_mock_service(hass, "test", "script")
    return tracked_calls
async def test_passing_variables(hass: HomeAssistant) -> None:
    """Test different ways of passing in variables.

    The script is started through its own ``script.test`` service, first
    addressed with the ``DOMAIN`` constant and then with the literal domain
    name. Both calls must render ``greeting`` into the templated service data
    and forward the caller's context unchanged.
    """
    mock_restore_cache(hass, ())
    calls = []
    context = Context()

    @callback
    def record_call(service):
        """Append the received service call to the recorded list."""
        calls.append(service)

    hass.services.async_register("test", "script", record_call)

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test": {
                    "sequence": {
                        "service": "test.script",
                        "data_template": {"hello": "{{ greeting }}"},
                    }
                }
            }
        },
    )

    await hass.services.async_call(
        DOMAIN, "test", {"greeting": "world"}, context=context
    )
    await hass.async_block_till_done()

    assert len(calls) == 1
    assert calls[0].context is context
    assert calls[0].data["hello"] == "world"

    await hass.services.async_call(
        "script", "test", {"greeting": "universe"}, context=context
    )
    await hass.async_block_till_done()

    assert len(calls) == 2
    assert calls[1].context is context
    assert calls[1].data["hello"] == "universe"
@pytest.mark.parametrize("toggle", [False, True])
async def test_turn_on_off_toggle(hass: HomeAssistant, toggle) -> None:
    """Check that a script can stop itself through turn_off or toggle.

    The sequence fires an event, stops its own script, then would fire the
    event a second time. Only the first event may be observed.
    """
    event = "test_event"
    event_mock = Mock()
    hass.bus.async_listen(event, event_mock)

    was_on = False

    @callback
    def state_listener(entity_id, old_state, new_state):
        """Remember that the script entity reached the "on" state."""
        nonlocal was_on
        was_on = True

    async_track_state_change(hass, ENTITY_ID, state_listener, to_state="on")

    # The step in the middle of the sequence stops the script itself.
    stop_service = "script.toggle" if toggle else "script.turn_off"
    turn_off_step = {"service": stop_service, "entity_id": ENTITY_ID}
    sequence = [{"event": event}, turn_off_step, {"event": event}]

    assert await async_setup_component(
        hass, "script", {"script": {"test": {"sequence": sequence}}}
    )

    assert not script.is_on(hass, ENTITY_ID)

    if toggle:
        await hass.services.async_call(
            DOMAIN, SERVICE_TOGGLE, {ATTR_ENTITY_ID: ENTITY_ID}
        )
    else:
        # Start the script through the service named after its object id.
        await hass.services.async_call(DOMAIN, split_entity_id(ENTITY_ID)[1])
    await hass.async_block_till_done()

    assert not script.is_on(hass, ENTITY_ID)
    assert was_on
    assert event_mock.call_count == 1
# Script configurations that test_setup_with_invalid_configs expects to
# produce no script entities.
invalid_configs = [
    # Script without a "sequence".
    {"test": {}},
    # Object id containing spaces.
    {"test hello world": {"sequence": [{"event": "bla"}]}},
    # A single step carrying both "event" and "service" keys
    # (presumably rejected as an ambiguous action -- TODO confirm).
    {"test": {"sequence": {"event": "test_event", "service": "homeassistant.turn_on"}}},
]
2019-07-31 19:25:30 +00:00
@pytest.mark.parametrize("value", invalid_configs)
async def test_setup_with_invalid_configs(hass: HomeAssistant, value) -> None:
    """Ensure setup succeeds with an invalid config but creates no script entity."""
    setup_ok = await async_setup_component(hass, "script", {"script": value})
    assert setup_ok, f"Script loaded with wrong config {value}"

    script_entity_ids = hass.states.async_entity_ids("script")
    assert len(script_entity_ids) == 0
@pytest.mark.parametrize(
    ("object_id", "broken_config", "problem", "details"),
    (
        (
            "Bad Script",
            {},
            "has invalid object id",
            "invalid slug Bad Script",
        ),
        (
            "bad_script",
            {},
            "could not be validated",
            "required key not provided @ data['sequence']",
        ),
        (
            "bad_script",
            {
                "sequence": {
                    "condition": "state",
                    # This UUID cannot be resolved to an entity_id
                    "entity_id": "abcdabcdabcdabcdabcdabcdabcdabcd",
                    "state": "blah",
                },
            },
            "failed to setup actions",
            "Unknown entity registry entry abcdabcdabcdabcdabcdabcdabcdabcd.",
        ),
    ),
)
async def test_bad_config_validation(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    object_id,
    broken_config,
    problem,
    details,
) -> None:
    """Check that a broken script is reported and skipped while a valid one loads."""
    good_script = {
        "alias": "good_script",
        "sequence": {
            "service": "test.automation",
            "entity_id": "hello.world",
        },
    }
    config = {
        script.DOMAIN: {
            object_id: {"alias": "bad_script", **broken_config},
            "good_script": good_script,
        }
    }
    assert await async_setup_component(hass, script.DOMAIN, config)

    # The log must name the broken script together with the reason
    expected_message = (
        f"Script with alias 'bad_script' {problem} and has been disabled: {details}"
    )
    assert expected_message in caplog.text

    # One bad script must not prevent the other script from being set up
    assert hass.states.async_entity_ids("script") == ["script.good_script"]
@pytest.mark.parametrize("running", ["no", "same", "different"])
async def test_reload_service(hass: HomeAssistant, running) -> None:
    """Verify the reload service.

    ``running`` selects the scenario:

    - "no": no script is running when the reload happens
    - "same": the running script is still present in the reloaded config
    - "different": the running script is replaced by another script
    """
    event = "test_event"
    event_flag = asyncio.Event()

    @callback
    def event_handler(_event):
        """Signal that the script has fired its first event."""
        event_flag.set()

    hass.bus.async_listen_once(event, event_handler)
    hass.states.async_set("test.script", "off")

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test": {
                    "sequence": [
                        {"event": event},
                        {"wait_template": "{{ is_state('test.script', 'on') }}"},
                    ]
                }
            }
        },
    )

    assert hass.states.get(ENTITY_ID) is not None
    assert hass.services.has_service(script.DOMAIN, "test")

    if running != "no":
        # Start the script and wait until it is parked on the wait_template
        _, object_id = split_entity_id(ENTITY_ID)
        await hass.services.async_call(DOMAIN, object_id)
        await asyncio.wait_for(event_flag.wait(), 1)

        assert script.is_on(hass, ENTITY_ID)

    object_id = "test" if running == "same" else "test2"
    with patch(
        "homeassistant.config.load_yaml_config_file",
        return_value={"script": {object_id: {"sequence": [{"delay": {"seconds": 5}}]}}},
    ):
        await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)
        await hass.async_block_till_done()

    if running != "same":
        # The old script is gone from the config: its state is flagged as
        # restored and its service is removed.
        state = hass.states.get(ENTITY_ID)
        assert state.attributes["restored"] is True
        assert not hass.services.has_service(script.DOMAIN, "test")

        assert hass.states.get("script.test2") is not None
        assert hass.services.has_service(script.DOMAIN, "test2")
    else:
        assert hass.states.get(ENTITY_ID) is not None
        assert hass.services.has_service(script.DOMAIN, "test")
async def test_reload_unchanged_does_not_stop(hass: HomeAssistant, calls) -> None:
    """Test that reloading an unchanged config does not stop a running script."""
    test_entity = "test.entity"

    config = {
        script.DOMAIN: {
            "test": {
                "sequence": [
                    {"event": "running"},
                    {"wait_template": "{{ is_state('test.entity', 'goodbye') }}"},
                    {"service": "test.script"},
                ],
            }
        }
    }
    assert await async_setup_component(hass, script.DOMAIN, config)

    assert hass.states.get(ENTITY_ID) is not None
    assert hass.services.has_service(script.DOMAIN, "test")

    running = asyncio.Event()

    @callback
    def running_cb(event):
        """Signal that the script has started."""
        running.set()

    hass.bus.async_listen_once("running", running_cb)
    hass.states.async_set(test_entity, "hello")

    # Start the script and wait for it to start
    _, object_id = split_entity_id(ENTITY_ID)
    await hass.services.async_call(DOMAIN, object_id)
    # Bounded wait: a script that never starts must fail the test, not hang it
    await asyncio.wait_for(running.wait(), 1)
    assert len(calls) == 0

    with patch(
        "homeassistant.config.load_yaml_config_file",
        autospec=True,
        return_value=config,
    ):
        await hass.services.async_call(script.DOMAIN, SERVICE_RELOAD, blocking=True)

        # Releasing the wait_template lets the still-running script finish
        hass.states.async_set(test_entity, "goodbye")
        await hass.async_block_till_done()

        assert len(calls) == 1
@pytest.mark.parametrize(
    "script_config",
    (
        # A script calling a service directly
        {
            "test": {
                "sequence": [{"service": "test.script"}],
            }
        },
        # A script using templates
        {
            "test": {
                "sequence": [{"service": "{{ 'test.script' }}"}],
            }
        },
        # A script using blueprint
        {
            "test": {
                "use_blueprint": {
                    "path": "test_service.yaml",
                    "input": {
                        "service_to_call": "test.script",
                    },
                }
            }
        },
        # A script using blueprint with templated input
        {
            "test": {
                "use_blueprint": {
                    "path": "test_service.yaml",
                    "input": {
                        "service_to_call": "{{ 'test.script' }}",
                    },
                }
            }
        },
    ),
)
async def test_reload_unchanged_script(
    hass: HomeAssistant, calls, script_config
) -> None:
    """Check that reloading an identical config does not recreate the script entity."""
    _, object_id = split_entity_id(ENTITY_ID)

    async def run_script() -> None:
        """Call the script service and wait until everything has settled."""
        await hass.services.async_call(DOMAIN, object_id)
        await hass.async_block_till_done()

    with patch(
        "homeassistant.components.script.ScriptEntity", wraps=ScriptEntity
    ) as entity_factory:
        config = {script.DOMAIN: [script_config]}
        assert await async_setup_component(hass, script.DOMAIN, config)
        assert hass.states.get(ENTITY_ID) is not None
        assert hass.services.has_service(script.DOMAIN, "test")

        # Initial setup creates exactly one entity
        assert entity_factory.call_count == 1
        entity_factory.reset_mock()

        # Run the script once to completion
        await run_script()
        assert len(calls) == 1

        # Reload the scripts without any change
        with patch(
            "homeassistant.config.load_yaml_config_file",
            autospec=True,
            return_value=config,
        ):
            await hass.services.async_call(script.DOMAIN, SERVICE_RELOAD, blocking=True)

        # No new entity may have been constructed by the reload
        assert entity_factory.call_count == 0
        entity_factory.reset_mock()

        # The script must still be callable after the reload
        await run_script()
        assert len(calls) == 2
async def test_service_descriptions(hass: HomeAssistant) -> None:
    """Check service descriptions after initial setup and after each reload."""

    async def reload_and_describe(yaml_config):
        """Reload the scripts from ``yaml_config`` and return all descriptions."""
        with patch(
            "homeassistant.config.load_yaml_config_file",
            return_value=yaml_config,
        ):
            await hass.services.async_call(DOMAIN, SERVICE_RELOAD, blocking=True)
            return await async_get_all_descriptions(hass)

    # Test 1: has "description" but no "fields"
    initial_config = {
        "script": {
            "test": {
                "description": "test description",
                "sequence": [{"delay": {"seconds": 5}}],
            }
        }
    }
    assert await async_setup_component(hass, "script", initial_config)

    descriptions = await async_get_all_descriptions(hass)
    test_description = descriptions[DOMAIN]["test"]

    assert test_description["name"] == "test"
    assert test_description["description"] == "test description"
    assert not test_description["fields"]

    # Test 2: has "fields" but no "description"
    descriptions = await reload_and_describe(
        {
            "script": {
                "test": {
                    "fields": {
                        "test_param": {
                            "description": "test_param description",
                            "example": "test_param example",
                        }
                    },
                    "sequence": [{"delay": {"seconds": 5}}],
                }
            }
        }
    )
    test_description = descriptions[script.DOMAIN]["test"]

    assert test_description["description"] == ""
    test_param = test_description["fields"]["test_param"]
    assert test_param["description"] == "test_param description"
    assert test_param["example"] == "test_param example"

    # Test 3: has "alias" that will be used as "name"
    descriptions = await reload_and_describe(
        {
            "script": {
                "test_name": {
                    "alias": "ABC",
                    "sequence": [{"delay": {"seconds": 5}}],
                }
            }
        }
    )

    assert descriptions[DOMAIN]["test_name"]["name"] == "ABC"

    # Test 4: verify that names from YAML are taken into account as well
    assert descriptions[DOMAIN]["turn_on"]["name"] == "Turn on"
async def test_shared_context(hass: HomeAssistant) -> None:
    """Test that the shared context is passed down the chain.

    The context given to ``script.turn_on`` must show up on the script-started
    event, on the event fired by the sequence and on the script entity state.
    """
    event = "test_event"
    context = Context()

    event_mock = Mock()
    run_mock = Mock()

    hass.bus.async_listen(event, event_mock)
    hass.bus.async_listen(EVENT_SCRIPT_STARTED, run_mock)

    assert await async_setup_component(
        hass, "script", {"script": {"test": {"sequence": [{"event": event}]}}}
    )

    await hass.services.async_call(
        DOMAIN, SERVICE_TURN_ON, {ATTR_ENTITY_ID: ENTITY_ID}, context=context
    )
    await hass.async_block_till_done()

    assert event_mock.call_count == 1
    assert run_mock.call_count == 1

    # Only the positional arguments (the event object) are of interest
    args, _ = run_mock.call_args
    assert args[0].context == context
    # Ensure event data has all attributes set
    assert args[0].data.get(ATTR_NAME) == "test"
    assert args[0].data.get(ATTR_ENTITY_ID) == "script.test"

    # Ensure context carries through the event
    args, _ = event_mock.call_args
    assert args[0].context == context

    # Ensure the script state shares the same context
    state = hass.states.get("script.test")
    assert state is not None
    assert state.context == context
async def test_logging_script_error(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test logging script error.

    Calling a script whose sequence uses a missing service must raise
    ServiceNotFound to the caller and also log the failure.
    """
    assert await async_setup_component(
        hass,
        "script",
        {"script": {"hello": {"sequence": [{"service": "non.existing"}]}}},
    )
    with pytest.raises(ServiceNotFound) as err:
        await hass.services.async_call("script", "hello", blocking=True)

    assert err.value.domain == "non"
    assert err.value.service == "existing"
    assert "Error executing script" in caplog.text
async def test_turning_no_scripts_off(hass: HomeAssistant) -> None:
    """Test that turn_off with an empty entity list does not raise."""
    assert await async_setup_component(hass, "script", {})

    # Testing it doesn't raise
    await hass.services.async_call(
        DOMAIN, SERVICE_TURN_OFF, {"entity_id": []}, blocking=True
    )
async def test_async_get_descriptions_script(hass: HomeAssistant) -> None:
    """Test the service descriptions exposed for configured scripts.

    A script without ``description``/``fields`` must get an empty description
    and no fields; a script that declares them must expose them unchanged.
    """
    script_config = {
        DOMAIN: {
            "test1": {"sequence": [{"service": "homeassistant.restart"}]},
            "test2": {
                "description": "test2",
                "fields": {
                    "param": {
                        "description": "param_description",
                        "example": "param_example",
                    }
                },
                "sequence": [{"service": "homeassistant.restart"}],
            },
        }
    }

    # Fail right here if setup breaks, rather than with a KeyError further down
    assert await async_setup_component(hass, DOMAIN, script_config)
    descriptions = await async_get_all_descriptions(hass)

    assert descriptions[DOMAIN]["test1"]["description"] == ""
    assert not descriptions[DOMAIN]["test1"]["fields"]

    assert descriptions[DOMAIN]["test2"]["description"] == "test2"
    assert (
        descriptions[DOMAIN]["test2"]["fields"]["param"]["description"]
        == "param_description"
    )
    assert (
        descriptions[DOMAIN]["test2"]["fields"]["param"]["example"] == "param_example"
    )
async def test_extraction_functions(hass: HomeAssistant) -> None:
    """Check the helpers that map scripts to referenced entities and devices."""

    def service_step(entity_id):
        """Build a fresh ``test.script`` service step targeting ``entity_id``."""
        return {"service": "test.script", "data": {"entity_id": entity_id}}

    def device_step(entity_id, device_id):
        """Build a fresh light ``turn_on`` device action step."""
        return {
            "entity_id": entity_id,
            "domain": "light",
            "type": "turn_on",
            "device_id": device_id,
        }

    first_sequence = [
        service_step("light.in_both"),
        service_step("light.in_first"),
        device_step("light.device_in_both", "device-in-both"),
    ]
    second_sequence = [
        service_step("light.in_both"),
        {
            "condition": "state",
            "entity_id": "sensor.condition",
            "state": "100",
        },
        {"scene": "scene.hello"},
        device_step("light.device_in_both", "device-in-both"),
        device_step("light.device_in_last", "device-in-last"),
    ]

    assert await async_setup_component(
        hass,
        DOMAIN,
        {
            DOMAIN: {
                "test1": {"sequence": first_sequence},
                "test2": {"sequence": second_sequence},
            }
        },
    )

    scripts_using_entity = set(script.scripts_with_entity(hass, "light.in_both"))
    assert scripts_using_entity == {"script.test1", "script.test2"}

    entities_of_first = set(script.entities_in_script(hass, "script.test1"))
    assert entities_of_first == {"light.in_both", "light.in_first"}

    scripts_using_device = set(script.scripts_with_device(hass, "device-in-both"))
    assert scripts_using_device == {"script.test1", "script.test2"}

    devices_of_second = set(script.devices_in_script(hass, "script.test2"))
    assert devices_of_second == {"device-in-both", "device-in-last"}
2020-02-17 16:44:36 +00:00
async def test_config_basic(hass: HomeAssistant) -> None:
    """Test passing info in config.

    The alias and icon from the config must appear on the state, and the
    entity registry entry must use the object id as its unique id.
    """
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "test_script": {
                    "alias": "Script Name",
                    "icon": "mdi:party",
                    "sequence": [],
                }
            }
        },
    )

    test_script = hass.states.get("script.test_script")
    # Fail with a clear assertion instead of an AttributeError on None
    assert test_script
    assert test_script.name == "Script Name"
    assert test_script.attributes["icon"] == "mdi:party"

    registry = er.async_get(hass)
    entry = registry.async_get("script.test_script")
    assert entry
    assert entry.unique_id == "test_script"
async def test_config_multiple_domains(hass: HomeAssistant) -> None:
    """Check that scripts from several ``script`` config sections are all loaded."""
    config = {
        "script": {
            "first_script": {
                "alias": "Main domain",
                "sequence": [],
            }
        },
        "script second": {
            "second_script": {
                "alias": "Secondary domain",
                "sequence": [],
            }
        },
    }
    assert await async_setup_component(hass, "script", config)

    first_state = hass.states.get("script.first_script")
    assert first_state
    assert first_state.name == "Main domain"

    second_state = hass.states.get("script.second_script")
    assert second_state
    assert second_state.name == "Secondary domain"
async def test_logbook_humanify_script_started_event(hass: HomeAssistant) -> None:
    """Check how script started events are rendered as logbook entries."""
    hass.config.components.add("recorder")
    await async_setup_component(hass, DOMAIN, {})
    await async_setup_component(hass, "logbook", {})

    # (entity id, name) pairs for the two mocked script started events
    started_scripts = (
        ("script.hello", "Hello Script"),
        ("script.bye", "Bye Script"),
    )
    rows = [
        MockRow(
            EVENT_SCRIPT_STARTED,
            {ATTR_ENTITY_ID: entity_id, ATTR_NAME: name},
        )
        for entity_id, name in started_scripts
    ]

    event1, event2 = mock_humanify(hass, rows)

    for entry, (entity_id, name) in zip((event1, event2), started_scripts):
        assert entry["name"] == name
        assert entry["domain"] == "script"
        assert entry["message"] == "started"
        assert entry["entity_id"] == entity_id
@pytest.mark.parametrize("concurrently", [False, True])
async def test_concurrent_script(hass: HomeAssistant, concurrently) -> None:
    """Check a script starting another one, either concurrently or inline.

    With ``script.turn_on`` script1 continues while script2 is still running;
    with a direct ``script.script2`` call script1 continues only after
    script2 has finished.
    """
    if concurrently:
        call_script_2 = {
            "service": "script.turn_on",
            "data": {"entity_id": "script.script2"},
        }
    else:
        call_script_2 = {"service": "script.script2"}

    script1_sequence = [
        call_script_2,
        {"wait_template": "{{ is_state('input_boolean.test1', 'on') }}"},
        {"service": "test.script", "data": {"value": "script1"}},
    ]
    script2_sequence = [
        {"service": "test.script", "data": {"value": "script2a"}},
        {"wait_template": "{{ is_state('input_boolean.test2', 'on') }}"},
        {"service": "test.script", "data": {"value": "script2b"}},
    ]
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {"mode": "parallel", "sequence": script1_sequence},
                "script2": {"mode": "parallel", "sequence": script2_sequence},
            }
        },
    )

    service_called = asyncio.Event()
    service_values = []

    async def async_service_handler(service):
        """Record the value passed to ``test.script`` and wake the test."""
        service_values.append(service.data.get("value"))
        service_called.set()

    async def latest_service_value():
        """Wait for the next ``test.script`` call and return the newest value."""
        await asyncio.wait_for(service_called.wait(), 1)
        service_called.clear()
        return service_values[-1]

    hass.services.async_register("test", "script", async_service_handler)
    hass.states.async_set("input_boolean.test1", "off")
    hass.states.async_set("input_boolean.test2", "off")

    await hass.services.async_call("script", "script1")
    assert await latest_service_value() == "script2a"
    assert script.is_on(hass, "script.script1")
    assert script.is_on(hass, "script.script2")

    if not concurrently:
        # script1 is blocked on script2, so script2 has to be released first
        hass.states.async_set("input_boolean.test2", "on")
        assert await latest_service_value() == "script2b"

    hass.states.async_set("input_boolean.test1", "on")
    assert await latest_service_value() == "script1"
    assert concurrently == script.is_on(hass, "script.script2")

    if concurrently:
        # script2 is still waiting on its own template
        hass.states.async_set("input_boolean.test2", "on")
        assert await latest_service_value() == "script2b"

    await hass.async_block_till_done()

    assert not script.is_on(hass, "script.script1")
    assert not script.is_on(hass, "script.script2")
async def test_script_variables(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Test script-level variables and how service data overrides them.

    - script1: static and templated variables, plus the ``this`` variable
    - script2: a static variable that the caller overrides
    - script3: a variable template that fails unless the caller passes ``break``
    """
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "variables": {
                        "this_variable": "{{this.entity_id}}",
                        "test_var": "from_config",
                        "templated_config_var": "{{ var_from_service | default('config-default') }}",
                    },
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {
                                "value": "{{ test_var }}",
                                "templated_config_var": "{{ templated_config_var }}",
                                "this_template": "{{this.entity_id}}",
                                "this_variable": "{{this_variable}}",
                            },
                        },
                    ],
                },
                "script2": {
                    "variables": {
                        "test_var": "from_config",
                    },
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {
                                "value": "{{ test_var }}",
                            },
                        },
                    ],
                },
                "script3": {
                    "variables": {
                        "test_var": "{{ break + 1 }}",
                    },
                    "sequence": [
                        {
                            "service": "test.script",
                            "data": {
                                "value": "{{ test_var }}",
                            },
                        },
                    ],
                },
            }
        },
    )

    mock_calls = async_mock_service(hass, "test", "script")

    await hass.services.async_call(
        "script", "script1", {"var_from_service": "hello"}, blocking=True
    )

    assert len(mock_calls) == 1
    assert mock_calls[0].data["value"] == "from_config"
    assert mock_calls[0].data["templated_config_var"] == "hello"
    # Verify `this` is available to all templates
    assert mock_calls[0].data.get("this_template") == "script.script1"
    # Verify `this` is available while the script variables are rendered
    assert mock_calls[0].data.get("this_variable") == "script.script1"

    await hass.services.async_call(
        "script", "script1", {"test_var": "from_service"}, blocking=True
    )

    assert len(mock_calls) == 2
    assert mock_calls[1].data["value"] == "from_service"
    assert mock_calls[1].data["templated_config_var"] == "config-default"

    # Call script with vars but no templates in it
    await hass.services.async_call(
        "script", "script2", {"test_var": "from_service"}, blocking=True
    )

    assert len(mock_calls) == 3
    assert mock_calls[2].data["value"] == "from_service"

    # Without `break` the variable template of script3 cannot be rendered
    assert "Error rendering variables" not in caplog.text
    with pytest.raises(template.TemplateError):
        await hass.services.async_call("script", "script3", blocking=True)
    assert "Error rendering variables" in caplog.text
    assert len(mock_calls) == 3

    await hass.services.async_call("script", "script3", {"break": 0}, blocking=True)

    assert len(mock_calls) == 4
    assert mock_calls[3].data["value"] == 1
async def test_script_this_var_always(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Check that ``this`` is available even when no variables are configured."""
    service_step = {
        "service": "test.script",
        "data": {
            "this_template": "{{this.entity_id}}",
        },
    }
    config = {"script": {"script1": {"sequence": [service_step]}}}
    assert await async_setup_component(hass, "script", config)

    mock_calls = async_mock_service(hass, "test", "script")

    await hass.services.async_call("script", "script1", blocking=True)

    assert len(mock_calls) == 1
    # The template must have resolved `this` to the script entity
    rendered_entity_id = mock_calls[0].data.get("this_template")
    assert rendered_entity_id == "script.script1"
    assert "Error rendering variables" not in caplog.text
async def test_script_restore_last_triggered(hass: HomeAssistant) -> None:
    """Check that ``last_triggered`` is taken from the restore cache on start."""
    triggered_at = dt_util.utcnow()

    restored_states = (
        State("script.no_last_triggered", STATE_OFF),
        State("script.last_triggered", STATE_OFF, {"last_triggered": triggered_at}),
    )
    mock_restore_cache(hass, restored_states)
    hass.state = CoreState.starting

    # Both scripts share the same trivial sequence; only the restored state differs
    script_names = ("no_last_triggered", "last_triggered")
    scripts = {
        name: {"sequence": [{"delay": {"seconds": 5}}]} for name in script_names
    }
    assert await async_setup_component(hass, "script", {"script": scripts})

    never_triggered = hass.states.get("script.no_last_triggered")
    assert never_triggered
    assert never_triggered.attributes["last_triggered"] is None

    previously_triggered = hass.states.get("script.last_triggered")
    assert previously_triggered
    assert previously_triggered.attributes["last_triggered"] == triggered_at
@pytest.mark.parametrize(
    ("script_mode", "warning_msg"),
    (
        (SCRIPT_MODE_PARALLEL, "Maximum number of runs exceeded"),
        (SCRIPT_MODE_QUEUED, "Disallowed recursion detected"),
        (SCRIPT_MODE_RESTART, "Disallowed recursion detected"),
        (SCRIPT_MODE_SINGLE, "Already running"),
    ),
)
async def test_recursive_script(
    hass: HomeAssistant, script_mode, warning_msg, caplog: pytest.LogCaptureFixture
) -> None:
    """Test recursive script calls does not deadlock.

    The script calls itself directly. For every mode the recursion must be
    cut off with the expected warning and the sequence must still reach the
    ``test.script`` step.
    """
    # Make sure we cover all script modes
    assert [
        SCRIPT_MODE_PARALLEL,
        SCRIPT_MODE_QUEUED,
        SCRIPT_MODE_RESTART,
        SCRIPT_MODE_SINGLE,
    ] == SCRIPT_MODE_CHOICES

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script1"},
                        {"service": "test.script"},
                    ],
                },
            }
        },
    )

    service_called = asyncio.Event()

    async def async_service_handler(service):
        """Signal that the sequence got past the recursive call."""
        service_called.set()

    hass.services.async_register("test", "script", async_service_handler)

    await hass.services.async_call("script", "script1")
    await asyncio.wait_for(service_called.wait(), 1)

    assert warning_msg in caplog.text
@pytest.mark.parametrize(
    ("script_mode", "warning_msg"),
    (
        (SCRIPT_MODE_PARALLEL, "Maximum number of runs exceeded"),
        (SCRIPT_MODE_QUEUED, "Disallowed recursion detected"),
        (SCRIPT_MODE_RESTART, "Disallowed recursion detected"),
        (SCRIPT_MODE_SINGLE, "Already running"),
    ),
)
async def test_recursive_script_indirect(
    hass: HomeAssistant, script_mode, warning_msg, caplog: pytest.LogCaptureFixture
) -> None:
    """Test indirectly recursive script calls does not deadlock.

    script1 -> script2 -> script3 -> script4 -> script1 forms a cycle. For
    every mode the recursion must be cut off with the expected warning and
    script4 must still reach its ``test.script`` step.
    """
    # Make sure we cover all script modes
    assert [
        SCRIPT_MODE_PARALLEL,
        SCRIPT_MODE_QUEUED,
        SCRIPT_MODE_RESTART,
        SCRIPT_MODE_SINGLE,
    ] == SCRIPT_MODE_CHOICES

    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "script1": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script2"},
                    ],
                },
                "script2": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script3"},
                    ],
                },
                "script3": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script4"},
                    ],
                },
                "script4": {
                    "mode": script_mode,
                    "sequence": [
                        {"service": "script.script1"},
                        {"service": "test.script"},
                    ],
                },
            }
        },
    )

    service_called = asyncio.Event()

    async def async_service_handler(service):
        """Signal that script4 got past the recursive call."""
        service_called.set()

    hass.services.async_register("test", "script", async_service_handler)

    await hass.services.async_call("script", "script1")
    await asyncio.wait_for(service_called.wait(), 1)

    assert warning_msg in caplog.text
@pytest.mark.parametrize(
    "script_mode", [SCRIPT_MODE_PARALLEL, SCRIPT_MODE_QUEUED, SCRIPT_MODE_RESTART]
)
async def test_recursive_script_turn_on(
    hass: HomeAssistant, script_mode, caplog: pytest.LogCaptureFixture
) -> None:
    """Test script turning itself on.

    - Illegal recursion detection should not be triggered
    - Home Assistant should not hang on shut down
    - SCRIPT_MODE_SINGLE is not relevant because such a script can't turn itself on
    """
    # Make sure we cover all script modes
    assert [
        SCRIPT_MODE_PARALLEL,
        SCRIPT_MODE_QUEUED,
        SCRIPT_MODE_RESTART,
        SCRIPT_MODE_SINGLE,
    ] == SCRIPT_MODE_CHOICES

    stop_scripts_at_shutdown_called = asyncio.Event()
    real_stop_scripts_at_shutdown = _async_stop_scripts_at_shutdown

    async def stop_scripts_at_shutdown(*args):
        """Run the real shutdown handler, then signal that it has completed."""
        await real_stop_scripts_at_shutdown(*args)
        stop_scripts_at_shutdown_called.set()

    with patch(
        "homeassistant.helpers.script._async_stop_scripts_at_shutdown",
        wraps=stop_scripts_at_shutdown,
    ):
        assert await async_setup_component(
            hass,
            script.DOMAIN,
            {
                script.DOMAIN: {
                    "script1": {
                        "mode": script_mode,
                        "sequence": [
                            {
                                "choose": {
                                    "conditions": {
                                        "condition": "template",
                                        "value_template": "{{ request == 'step_2' }}",
                                    },
                                    "sequence": {"service": "test.script_done"},
                                },
                                "default": {
                                    "service": "script.turn_on",
                                    "data": {
                                        "entity_id": "script.script1",
                                        "variables": {"request": "step_2"},
                                    },
                                },
                            },
                            {
                                "service": "script.turn_on",
                                "data": {"entity_id": "script.script1"},
                            },
                        ],
                    }
                }
            },
        )

        service_called = asyncio.Event()

        async def async_service_handler(service):
            """Signal that the script reached its ``step_2`` branch."""
            if service.service == "script_done":
                service_called.set()

        hass.services.async_register("test", "script_done", async_service_handler)

        await hass.services.async_call("script", "script1")
        await asyncio.wait_for(service_called.wait(), 1)

        # Trigger 1st stage script shutdown
        hass.state = CoreState.stopping
        hass.bus.async_fire("homeassistant_stop")
        await asyncio.wait_for(stop_scripts_at_shutdown_called.wait(), 1)

        # Trigger 2nd stage script shutdown
        async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=90))
        await hass.async_block_till_done()

        assert "Disallowed recursion detected" not in caplog.text
async def test_setup_with_duplicate_scripts(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Check that a script name repeated across config sections is set up only once."""
    # Two config sections, each declaring a script with the same object id
    config = {
        section: {"duplicate": {"sequence": []}}
        for section in ("script one", "script two")
    }
    assert await async_setup_component(hass, "script", config)

    assert "Duplicate script detected with name: 'duplicate'" in caplog.text
    script_entity_ids = hass.states.async_entity_ids("script")
    assert len(script_entity_ids) == 1
async def test_websocket_config(
    hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
    """Test config command.

    ``script/config`` must return the stored config for an existing script and
    a ``not_found`` error for an unknown entity id.
    """
    config = {
        "alias": "hello",
        "sequence": [{"service": "light.turn_on"}],
    }
    assert await async_setup_component(
        hass,
        "script",
        {
            "script": {
                "hello": config,
            },
        },
    )
    client = await hass_ws_client(hass)

    await client.send_json(
        {
            "id": 5,
            "type": "script/config",
            "entity_id": "script.hello",
        }
    )

    msg = await client.receive_json()
    assert msg["success"]
    assert msg["result"] == {"config": config}

    await client.send_json(
        {
            "id": 6,
            "type": "script/config",
            "entity_id": "script.not_exist",
        }
    )

    msg = await client.receive_json()
    assert not msg["success"]
    assert msg["error"]["code"] == "not_found"
async def test_script_service_changed_entity_id(hass: HomeAssistant) -> None:
    """Check the script service keeps working when its entity_id is overridden."""
    registry = er.async_get(hass)
    entry = registry.async_get_or_create("script", "script", "test")
    entry = registry.async_update_entity(
        entry.entity_id, new_entity_id="script.custom_entity_id"
    )
    assert entry.entity_id == "script.custom_entity_id"

    recorded = []

    @callback
    def record_call(service):
        """Append the received service call to the recorded list."""
        recorded.append(service)

    async def call_script() -> None:
        """Invoke the script through its service and wait for it to settle."""
        await hass.services.async_call(DOMAIN, "test", {"greeting": "world"})
        await hass.async_block_till_done()

    hass.services.async_register("test", "script", record_call)

    # Make sure the service of a script with overridden entity_id works
    config = {
        "script": {
            "test": {
                "sequence": {
                    "service": "test.script",
                    "data_template": {"entity_id": "{{ this.entity_id }}"},
                }
            }
        }
    }
    assert await async_setup_component(hass, "script", config)

    await call_script()

    assert len(recorded) == 1
    assert recorded[0].data["entity_id"] == "script.custom_entity_id"

    # Rename the entity while it is loaded; the service must follow the new id
    entry = registry.async_update_entity(
        entry.entity_id, new_entity_id="script.custom_entity_id_2"
    )
    assert entry.entity_id == "script.custom_entity_id_2"
    await hass.async_block_till_done()

    await call_script()

    assert len(recorded) == 2
    assert recorded[1].data["entity_id"] == "script.custom_entity_id_2"
@pytest.mark.parametrize(
    ("blueprint_inputs", "problem", "details"),
    (
        (
            # No input
            {},
            "Failed to generate script from blueprint",
            "Missing input service_to_call",
        ),
        (
            # Missing input
            {"a_number": 5},
            "Failed to generate script from blueprint",
            "Missing input service_to_call",
        ),
        (
            # Wrong input
            {
                "trigger_event": "blueprint_event",
                "service_to_call": {"dict": "not allowed"},
                "a_number": 5,
            },
            "Blueprint 'Call service' generated invalid script",
            "value should be a string for dictionary value @ data['sequence'][0]['service']",
        ),
    ),
)
async def test_blueprint_script_bad_config(
    hass: HomeAssistant,
    caplog: pytest.LogCaptureFixture,
    blueprint_inputs,
    problem,
    details,
) -> None:
    """Check the errors logged when a blueprint script gets unusable inputs."""
    blueprint_reference = {
        "path": "test_service.yaml",
        "input": blueprint_inputs,
    }
    config = {
        script.DOMAIN: {"test_script": {"use_blueprint": blueprint_reference}}
    }
    assert await async_setup_component(hass, script.DOMAIN, config)

    # Both the problem summary and its details must be in the log
    logged = caplog.text
    assert problem in logged
    assert details in logged
async def test_blueprint_script_fails_substitution(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Check the error logged when blueprint input substitution fails."""
    config = {
        script.DOMAIN: {
            "test_script": {
                "use_blueprint": {
                    "path": "test_service.yaml",
                    "input": {
                        "service_to_call": "test.automation",
                    },
                }
            }
        }
    }
    expected_message = (
        "Blueprint 'Call service' failed to generate script with inputs "
        "{'service_to_call': 'test.automation'}: No substitution found for input blah"
    )

    # Force the substitution step to fail regardless of the supplied inputs
    with patch(
        "homeassistant.components.blueprint.models.BlueprintInputs.async_substitute",
        side_effect=yaml.UndefinedSubstitution("blah"),
    ):
        assert await async_setup_component(hass, script.DOMAIN, config)
        assert expected_message in caplog.text