"""The tests for the Script component.""" # pylint: disable=protected-access from datetime import timedelta from unittest import mock import unittest import voluptuous as vol import pytest from homeassistant import exceptions from homeassistant.core import Context, callback # Otherwise can't test just this file (import order issue) import homeassistant.components # noqa import homeassistant.util.dt as dt_util from homeassistant.helpers import script, config_validation as cv from tests.common import fire_time_changed, get_test_home_assistant ENTITY_ID = 'script.test' class TestScriptHelper(unittest.TestCase): """Test the Script component.""" # pylint: disable=invalid-name def setUp(self): """Set up things to be run when tests are started.""" self.hass = get_test_home_assistant() # pylint: disable=invalid-name def tearDown(self): """Stop down everything that was started.""" self.hass.stop() def test_firing_event(self): """Test the firing of events.""" event = 'test_event' context = Context() calls = [] @callback def record_event(event): """Add recorded event to set.""" calls.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA({ 'event': event, 'event_data': { 'hello': 'world' } })) script_obj.run(context=context) self.hass.block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get('hello') == 'world' assert not script_obj.can_cancel def test_firing_event_template(self): """Test the firing of events.""" event = 'test_event' context = Context() calls = [] @callback def record_event(event): """Add recorded event to set.""" calls.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA({ 'event': event, 'event_data_template': { 'dict': { 1: '{{ is_world }}', 2: '{{ is_world }}{{ is_world }}', 3: '{{ is_world }}{{ is_world }}{{ is_world }}', }, 'list': [ '{{ is_world }}', '{{ is_world }}{{ is_world }}' ] } })) script_obj.run({'is_world': 'yes'}, context=context) self.hass.block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data == { 'dict': { 1: 'yes', 2: 'yesyes', 3: 'yesyesyes', }, 'list': ['yes', 'yesyes'] } assert not script_obj.can_cancel def test_calling_service(self): """Test the calling of a service.""" calls = [] context = Context() @callback def record_call(service): """Add recorded event to set.""" calls.append(service) self.hass.services.register('test', 'script', record_call) script.call_from_config(self.hass, { 'service': 'test.script', 'data': { 'hello': 'world' } }, context=context) self.hass.block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get('hello') == 'world' def test_calling_service_template(self): """Test the calling of a service.""" calls = [] context = Context() @callback def record_call(service): """Add recorded event to set.""" calls.append(service) self.hass.services.register('test', 'script', record_call) script.call_from_config(self.hass, { 'service_template': """ {% if True %} test.script {% else %} test.not_script {% endif %}""", 'data_template': { 'hello': """ {% if is_world == 'yes' %} world {% else %} not world {% endif %} """ } }, {'is_world': 'yes'}, context=context) self.hass.block_till_done() assert len(calls) == 1 assert calls[0].context is context assert calls[0].data.get('hello') == 'world' def test_delay(self): """Test the delay.""" event = 'test_event' events = [] context = Context() delay_alias = 'delay step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'delay': {'seconds': 5}, 'alias': delay_alias}, {'event': event}])) script_obj.run(context=context) self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == delay_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 2 assert events[0].context is context assert events[1].context is context def test_delay_template(self): """Test the delay as a template.""" event = 'test_event' events = [] delay_alias = 'delay step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'delay': '00:00:{{ 5 }}', 'alias': delay_alias}, {'event': event}])) script_obj.run() self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == delay_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 2 def test_delay_invalid_template(self): """Test the delay as a template that fails.""" event = 'test_event' events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'delay': '{{ invalid_delay }}'}, {'delay': {'seconds': 5}}, {'event': event}])) with mock.patch.object(script, '_LOGGER') as mock_logger: script_obj.run() self.hass.block_till_done() assert mock_logger.error.called assert not script_obj.is_running assert len(events) == 1 def test_delay_complex_template(self): """Test the delay with a working complex template.""" event = 'test_event' events = [] delay_alias = 'delay step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'delay': { 'seconds': '{{ 5 }}'}, 'alias': delay_alias}, {'event': event}])) script_obj.run() self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == delay_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 2 def test_delay_complex_invalid_template(self): """Test the delay with a complex template that fails.""" event = 'test_event' events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'delay': { 'seconds': '{{ invalid_delay }}' }}, {'delay': { 'seconds': '{{ 5 }}' }}, {'event': event}])) with mock.patch.object(script, '_LOGGER') as mock_logger: script_obj.run() self.hass.block_till_done() assert mock_logger.error.called assert not script_obj.is_running assert len(events) == 1 def test_cancel_while_delay(self): """Test the cancelling while the delay is present.""" event = 'test_event' events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'delay': {'seconds': 5}}, {'event': event}])) script_obj.run() self.hass.block_till_done() assert script_obj.is_running assert len(events) == 0 script_obj.stop() assert not script_obj.is_running # Make sure the script is really stopped. future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 0 def test_wait_template(self): """Test the wait template.""" event = 'test_event' events = [] context = Context() wait_alias = 'wait step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('switch.test', 'on') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'wait_template': "{{states.switch.test.state == 'off'}}", 'alias': wait_alias}, {'event': event}])) script_obj.run(context=context) self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 self.hass.states.set('switch.test', 'off') self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 2 assert events[0].context is context assert events[1].context is context def test_wait_template_cancel(self): """Test the wait template cancel action.""" event = 'test_event' events = [] wait_alias = 'wait step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('switch.test', 'on') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'wait_template': "{{states.switch.test.state == 'off'}}", 'alias': wait_alias}, {'event': event}])) script_obj.run() self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 script_obj.stop() assert not script_obj.is_running assert len(events) == 1 self.hass.states.set('switch.test', 'off') self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 1 def test_wait_template_not_schedule(self): """Test the wait template with correct condition.""" event = 'test_event' events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('switch.test', 'on') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'wait_template': "{{states.switch.test.state == 'on'}}"}, {'event': event}])) script_obj.run() self.hass.block_till_done() assert not script_obj.is_running assert script_obj.can_cancel assert len(events) == 2 def test_wait_template_timeout_halt(self): """Test the wait template, halt on timeout.""" event = 'test_event' events = [] wait_alias = 'wait step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('switch.test', 'on') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, { 'wait_template': "{{states.switch.test.state == 'off'}}", 'continue_on_timeout': False, 'timeout': 5, 'alias': wait_alias }, {'event': event}])) script_obj.run() self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 1 def test_wait_template_timeout_continue(self): """Test the wait template with continuing the script.""" event = 'test_event' events = [] wait_alias = 'wait step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('switch.test', 'on') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, { 'wait_template': "{{states.switch.test.state == 'off'}}", 'timeout': 5, 'continue_on_timeout': True, 'alias': wait_alias }, {'event': event}])) script_obj.run() self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 2 def test_wait_template_timeout_default(self): """Test the wait template with default contiune.""" event = 'test_event' events = [] wait_alias = 'wait step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('switch.test', 'on') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, { 'wait_template': "{{states.switch.test.state == 'off'}}", 'timeout': 5, 'alias': wait_alias }, {'event': event}])) script_obj.run() self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 2 def test_wait_template_variables(self): """Test the wait template with variables.""" event = 'test_event' events = [] wait_alias = 'wait step' @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('switch.test', 'on') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'wait_template': "{{is_state(data, 'off')}}", 'alias': wait_alias}, {'event': event}])) script_obj.run({ 'data': 'switch.test' }) self.hass.block_till_done() assert script_obj.is_running assert script_obj.can_cancel assert script_obj.last_action == wait_alias assert len(events) == 1 self.hass.states.set('switch.test', 'off') self.hass.block_till_done() assert not script_obj.is_running assert len(events) == 2 def test_passing_variables_to_script(self): """Test if we can pass variables to script.""" calls = [] @callback def record_call(service): """Add recorded event to set.""" calls.append(service) self.hass.services.register('test', 'script', record_call) script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ { 'service': 'test.script', 'data_template': { 'hello': '{{ greeting }}', }, }, {'delay': '{{ delay_period }}'}, { 'service': 'test.script', 'data_template': { 'hello': '{{ greeting2 }}', }, }])) script_obj.run({ 'greeting': 'world', 'greeting2': 'universe', 'delay_period': '00:00:05' }) self.hass.block_till_done() assert script_obj.is_running assert len(calls) == 1 assert calls[-1].data['hello'] == 'world' future = dt_util.utcnow() + timedelta(seconds=5) fire_time_changed(self.hass, future) self.hass.block_till_done() assert not script_obj.is_running assert len(calls) == 2 assert calls[-1].data['hello'] == 'universe' def test_condition(self): """Test if we can use conditions in a script.""" event = 'test_event' events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('test.entity', 'hello') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, { 'condition': 'template', 'value_template': '{{ states.test.entity.state == "hello" }}', }, {'event': event}, ])) script_obj.run() self.hass.block_till_done() assert len(events) == 2 self.hass.states.set('test.entity', 'goodbye') script_obj.run() self.hass.block_till_done() assert len(events) == 3 @mock.patch('homeassistant.helpers.script.condition.async_from_config') def test_condition_created_once(self, async_from_config): """Test that the conditions do not get created multiple times.""" event = 'test_event' events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('test.entity', 'hello') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, { 'condition': 'template', 'value_template': '{{ states.test.entity.state == "hello" }}', }, {'event': event}, ])) script_obj.run() script_obj.run() self.hass.block_till_done() assert async_from_config.call_count == 1 assert len(script_obj._config_cache) == 1 def test_all_conditions_cached(self): """Test that multiple conditions get cached.""" event = 'test_event' events = [] @callback def record_event(event): """Add recorded event to set.""" events.append(event) self.hass.bus.listen(event, record_event) self.hass.states.set('test.entity', 'hello') script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, { 'condition': 'template', 'value_template': '{{ states.test.entity.state == "hello" }}', }, { 'condition': 'template', 'value_template': '{{ states.test.entity.state != "hello" }}', }, {'event': event}, ])) script_obj.run() self.hass.block_till_done() assert len(script_obj._config_cache) == 2 def test_last_triggered(self): """Test the last_triggered.""" event = 'test_event' script_obj = script.Script(self.hass, cv.SCRIPT_SCHEMA([ {'event': event}, {'delay': {'seconds': 5}}, {'event': event}])) assert script_obj.last_triggered is None time = dt_util.utcnow() with mock.patch('homeassistant.helpers.script.date_util.utcnow', return_value=time): script_obj.run() self.hass.block_till_done() assert script_obj.last_triggered == time async def test_propagate_error_service_not_found(hass): """Test that a script aborts when a service is not found.""" events = [] @callback def record_event(event): events.append(event) hass.bus.async_listen('test_event', record_event) script_obj = script.Script(hass, cv.SCRIPT_SCHEMA([ {'service': 'test.script'}, {'event': 'test_event'}])) with pytest.raises(exceptions.ServiceNotFound): await script_obj.async_run() assert len(events) == 0 async def test_propagate_error_invalid_service_data(hass): """Test that a script aborts when we send invalid service data.""" events = [] @callback def record_event(event): events.append(event) hass.bus.async_listen('test_event', record_event) calls = [] @callback def record_call(service): """Add recorded event to set.""" calls.append(service) hass.services.async_register('test', 'script', record_call, schema=vol.Schema({'text': str})) script_obj = script.Script(hass, cv.SCRIPT_SCHEMA([ {'service': 'test.script', 'data': {'text': 1}}, {'event': 'test_event'}])) with pytest.raises(vol.Invalid): await script_obj.async_run() assert len(events) == 0 assert len(calls) == 0 async def test_propagate_error_service_exception(hass): """Test that a script aborts when a service throws an exception.""" events = [] @callback def record_event(event): events.append(event) hass.bus.async_listen('test_event', record_event) calls = [] @callback def record_call(service): """Add recorded event to set.""" raise ValueError("BROKEN") hass.services.async_register('test', 'script', record_call) script_obj = script.Script(hass, cv.SCRIPT_SCHEMA([ {'service': 'test.script'}, {'event': 'test_event'}])) with pytest.raises(ValueError): await script_obj.async_run() assert len(events) == 0 assert len(calls) == 0