"""Test event helpers.""" # pylint: disable=protected-access import asyncio from datetime import datetime, timedelta from astral import Astral import jinja2 import pytest from homeassistant.components import sun from homeassistant.const import MATCH_ALL import homeassistant.core as ha from homeassistant.core import callback from homeassistant.exceptions import TemplateError from homeassistant.helpers.entity_registry import EVENT_ENTITY_REGISTRY_UPDATED from homeassistant.helpers.event import ( TrackStates, TrackTemplate, TrackTemplateResult, async_call_later, async_track_point_in_time, async_track_point_in_utc_time, async_track_same_state, async_track_state_added_domain, async_track_state_change, async_track_state_change_event, async_track_state_change_filtered, async_track_state_removed_domain, async_track_sunrise, async_track_sunset, async_track_template, async_track_template_result, async_track_time_change, async_track_time_interval, async_track_utc_time_change, track_point_in_utc_time, ) from homeassistant.helpers.template import Template from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from tests.async_mock import patch from tests.common import async_fire_time_changed DEFAULT_TIME_ZONE = dt_util.DEFAULT_TIME_ZONE def teardown(): """Stop everything that was started.""" dt_util.set_default_time_zone(DEFAULT_TIME_ZONE) async def test_track_point_in_time(hass): """Test track point in time.""" before_birthday = datetime(1985, 7, 9, 12, 0, 0, tzinfo=dt_util.UTC) birthday_paulus = datetime(1986, 7, 9, 12, 0, 0, tzinfo=dt_util.UTC) after_birthday = datetime(1987, 7, 9, 12, 0, 0, tzinfo=dt_util.UTC) runs = [] async_track_point_in_utc_time( hass, callback(lambda x: runs.append(x)), birthday_paulus ) async_fire_time_changed(hass, before_birthday) await hass.async_block_till_done() assert len(runs) == 0 async_fire_time_changed(hass, birthday_paulus) await hass.async_block_till_done() assert len(runs) == 1 # A point in time tracker will only fire once, this should do nothing async_fire_time_changed(hass, birthday_paulus) await hass.async_block_till_done() assert len(runs) == 1 async_track_point_in_utc_time( hass, callback(lambda x: runs.append(x)), birthday_paulus ) async_fire_time_changed(hass, after_birthday) await hass.async_block_till_done() assert len(runs) == 2 unsub = async_track_point_in_time( hass, callback(lambda x: runs.append(x)), birthday_paulus ) unsub() async_fire_time_changed(hass, after_birthday) await hass.async_block_till_done() assert len(runs) == 2 async def test_track_state_change_from_to_state_match(hass): """Test track_state_change with from and to state matchers.""" from_and_to_state_runs = [] only_from_runs = [] only_to_runs = [] match_all_runs = [] no_to_from_specified_runs = [] def from_and_to_state_callback(entity_id, old_state, new_state): from_and_to_state_runs.append(1) def only_from_state_callback(entity_id, old_state, new_state): only_from_runs.append(1) def only_to_state_callback(entity_id, old_state, new_state): only_to_runs.append(1) def match_all_callback(entity_id, old_state, new_state): match_all_runs.append(1) def no_to_from_specified_callback(entity_id, old_state, new_state): no_to_from_specified_runs.append(1) async_track_state_change( hass, "light.Bowl", from_and_to_state_callback, "on", "off" ) async_track_state_change(hass, "light.Bowl", only_from_state_callback, "on", None) async_track_state_change( hass, "light.Bowl", only_to_state_callback, None, ["off", "standby"] ) async_track_state_change( hass, "light.Bowl", match_all_callback, MATCH_ALL, MATCH_ALL ) async_track_state_change(hass, "light.Bowl", no_to_from_specified_callback) hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(from_and_to_state_runs) == 0 assert len(only_from_runs) == 0 assert len(only_to_runs) == 0 assert len(match_all_runs) == 1 assert len(no_to_from_specified_runs) == 1 hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(from_and_to_state_runs) == 1 assert len(only_from_runs) == 1 assert len(only_to_runs) == 1 assert len(match_all_runs) == 2 assert len(no_to_from_specified_runs) == 2 hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(from_and_to_state_runs) == 1 assert len(only_from_runs) == 1 assert len(only_to_runs) == 1 assert len(match_all_runs) == 3 assert len(no_to_from_specified_runs) == 3 hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(from_and_to_state_runs) == 1 assert len(only_from_runs) == 1 assert len(only_to_runs) == 1 assert len(match_all_runs) == 3 assert len(no_to_from_specified_runs) == 3 hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(from_and_to_state_runs) == 2 assert len(only_from_runs) == 2 assert len(only_to_runs) == 2 assert len(match_all_runs) == 4 assert len(no_to_from_specified_runs) == 4 hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(from_and_to_state_runs) == 2 assert len(only_from_runs) == 2 assert len(only_to_runs) == 2 assert len(match_all_runs) == 4 assert len(no_to_from_specified_runs) == 4 async def test_track_state_change(hass): """Test track_state_change.""" # 2 lists to track how often our callbacks get called specific_runs = [] wildcard_runs = [] wildercard_runs = [] def specific_run_callback(entity_id, old_state, new_state): specific_runs.append(1) # This is the rare use case async_track_state_change(hass, "light.Bowl", specific_run_callback, "on", "off") @ha.callback def wildcard_run_callback(entity_id, old_state, new_state): wildcard_runs.append((old_state, new_state)) # This is the most common use case async_track_state_change(hass, "light.Bowl", wildcard_run_callback) async def wildercard_run_callback(entity_id, old_state, new_state): wildercard_runs.append((old_state, new_state)) async_track_state_change(hass, MATCH_ALL, wildercard_run_callback) # Adding state to state machine hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(specific_runs) == 0 assert len(wildcard_runs) == 1 assert len(wildercard_runs) == 1 assert wildcard_runs[-1][0] is None assert wildcard_runs[-1][1] is not None # Set same state should not trigger a state change/listener hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(specific_runs) == 0 assert len(wildcard_runs) == 1 assert len(wildercard_runs) == 1 # State change off -> on hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 2 assert len(wildercard_runs) == 2 # State change off -> off hass.states.async_set("light.Bowl", "off", {"some_attr": 1}) await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 3 assert len(wildercard_runs) == 3 # State change off -> on hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 4 assert len(wildercard_runs) == 4 hass.states.async_remove("light.bowl") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 5 assert len(wildercard_runs) == 5 assert wildcard_runs[-1][0] is not None assert wildcard_runs[-1][1] is None assert wildercard_runs[-1][0] is not None assert wildercard_runs[-1][1] is None # Set state for different entity id hass.states.async_set("switch.kitchen", "on") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 5 assert len(wildercard_runs) == 6 async def test_async_track_state_change_filtered(hass): """Test async_track_state_change_filtered.""" single_entity_id_tracker = [] multiple_entity_id_tracker = [] @ha.callback def single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") single_entity_id_tracker.append((old_state, new_state)) @ha.callback def multiple_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") multiple_entity_id_tracker.append((old_state, new_state)) @ha.callback def callback_that_throws(event): raise ValueError track_single = async_track_state_change_filtered( hass, TrackStates(False, {"light.bowl"}, None), single_run_callback ) assert track_single.listeners == { "all": False, "domains": None, "entities": {"light.bowl"}, } track_multi = async_track_state_change_filtered( hass, TrackStates(False, {"light.bowl"}, {"switch"}), multiple_run_callback ) assert track_multi.listeners == { "all": False, "domains": {"switch"}, "entities": {"light.bowl"}, } track_throws = async_track_state_change_filtered( hass, TrackStates(False, {"light.bowl"}, {"switch"}), callback_that_throws ) assert track_throws.listeners == { "all": False, "domains": {"switch"}, "entities": {"light.bowl"}, } # Adding state to state machine hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert single_entity_id_tracker[-1][0] is None assert single_entity_id_tracker[-1][1] is not None assert len(multiple_entity_id_tracker) == 1 assert multiple_entity_id_tracker[-1][0] is None assert multiple_entity_id_tracker[-1][1] is not None # Set same state should not trigger a state change/listener hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 1 # State change off -> on hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 2 assert len(multiple_entity_id_tracker) == 2 # State change off -> off hass.states.async_set("light.Bowl", "off", {"some_attr": 1}) await hass.async_block_till_done() assert len(single_entity_id_tracker) == 3 assert len(multiple_entity_id_tracker) == 3 # State change off -> on hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 4 assert len(multiple_entity_id_tracker) == 4 hass.states.async_remove("light.bowl") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 5 assert single_entity_id_tracker[-1][0] is not None assert single_entity_id_tracker[-1][1] is None assert len(multiple_entity_id_tracker) == 5 assert multiple_entity_id_tracker[-1][0] is not None assert multiple_entity_id_tracker[-1][1] is None # Set state for different entity id hass.states.async_set("switch.kitchen", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 5 assert len(multiple_entity_id_tracker) == 6 track_single.async_remove() # Ensure unsubing the listener works hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 5 assert len(multiple_entity_id_tracker) == 7 assert track_multi.listeners == { "all": False, "domains": {"switch"}, "entities": {"light.bowl"}, } track_multi.async_update_listeners(TrackStates(False, {"light.bowl"}, None)) assert track_multi.listeners == { "all": False, "domains": None, "entities": {"light.bowl"}, } hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(multiple_entity_id_tracker) == 8 hass.states.async_set("switch.kitchen", "off") await hass.async_block_till_done() assert len(multiple_entity_id_tracker) == 8 track_multi.async_update_listeners(TrackStates(True, None, None)) hass.states.async_set("switch.kitchen", "off") await hass.async_block_till_done() assert len(multiple_entity_id_tracker) == 8 hass.states.async_set("switch.any", "off") await hass.async_block_till_done() assert len(multiple_entity_id_tracker) == 9 track_multi.async_remove() track_throws.async_remove() async def test_async_track_state_change_event(hass): """Test async_track_state_change_event.""" single_entity_id_tracker = [] multiple_entity_id_tracker = [] @ha.callback def single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") single_entity_id_tracker.append((old_state, new_state)) @ha.callback def multiple_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") multiple_entity_id_tracker.append((old_state, new_state)) @ha.callback def callback_that_throws(event): raise ValueError unsub_single = async_track_state_change_event( hass, ["light.Bowl"], single_run_callback ) unsub_multi = async_track_state_change_event( hass, ["light.Bowl", "switch.kitchen"], multiple_run_callback ) unsub_throws = async_track_state_change_event( hass, ["light.Bowl", "switch.kitchen"], callback_that_throws ) # Adding state to state machine hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert single_entity_id_tracker[-1][0] is None assert single_entity_id_tracker[-1][1] is not None assert len(multiple_entity_id_tracker) == 1 assert multiple_entity_id_tracker[-1][0] is None assert multiple_entity_id_tracker[-1][1] is not None # Set same state should not trigger a state change/listener hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 1 # State change off -> on hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 2 assert len(multiple_entity_id_tracker) == 2 # State change off -> off hass.states.async_set("light.Bowl", "off", {"some_attr": 1}) await hass.async_block_till_done() assert len(single_entity_id_tracker) == 3 assert len(multiple_entity_id_tracker) == 3 # State change off -> on hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 4 assert len(multiple_entity_id_tracker) == 4 hass.states.async_remove("light.bowl") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 5 assert single_entity_id_tracker[-1][0] is not None assert single_entity_id_tracker[-1][1] is None assert len(multiple_entity_id_tracker) == 5 assert multiple_entity_id_tracker[-1][0] is not None assert multiple_entity_id_tracker[-1][1] is None # Set state for different entity id hass.states.async_set("switch.kitchen", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 5 assert len(multiple_entity_id_tracker) == 6 unsub_single() # Ensure unsubing the listener works hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 5 assert len(multiple_entity_id_tracker) == 7 unsub_multi() unsub_throws() async def test_async_track_state_change_event_with_empty_list(hass): """Test async_track_state_change_event passing an empty list of entities.""" unsub_single = async_track_state_change_event( hass, [], ha.callback(lambda event: None) ) unsub_single2 = async_track_state_change_event( hass, [], ha.callback(lambda event: None) ) unsub_single2() unsub_single() async def test_async_track_state_added_domain(hass): """Test async_track_state_added_domain.""" single_entity_id_tracker = [] multiple_entity_id_tracker = [] @ha.callback def single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") single_entity_id_tracker.append((old_state, new_state)) @ha.callback def multiple_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") multiple_entity_id_tracker.append((old_state, new_state)) @ha.callback def callback_that_throws(event): raise ValueError unsub_single = async_track_state_added_domain(hass, "light", single_run_callback) unsub_multi = async_track_state_added_domain( hass, ["light", "switch"], multiple_run_callback ) unsub_throws = async_track_state_added_domain( hass, ["light", "switch"], callback_that_throws ) # Adding state to state machine hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert single_entity_id_tracker[-1][0] is None assert single_entity_id_tracker[-1][1] is not None assert len(multiple_entity_id_tracker) == 1 assert multiple_entity_id_tracker[-1][0] is None assert multiple_entity_id_tracker[-1][1] is not None # Set same state should not trigger a state change/listener hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 1 # State change off -> on - nothing added so no trigger hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 1 # State change off -> off - nothing added so no trigger hass.states.async_set("light.Bowl", "off", {"some_attr": 1}) await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 1 # Removing state does not trigger hass.states.async_remove("light.bowl") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 1 # Set state for different entity id hass.states.async_set("switch.kitchen", "on") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 2 unsub_single() # Ensure unsubing the listener works hass.states.async_set("light.new", "off") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(multiple_entity_id_tracker) == 3 unsub_multi() unsub_throws() async def test_async_track_state_added_domain_with_empty_list(hass): """Test async_track_state_added_domain passing an empty list of domains.""" unsub_single = async_track_state_added_domain( hass, [], ha.callback(lambda event: None) ) unsub_single2 = async_track_state_added_domain( hass, [], ha.callback(lambda event: None) ) unsub_single2() unsub_single() async def test_async_track_state_removed_domain_with_empty_list(hass): """Test async_track_state_removed_domain passing an empty list of domains.""" unsub_single = async_track_state_removed_domain( hass, [], ha.callback(lambda event: None) ) unsub_single2 = async_track_state_removed_domain( hass, [], ha.callback(lambda event: None) ) unsub_single2() unsub_single() async def test_async_track_state_removed_domain(hass): """Test async_track_state_removed_domain.""" single_entity_id_tracker = [] multiple_entity_id_tracker = [] @ha.callback def single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") single_entity_id_tracker.append((old_state, new_state)) @ha.callback def multiple_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") multiple_entity_id_tracker.append((old_state, new_state)) @ha.callback def callback_that_throws(event): raise ValueError unsub_single = async_track_state_removed_domain(hass, "light", single_run_callback) unsub_multi = async_track_state_removed_domain( hass, ["light", "switch"], multiple_run_callback ) unsub_throws = async_track_state_removed_domain( hass, ["light", "switch"], callback_that_throws ) # Adding state to state machine hass.states.async_set("light.Bowl", "on") hass.states.async_remove("light.Bowl") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert single_entity_id_tracker[-1][1] is None assert single_entity_id_tracker[-1][0] is not None assert len(multiple_entity_id_tracker) == 1 assert multiple_entity_id_tracker[-1][1] is None assert multiple_entity_id_tracker[-1][0] is not None # Added and than removed (light) hass.states.async_set("light.Bowl", "on") hass.states.async_remove("light.Bowl") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 2 assert len(multiple_entity_id_tracker) == 2 # Added and than removed (light) hass.states.async_set("light.Bowl", "off") hass.states.async_remove("light.Bowl") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 3 assert len(multiple_entity_id_tracker) == 3 # Added and than removed (light) hass.states.async_set("light.Bowl", "off", {"some_attr": 1}) hass.states.async_remove("light.Bowl") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 4 assert len(multiple_entity_id_tracker) == 4 # Added and than removed (switch) hass.states.async_set("switch.kitchen", "on") hass.states.async_remove("switch.kitchen") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 4 assert len(multiple_entity_id_tracker) == 5 unsub_single() # Ensure unsubing the listener works hass.states.async_set("light.new", "off") hass.states.async_remove("light.new") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 4 assert len(multiple_entity_id_tracker) == 6 unsub_multi() unsub_throws() async def test_async_track_state_removed_domain_match_all(hass): """Test async_track_state_removed_domain with a match_all.""" single_entity_id_tracker = [] match_all_entity_id_tracker = [] @ha.callback def single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") single_entity_id_tracker.append((old_state, new_state)) @ha.callback def match_all_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") match_all_entity_id_tracker.append((old_state, new_state)) unsub_single = async_track_state_removed_domain(hass, "light", single_run_callback) unsub_match_all = async_track_state_removed_domain( hass, MATCH_ALL, match_all_run_callback ) hass.states.async_set("light.new", "off") hass.states.async_remove("light.new") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(match_all_entity_id_tracker) == 1 hass.states.async_set("switch.new", "off") hass.states.async_remove("switch.new") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(match_all_entity_id_tracker) == 2 unsub_match_all() unsub_single() hass.states.async_set("switch.new", "off") hass.states.async_remove("switch.new") await hass.async_block_till_done() assert len(single_entity_id_tracker) == 1 assert len(match_all_entity_id_tracker) == 2 async def test_track_template(hass): """Test tracking template.""" specific_runs = [] wildcard_runs = [] wildercard_runs = [] template_condition = Template("{{states.switch.test.state == 'on'}}", hass) template_condition_var = Template( "{{states.switch.test.state == 'on' and test == 5}}", hass ) hass.states.async_set("switch.test", "off") def specific_run_callback(entity_id, old_state, new_state): specific_runs.append(1) async_track_template(hass, template_condition, specific_run_callback) @ha.callback def wildcard_run_callback(entity_id, old_state, new_state): wildcard_runs.append((old_state, new_state)) async_track_template(hass, template_condition, wildcard_run_callback) async def wildercard_run_callback(entity_id, old_state, new_state): wildercard_runs.append((old_state, new_state)) async_track_template( hass, template_condition_var, wildercard_run_callback, {"test": 5} ) hass.states.async_set("switch.test", "on") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 1 assert len(wildercard_runs) == 1 hass.states.async_set("switch.test", "on") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 1 assert len(wildercard_runs) == 1 hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 1 assert len(wildercard_runs) == 1 hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 1 assert len(wildercard_runs) == 1 hass.states.async_set("switch.test", "on") await hass.async_block_till_done() assert len(specific_runs) == 2 assert len(wildcard_runs) == 2 assert len(wildercard_runs) == 2 template_iterate = Template("{{ (states.switch | length) > 0 }}", hass) iterate_calls = [] @ha.callback def iterate_callback(entity_id, old_state, new_state): iterate_calls.append((entity_id, old_state, new_state)) async_track_template(hass, template_iterate, iterate_callback) await hass.async_block_till_done() hass.states.async_set("switch.new", "on") await hass.async_block_till_done() assert len(iterate_calls) == 1 assert iterate_calls[0][0] == "switch.new" assert iterate_calls[0][1] is None assert iterate_calls[0][2].state == "on" async def test_track_template_error(hass, caplog): """Test tracking template with error.""" template_error = Template("{{ (states.switch | lunch) > 0 }}", hass) error_calls = [] @ha.callback def error_callback(entity_id, old_state, new_state): error_calls.append((entity_id, old_state, new_state)) async_track_template(hass, template_error, error_callback) await hass.async_block_till_done() hass.states.async_set("switch.new", "on") await hass.async_block_till_done() assert not error_calls assert "lunch" in caplog.text assert "TemplateAssertionError" in caplog.text caplog.clear() with patch.object(Template, "async_render") as render: render.return_value = "ok" hass.states.async_set("switch.not_exist", "off") await hass.async_block_till_done() assert "no filter named 'lunch'" not in caplog.text assert "TemplateAssertionError" not in caplog.text async def test_track_template_error_can_recover(hass, caplog): """Test tracking template with error.""" hass.states.async_set("switch.data_system", "cow", {"opmode": 0}) template_error = Template( "{{ states.sensor.data_system.attributes['opmode'] == '0' }}", hass ) error_calls = [] @ha.callback def error_callback(entity_id, old_state, new_state): error_calls.append((entity_id, old_state, new_state)) async_track_template(hass, template_error, error_callback) await hass.async_block_till_done() assert not error_calls hass.states.async_remove("switch.data_system") assert "UndefinedError" in caplog.text hass.states.async_set("switch.data_system", "cow", {"opmode": 0}) caplog.clear() assert "UndefinedError" not in caplog.text async def test_track_template_result(hass): """Test tracking template.""" specific_runs = [] wildcard_runs = [] wildercard_runs = [] template_condition = Template("{{states.sensor.test.state}}", hass) template_condition_var = Template( "{{(states.sensor.test.state|int) + test }}", hass ) def specific_run_callback(event, updates): track_result = updates.pop() specific_runs.append(int(track_result.result)) async_track_template_result( hass, [TrackTemplate(template_condition, None)], specific_run_callback ) @ha.callback def wildcard_run_callback(event, updates): track_result = updates.pop() wildcard_runs.append( (int(track_result.last_result or 0), int(track_result.result)) ) async_track_template_result( hass, [TrackTemplate(template_condition, None)], wildcard_run_callback ) async def wildercard_run_callback(event, updates): track_result = updates.pop() wildercard_runs.append( (int(track_result.last_result or 0), int(track_result.result)) ) async_track_template_result( hass, [TrackTemplate(template_condition_var, {"test": 5})], wildercard_run_callback, ) await hass.async_block_till_done() hass.states.async_set("sensor.test", 5) await hass.async_block_till_done() assert specific_runs == [5] assert wildcard_runs == [(0, 5)] assert wildercard_runs == [(0, 10)] hass.states.async_set("sensor.test", 30) await hass.async_block_till_done() assert specific_runs == [5, 30] assert wildcard_runs == [(0, 5), (5, 30)] assert wildercard_runs == [(0, 10), (10, 35)] hass.states.async_set("sensor.test", 30) await hass.async_block_till_done() assert len(specific_runs) == 2 assert len(wildcard_runs) == 2 assert len(wildercard_runs) == 2 hass.states.async_set("sensor.test", 5) await hass.async_block_till_done() assert len(specific_runs) == 3 assert len(wildcard_runs) == 3 assert len(wildercard_runs) == 3 hass.states.async_set("sensor.test", 5) await hass.async_block_till_done() assert len(specific_runs) == 3 assert len(wildcard_runs) == 3 assert len(wildercard_runs) == 3 hass.states.async_set("sensor.test", 20) await hass.async_block_till_done() assert len(specific_runs) == 4 assert len(wildcard_runs) == 4 assert len(wildercard_runs) == 4 async def test_track_template_result_complex(hass): """Test tracking template.""" specific_runs = [] template_complex_str = """ {% if states("sensor.domain") == "light" %} {{ states.light | map(attribute='entity_id') | list }} {% elif states("sensor.domain") == "lock" %} {{ states.lock | map(attribute='entity_id') | list }} {% elif states("sensor.domain") == "single_binary_sensor" %} {{ states("binary_sensor.single") }} {% else %} {{ states | map(attribute='entity_id') | list }} {% endif %} """ template_complex = Template(template_complex_str, hass) def specific_run_callback(event, updates): specific_runs.append(updates.pop().result) hass.states.async_set("light.one", "on") hass.states.async_set("lock.one", "locked") info = async_track_template_result( hass, [TrackTemplate(template_complex, None, timedelta(seconds=0))], specific_run_callback, ) await hass.async_block_till_done() assert info.listeners == {"all": True, "domains": set(), "entities": set()} hass.states.async_set("sensor.domain", "light") await hass.async_block_till_done() assert len(specific_runs) == 1 assert specific_runs[0] == ["light.one"] assert info.listeners == { "all": False, "domains": {"light"}, "entities": {"sensor.domain"}, } hass.states.async_set("sensor.domain", "lock") await hass.async_block_till_done() assert len(specific_runs) == 2 assert specific_runs[1] == ["lock.one"] assert info.listeners == { "all": False, "domains": {"lock"}, "entities": {"sensor.domain"}, } hass.states.async_set("sensor.domain", "all") await hass.async_block_till_done() assert len(specific_runs) == 3 assert "light.one" in specific_runs[2] assert "lock.one" in specific_runs[2] assert "sensor.domain" in specific_runs[2] assert info.listeners == {"all": True, "domains": set(), "entities": set()} hass.states.async_set("sensor.domain", "light") await hass.async_block_till_done() assert len(specific_runs) == 4 assert specific_runs[3] == ["light.one"] assert info.listeners == { "all": False, "domains": {"light"}, "entities": {"sensor.domain"}, } hass.states.async_set("light.two", "on") await hass.async_block_till_done() assert len(specific_runs) == 5 assert "light.one" in specific_runs[4] assert "light.two" in specific_runs[4] assert "sensor.domain" not in specific_runs[4] assert info.listeners == { "all": False, "domains": {"light"}, "entities": {"sensor.domain"}, } hass.states.async_set("light.three", "on") await hass.async_block_till_done() assert len(specific_runs) == 6 assert "light.one" in specific_runs[5] assert "light.two" in specific_runs[5] assert "light.three" in specific_runs[5] assert "sensor.domain" not in specific_runs[5] assert info.listeners == { "all": False, "domains": {"light"}, "entities": {"sensor.domain"}, } hass.states.async_set("sensor.domain", "lock") await hass.async_block_till_done() assert len(specific_runs) == 7 assert specific_runs[6] == ["lock.one"] assert info.listeners == { "all": False, "domains": {"lock"}, "entities": {"sensor.domain"}, } hass.states.async_set("sensor.domain", "single_binary_sensor") await hass.async_block_till_done() assert len(specific_runs) == 8 assert specific_runs[7] == "unknown" assert info.listeners == { "all": False, "domains": set(), "entities": {"binary_sensor.single", "sensor.domain"}, } hass.states.async_set("binary_sensor.single", "binary_sensor_on") await hass.async_block_till_done() assert len(specific_runs) == 9 assert specific_runs[8] == "binary_sensor_on" assert info.listeners == { "all": False, "domains": set(), "entities": {"binary_sensor.single", "sensor.domain"}, } hass.states.async_set("sensor.domain", "lock") await hass.async_block_till_done() assert len(specific_runs) == 10 assert specific_runs[9] == ["lock.one"] assert info.listeners == { "all": False, "domains": {"lock"}, "entities": {"sensor.domain"}, } async def test_track_template_result_with_wildcard(hass): """Test tracking template with a wildcard.""" specific_runs = [] template_complex_str = r""" {% for state in states %} {% if state.entity_id | regex_match('.*\.office_') %} {{ state.entity_id }}={{ state.state }} {% endif %} {% endfor %} """ template_complex = Template(template_complex_str, hass) def specific_run_callback(event, updates): specific_runs.append(updates.pop().result) hass.states.async_set("cover.office_drapes", "closed") hass.states.async_set("cover.office_window", "closed") hass.states.async_set("cover.office_skylight", "open") info = async_track_template_result( hass, [TrackTemplate(template_complex, None)], specific_run_callback ) await hass.async_block_till_done() hass.states.async_set("cover.office_window", "open") await hass.async_block_till_done() assert len(specific_runs) == 1 assert info.listeners == {"all": True, "domains": set(), "entities": set()} assert "cover.office_drapes=closed" in specific_runs[0] assert "cover.office_window=open" in specific_runs[0] assert "cover.office_skylight=open" in specific_runs[0] async def test_track_template_result_with_group(hass): """Test tracking template with a group.""" hass.states.async_set("sensor.power_1", 0) hass.states.async_set("sensor.power_2", 200.2) hass.states.async_set("sensor.power_3", 400.4) hass.states.async_set("sensor.power_4", 800.8) assert await async_setup_component( hass, "group", {"group": {"power_sensors": "sensor.power_1,sensor.power_2,sensor.power_3"}}, ) await hass.async_block_till_done() assert hass.states.get("group.power_sensors") assert hass.states.get("group.power_sensors").state specific_runs = [] template_complex_str = r""" {{ states.group.power_sensors.attributes.entity_id | expand | map(attribute='state')|map('float')|sum }} """ template_complex = Template(template_complex_str, hass) def specific_run_callback(event, updates): specific_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_complex, None)], specific_run_callback ) await hass.async_block_till_done() assert info.listeners == { "all": False, "domains": set(), "entities": { "group.power_sensors", "sensor.power_1", "sensor.power_2", "sensor.power_3", }, } hass.states.async_set("sensor.power_1", 100.1) await hass.async_block_till_done() assert len(specific_runs) == 1 assert specific_runs[0] == 100.1 + 200.2 + 400.4 hass.states.async_set("sensor.power_3", 0) await hass.async_block_till_done() assert len(specific_runs) == 2 assert specific_runs[1] == 100.1 + 200.2 + 0 with patch( "homeassistant.config.load_yaml_config_file", return_value={ "group": { "power_sensors": "sensor.power_1,sensor.power_2,sensor.power_3,sensor.power_4", } }, ): await hass.services.async_call("group", "reload") await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert specific_runs[-1] == 100.1 + 200.2 + 0 + 800.8 async def test_track_template_result_and_conditional(hass): """Test tracking template with an and conditional.""" specific_runs = [] hass.states.async_set("light.a", "off") hass.states.async_set("light.b", "off") template_str = '{% if states.light.a.state == "on" and states.light.b.state == "on" %}on{% else %}off{% endif %}' template = Template(template_str, hass) def specific_run_callback(event, updates): specific_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template, None)], specific_run_callback ) await hass.async_block_till_done() assert info.listeners == {"all": False, "domains": set(), "entities": {"light.a"}} hass.states.async_set("light.b", "on") await hass.async_block_till_done() assert len(specific_runs) == 0 hass.states.async_set("light.a", "on") await hass.async_block_till_done() assert len(specific_runs) == 1 assert specific_runs[0] == "on" assert info.listeners == { "all": False, "domains": set(), "entities": {"light.a", "light.b"}, } hass.states.async_set("light.b", "off") await hass.async_block_till_done() assert len(specific_runs) == 2 assert specific_runs[1] == "off" assert info.listeners == { "all": False, "domains": set(), "entities": {"light.a", "light.b"}, } hass.states.async_set("light.a", "off") await hass.async_block_till_done() assert len(specific_runs) == 2 hass.states.async_set("light.b", "on") await hass.async_block_till_done() assert len(specific_runs) == 2 hass.states.async_set("light.a", "on") await hass.async_block_till_done() assert len(specific_runs) == 3 assert specific_runs[2] == "on" async def test_track_template_result_iterator(hass): """Test tracking template.""" iterator_runs = [] @ha.callback def iterator_callback(event, updates): iterator_runs.append(updates.pop().result) async_track_template_result( hass, [ TrackTemplate( Template( """ {% for state in states.sensor %} {% if state.state == 'on' %} {{ state.entity_id }}, {% endif %} {% endfor %} """, hass, ), None, timedelta(seconds=0), ) ], iterator_callback, ) await hass.async_block_till_done() hass.states.async_set("sensor.test", 5) await hass.async_block_till_done() assert iterator_runs == [""] filter_runs = [] @ha.callback def filter_callback(event, updates): filter_runs.append(updates.pop().result) info = async_track_template_result( hass, [ TrackTemplate( Template( """{{ states.sensor|selectattr("state","equalto","on") |join(",", attribute="entity_id") }}""", hass, ), None, timedelta(seconds=0), ) ], filter_callback, ) await hass.async_block_till_done() assert info.listeners == { "all": False, "domains": {"sensor"}, "entities": set(), } hass.states.async_set("sensor.test", 6) await hass.async_block_till_done() assert filter_runs == [""] assert iterator_runs == [""] hass.states.async_set("sensor.new", "on") await hass.async_block_till_done() assert iterator_runs == ["", "sensor.new,"] assert filter_runs == ["", "sensor.new"] async def test_track_template_result_errors(hass, caplog): """Test tracking template with errors in the template.""" template_syntax_error = Template("{{states.switch", hass) template_not_exist = Template("{{states.switch.not_exist.state }}", hass) syntax_error_runs = [] not_exist_runs = [] @ha.callback def syntax_error_listener(event, updates): track_result = updates.pop() syntax_error_runs.append( ( event, track_result.template, track_result.last_result, track_result.result, ) ) async_track_template_result( hass, [TrackTemplate(template_syntax_error, None)], syntax_error_listener ) await hass.async_block_till_done() assert len(syntax_error_runs) == 0 assert "TemplateSyntaxError" in caplog.text @ha.callback def not_exist_runs_error_listener(event, updates): template_track = updates.pop() not_exist_runs.append( ( event, template_track.template, template_track.last_result, template_track.result, ) ) async_track_template_result( hass, [TrackTemplate(template_not_exist, None)], not_exist_runs_error_listener, ) await hass.async_block_till_done() assert len(syntax_error_runs) == 0 assert len(not_exist_runs) == 0 hass.states.async_set("switch.not_exist", "off") await hass.async_block_till_done() assert len(not_exist_runs) == 1 assert not_exist_runs[0][0].data.get("entity_id") == "switch.not_exist" assert not_exist_runs[0][1] == template_not_exist assert not_exist_runs[0][2] is None assert not_exist_runs[0][3] == "off" hass.states.async_set("switch.not_exist", "on") await hass.async_block_till_done() assert len(syntax_error_runs) == 1 assert len(not_exist_runs) == 2 assert not_exist_runs[1][0].data.get("entity_id") == "switch.not_exist" assert not_exist_runs[1][1] == template_not_exist assert not_exist_runs[1][2] == "off" assert not_exist_runs[1][3] == "on" with patch.object(Template, "async_render") as render: render.side_effect = TemplateError(jinja2.TemplateError()) hass.states.async_set("switch.not_exist", "off") await hass.async_block_till_done() assert len(not_exist_runs) == 3 assert not_exist_runs[2][0].data.get("entity_id") == "switch.not_exist" assert not_exist_runs[2][1] == template_not_exist assert not_exist_runs[2][2] == "on" assert isinstance(not_exist_runs[2][3], TemplateError) async def test_static_string(hass): """Test a static string.""" template_refresh = Template("{{ 'static' }}", hass) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None)], refresh_listener ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == ["static"] async def test_track_template_rate_limit(hass): """Test template rate limit.""" template_refresh = Template("{{ states | count }}", hass) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None, timedelta(seconds=0.1))], refresh_listener, ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == [0] hass.states.async_set("sensor.one", "any") await hass.async_block_till_done() assert refresh_runs == [0] info.async_refresh() assert refresh_runs == [0, 1] hass.states.async_set("sensor.two", "any") await hass.async_block_till_done() assert refresh_runs == [0, 1] next_time = dt_util.utcnow() + timedelta(seconds=0.125) with patch( "homeassistant.helpers.ratelimit.dt_util.utcnow", return_value=next_time ): async_fire_time_changed(hass, next_time) await hass.async_block_till_done() assert refresh_runs == [0, 1, 2] hass.states.async_set("sensor.three", "any") await hass.async_block_till_done() assert refresh_runs == [0, 1, 2] hass.states.async_set("sensor.four", "any") await hass.async_block_till_done() assert refresh_runs == [0, 1, 2] next_time = dt_util.utcnow() + timedelta(seconds=0.125 * 2) with patch( "homeassistant.helpers.ratelimit.dt_util.utcnow", return_value=next_time ): async_fire_time_changed(hass, next_time) await hass.async_block_till_done() assert refresh_runs == [0, 1, 2, 4] hass.states.async_set("sensor.five", "any") await hass.async_block_till_done() assert refresh_runs == [0, 1, 2, 4] async def test_track_template_rate_limit_five(hass): """Test template rate limit of 5 seconds.""" template_refresh = Template("{{ states | count }}", hass) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None, timedelta(seconds=5))], refresh_listener, ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == [0] hass.states.async_set("sensor.one", "any") await hass.async_block_till_done() assert refresh_runs == [0] info.async_refresh() assert refresh_runs == [0, 1] hass.states.async_set("sensor.two", "any") await hass.async_block_till_done() assert refresh_runs == [0, 1] hass.states.async_set("sensor.three", "any") await hass.async_block_till_done() assert refresh_runs == [0, 1] async def test_track_template_has_default_rate_limit(hass): """Test template has a rate limit by default.""" hass.states.async_set("sensor.zero", "any") template_refresh = Template("{{ states | list | count }}", hass) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None)], refresh_listener, ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == [1] hass.states.async_set("sensor.one", "any") await hass.async_block_till_done() assert refresh_runs == [1] info.async_refresh() assert refresh_runs == [1, 2] hass.states.async_set("sensor.two", "any") await hass.async_block_till_done() assert refresh_runs == [1, 2] hass.states.async_set("sensor.three", "any") await hass.async_block_till_done() assert refresh_runs == [1, 2] async def test_track_template_unavailable_sates_has_default_rate_limit(hass): """Test template watching for unavailable states has a rate limit by default.""" hass.states.async_set("sensor.zero", "unknown") template_refresh = Template( "{{ states | selectattr('state', 'in', ['unavailable', 'unknown', 'none']) | list | count }}", hass, ) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None)], refresh_listener, ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == [1] hass.states.async_set("sensor.one", "unknown") await hass.async_block_till_done() assert refresh_runs == [1] info.async_refresh() assert refresh_runs == [1, 2] hass.states.async_set("sensor.two", "any") await hass.async_block_till_done() assert refresh_runs == [1, 2] hass.states.async_set("sensor.three", "unknown") await hass.async_block_till_done() assert refresh_runs == [1, 2] info.async_refresh() await hass.async_block_till_done() assert refresh_runs == [1, 2, 3] async def test_specifically_referenced_entity_is_not_rate_limited(hass): """Test template rate limit of 5 seconds.""" hass.states.async_set("sensor.one", "none") template_refresh = Template('{{ states | count }}_{{ states("sensor.one") }}', hass) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None, timedelta(seconds=5))], refresh_listener, ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == ["1_none"] hass.states.async_set("sensor.one", "any") await hass.async_block_till_done() assert refresh_runs == ["1_none", "1_any"] info.async_refresh() assert refresh_runs == ["1_none", "1_any"] hass.states.async_set("sensor.two", "any") await hass.async_block_till_done() assert refresh_runs == ["1_none", "1_any"] hass.states.async_set("sensor.three", "any") await hass.async_block_till_done() assert refresh_runs == ["1_none", "1_any"] hass.states.async_set("sensor.one", "none") await hass.async_block_till_done() assert refresh_runs == ["1_none", "1_any", "3_none"] async def test_track_two_templates_with_different_rate_limits(hass): """Test two templates with different rate limits.""" template_one = Template("{{ states | count }} ", hass) template_five = Template("{{ states | count }}", hass) refresh_runs = { template_one: [], template_five: [], } @ha.callback def refresh_listener(event, updates): for update in updates: refresh_runs[update.template].append(update.result) info = async_track_template_result( hass, [ TrackTemplate(template_one, None, timedelta(seconds=0.1)), TrackTemplate(template_five, None, timedelta(seconds=5)), ], refresh_listener, ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs[template_one] == [0] assert refresh_runs[template_five] == [0] hass.states.async_set("sensor.one", "any") await hass.async_block_till_done() assert refresh_runs[template_one] == [0] assert refresh_runs[template_five] == [0] info.async_refresh() assert refresh_runs[template_one] == [0, 1] assert refresh_runs[template_five] == [0, 1] hass.states.async_set("sensor.two", "any") await hass.async_block_till_done() assert refresh_runs[template_one] == [0, 1] assert refresh_runs[template_five] == [0, 1] next_time = dt_util.utcnow() + timedelta(seconds=0.125 * 1) with patch( "homeassistant.helpers.ratelimit.dt_util.utcnow", return_value=next_time ): async_fire_time_changed(hass, next_time) await hass.async_block_till_done() await hass.async_block_till_done() assert refresh_runs[template_one] == [0, 1, 2] assert refresh_runs[template_five] == [0, 1] hass.states.async_set("sensor.three", "any") await hass.async_block_till_done() assert refresh_runs[template_one] == [0, 1, 2] assert refresh_runs[template_five] == [0, 1] hass.states.async_set("sensor.four", "any") await hass.async_block_till_done() assert refresh_runs[template_one] == [0, 1, 2] assert refresh_runs[template_five] == [0, 1] hass.states.async_set("sensor.five", "any") await hass.async_block_till_done() assert refresh_runs[template_one] == [0, 1, 2] assert refresh_runs[template_five] == [0, 1] async def test_string(hass): """Test a string.""" template_refresh = Template("no_template", hass) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None)], refresh_listener ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == ["no_template"] async def test_track_template_result_refresh_cancel(hass): """Test cancelling and refreshing result.""" template_refresh = Template("{{states.switch.test.state == 'on' and now() }}", hass) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates.pop().result) info = async_track_template_result( hass, [TrackTemplate(template_refresh, None)], refresh_listener ) await hass.async_block_till_done() hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert refresh_runs == [False] assert len(refresh_runs) == 1 info.async_refresh() hass.states.async_set("switch.test", "on") await hass.async_block_till_done() assert len(refresh_runs) == 2 assert refresh_runs[0] != refresh_runs[1] info.async_remove() hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert len(refresh_runs) == 2 template_refresh = Template("{{ value }}", hass) refresh_runs = [] info = async_track_template_result( hass, [TrackTemplate(template_refresh, {"value": "duck"})], refresh_listener, ) await hass.async_block_till_done() info.async_refresh() await hass.async_block_till_done() assert refresh_runs == ["duck"] info.async_refresh() await hass.async_block_till_done() assert refresh_runs == ["duck"] async def test_async_track_template_result_multiple_templates(hass): """Test tracking multiple templates.""" template_1 = Template("{{ states.switch.test.state == 'on' }}") template_2 = Template("{{ states.switch.test.state == 'on' }}") template_3 = Template("{{ states.switch.test.state == 'off' }}") template_4 = Template( "{{ states.binary_sensor | map(attribute='entity_id') | list }}" ) refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates) async_track_template_result( hass, [ TrackTemplate(template_1, None), TrackTemplate(template_2, None), TrackTemplate(template_3, None), TrackTemplate(template_4, None), ], refresh_listener, ) hass.states.async_set("switch.test", "on") await hass.async_block_till_done() assert refresh_runs == [ [ TrackTemplateResult(template_1, None, True), TrackTemplateResult(template_2, None, True), TrackTemplateResult(template_3, None, False), ] ] refresh_runs = [] hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert refresh_runs == [ [ TrackTemplateResult(template_1, True, False), TrackTemplateResult(template_2, True, False), TrackTemplateResult(template_3, False, True), ] ] refresh_runs = [] hass.states.async_set("binary_sensor.test", "off") await hass.async_block_till_done() assert refresh_runs == [ [TrackTemplateResult(template_4, None, ["binary_sensor.test"])] ] async def test_async_track_template_result_multiple_templates_mixing_domain(hass): """Test tracking multiple templates when tracking entities and an entire domain.""" template_1 = Template("{{ states.switch.test.state == 'on' }}") template_2 = Template("{{ states.switch.test.state == 'on' }}") template_3 = Template("{{ states.switch.test.state == 'off' }}") template_4 = Template("{{ states.switch | map(attribute='entity_id') | list }}") refresh_runs = [] @ha.callback def refresh_listener(event, updates): refresh_runs.append(updates) async_track_template_result( hass, [ TrackTemplate(template_1, None), TrackTemplate(template_2, None), TrackTemplate(template_3, None), TrackTemplate(template_4, None, timedelta(seconds=0)), ], refresh_listener, ) hass.states.async_set("switch.test", "on") await hass.async_block_till_done() assert refresh_runs == [ [ TrackTemplateResult(template_1, None, True), TrackTemplateResult(template_2, None, True), TrackTemplateResult(template_3, None, False), TrackTemplateResult(template_4, None, ["switch.test"]), ] ] refresh_runs = [] hass.states.async_set("switch.test", "off") await hass.async_block_till_done() assert refresh_runs == [ [ TrackTemplateResult(template_1, True, False), TrackTemplateResult(template_2, True, False), TrackTemplateResult(template_3, False, True), ] ] refresh_runs = [] hass.states.async_set("binary_sensor.test", "off") await hass.async_block_till_done() assert refresh_runs == [] refresh_runs = [] hass.states.async_set("switch.new", "off") await hass.async_block_till_done() assert refresh_runs == [ [ TrackTemplateResult( template_4, ["switch.test"], ["switch.new", "switch.test"] ) ] ] async def test_async_track_template_result_raise_on_template_error(hass): """Test that we raise as soon as we encounter a failed template.""" with pytest.raises(TemplateError): async_track_template_result( hass, [ TrackTemplate( Template( "{{ states.switch | function_that_does_not_exist | list }}" ), None, ), ], ha.callback(lambda event, updates: None), raise_on_template_error=True, ) async def test_track_same_state_simple_no_trigger(hass): """Test track_same_change with no trigger.""" callback_runs = [] period = timedelta(minutes=1) @ha.callback def callback_run_callback(): callback_runs.append(1) async_track_same_state( hass, period, callback_run_callback, callback(lambda _, _2, to_s: to_s.state == "on"), entity_ids="light.Bowl", ) # Adding state to state machine hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() assert len(callback_runs) == 0 # Change state on state machine hass.states.async_set("light.Bowl", "off") await hass.async_block_till_done() assert len(callback_runs) == 0 # change time to track and see if they trigger future = dt_util.utcnow() + period async_fire_time_changed(hass, future) await hass.async_block_till_done() assert len(callback_runs) == 0 async def test_track_same_state_simple_trigger_check_funct(hass): """Test track_same_change with trigger and check funct.""" callback_runs = [] check_func = [] period = timedelta(minutes=1) @ha.callback def callback_run_callback(): callback_runs.append(1) @ha.callback def async_check_func(entity, from_s, to_s): check_func.append((entity, from_s, to_s)) return True async_track_same_state( hass, period, callback_run_callback, entity_ids="light.Bowl", async_check_same_func=async_check_func, ) # Adding state to state machine hass.states.async_set("light.Bowl", "on") await hass.async_block_till_done() await hass.async_block_till_done() assert len(callback_runs) == 0 assert check_func[-1][2].state == "on" assert check_func[-1][0] == "light.bowl" # change time to track and see if they trigger future = dt_util.utcnow() + period async_fire_time_changed(hass, future) await hass.async_block_till_done() assert len(callback_runs) == 1 async def test_track_time_interval(hass): """Test tracking time interval.""" specific_runs = [] utc_now = dt_util.utcnow() unsub = async_track_time_interval( hass, callback(lambda x: specific_runs.append(x)), timedelta(seconds=10) ) async_fire_time_changed(hass, utc_now + timedelta(seconds=5)) await hass.async_block_till_done() assert len(specific_runs) == 0 async_fire_time_changed(hass, utc_now + timedelta(seconds=13)) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed(hass, utc_now + timedelta(minutes=20)) await hass.async_block_till_done() assert len(specific_runs) == 2 unsub() async_fire_time_changed(hass, utc_now + timedelta(seconds=30)) await hass.async_block_till_done() assert len(specific_runs) == 2 async def test_track_sunrise(hass, legacy_patchable_time): """Test track the sunrise.""" latitude = 32.87336 longitude = 117.22743 # Setup sun component hass.config.latitude = latitude hass.config.longitude = longitude assert await async_setup_component( hass, sun.DOMAIN, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}} ) # Get next sunrise/sunset astral = Astral() utc_now = datetime(2014, 5, 24, 12, 0, 0, tzinfo=dt_util.UTC) utc_today = utc_now.date() mod = -1 while True: next_rising = astral.sunrise_utc( utc_today + timedelta(days=mod), latitude, longitude ) if next_rising > utc_now: break mod += 1 # Track sunrise runs = [] with patch("homeassistant.util.dt.utcnow", return_value=utc_now): unsub = async_track_sunrise(hass, callback(lambda: runs.append(1))) offset_runs = [] offset = timedelta(minutes=30) with patch("homeassistant.util.dt.utcnow", return_value=utc_now): unsub2 = async_track_sunrise( hass, callback(lambda: offset_runs.append(1)), offset ) # run tests async_fire_time_changed(hass, next_rising - offset) await hass.async_block_till_done() assert len(runs) == 0 assert len(offset_runs) == 0 async_fire_time_changed(hass, next_rising) await hass.async_block_till_done() assert len(runs) == 1 assert len(offset_runs) == 0 async_fire_time_changed(hass, next_rising + offset) await hass.async_block_till_done() assert len(runs) == 1 assert len(offset_runs) == 1 unsub() unsub2() async_fire_time_changed(hass, next_rising + offset) await hass.async_block_till_done() assert len(runs) == 1 assert len(offset_runs) == 1 async def test_track_sunrise_update_location(hass, legacy_patchable_time): """Test track the sunrise.""" # Setup sun component hass.config.latitude = 32.87336 hass.config.longitude = 117.22743 assert await async_setup_component( hass, sun.DOMAIN, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}} ) # Get next sunrise astral = Astral() utc_now = datetime(2014, 5, 24, 12, 0, 0, tzinfo=dt_util.UTC) utc_today = utc_now.date() mod = -1 while True: next_rising = astral.sunrise_utc( utc_today + timedelta(days=mod), hass.config.latitude, hass.config.longitude ) if next_rising > utc_now: break mod += 1 # Track sunrise runs = [] with patch("homeassistant.util.dt.utcnow", return_value=utc_now): async_track_sunrise(hass, callback(lambda: runs.append(1))) # Mimic sunrise async_fire_time_changed(hass, next_rising) await hass.async_block_till_done() assert len(runs) == 1 # Move! with patch("homeassistant.util.dt.utcnow", return_value=utc_now): await hass.config.async_update(latitude=40.755931, longitude=-73.984606) await hass.async_block_till_done() # Mimic sunrise async_fire_time_changed(hass, next_rising) await hass.async_block_till_done() # Did not increase assert len(runs) == 1 # Get next sunrise mod = -1 while True: next_rising = astral.sunrise_utc( utc_today + timedelta(days=mod), hass.config.latitude, hass.config.longitude ) if next_rising > utc_now: break mod += 1 # Mimic sunrise at new location async_fire_time_changed(hass, next_rising) await hass.async_block_till_done() assert len(runs) == 2 async def test_track_sunset(hass, legacy_patchable_time): """Test track the sunset.""" latitude = 32.87336 longitude = 117.22743 # Setup sun component hass.config.latitude = latitude hass.config.longitude = longitude assert await async_setup_component( hass, sun.DOMAIN, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}} ) # Get next sunrise/sunset astral = Astral() utc_now = datetime(2014, 5, 24, 12, 0, 0, tzinfo=dt_util.UTC) utc_today = utc_now.date() mod = -1 while True: next_setting = astral.sunset_utc( utc_today + timedelta(days=mod), latitude, longitude ) if next_setting > utc_now: break mod += 1 # Track sunset runs = [] with patch("homeassistant.util.dt.utcnow", return_value=utc_now): unsub = async_track_sunset(hass, callback(lambda: runs.append(1))) offset_runs = [] offset = timedelta(minutes=30) with patch("homeassistant.util.dt.utcnow", return_value=utc_now): unsub2 = async_track_sunset( hass, callback(lambda: offset_runs.append(1)), offset ) # Run tests async_fire_time_changed(hass, next_setting - offset) await hass.async_block_till_done() assert len(runs) == 0 assert len(offset_runs) == 0 async_fire_time_changed(hass, next_setting) await hass.async_block_till_done() assert len(runs) == 1 assert len(offset_runs) == 0 async_fire_time_changed(hass, next_setting + offset) await hass.async_block_till_done() assert len(runs) == 1 assert len(offset_runs) == 1 unsub() unsub2() async_fire_time_changed(hass, next_setting + offset) await hass.async_block_till_done() assert len(runs) == 1 assert len(offset_runs) == 1 async def test_async_track_time_change(hass): """Test tracking time change.""" wildcard_runs = [] specific_runs = [] now = dt_util.utcnow() time_that_will_not_match_right_away = datetime( now.year + 1, 5, 24, 11, 59, 55, tzinfo=dt_util.UTC ) with patch( "homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away ): unsub = async_track_time_change( hass, callback(lambda x: wildcard_runs.append(x)) ) unsub_utc = async_track_utc_time_change( hass, callback(lambda x: specific_runs.append(x)), second=[0, 30] ) async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 0, 15, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 assert len(wildcard_runs) == 2 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 0, 30, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 2 assert len(wildcard_runs) == 3 unsub() unsub_utc() async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 0, 30, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 2 assert len(wildcard_runs) == 3 async def test_periodic_task_minute(hass): """Test periodic tasks per minute.""" specific_runs = [] now = dt_util.utcnow() time_that_will_not_match_right_away = datetime( now.year + 1, 5, 24, 11, 59, 55, tzinfo=dt_util.UTC ) with patch( "homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away ): unsub = async_track_utc_time_change( hass, callback(lambda x: specific_runs.append(x)), minute="/5", second=0 ) async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 3, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 5, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 2 unsub() async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 12, 5, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 2 async def test_periodic_task_hour(hass): """Test periodic tasks per hour.""" specific_runs = [] now = dt_util.utcnow() time_that_will_not_match_right_away = datetime( now.year + 1, 5, 24, 21, 59, 55, tzinfo=dt_util.UTC ) with patch( "homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away ): unsub = async_track_utc_time_change( hass, callback(lambda x: specific_runs.append(x)), hour="/2", minute=0, second=0, ) async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 23, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 25, 0, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 2 async_fire_time_changed( hass, datetime(now.year + 1, 5, 25, 1, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 2 async_fire_time_changed( hass, datetime(now.year + 1, 5, 25, 2, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 3 unsub() async_fire_time_changed( hass, datetime(now.year + 1, 5, 25, 2, 0, 0, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 3 async def test_periodic_task_wrong_input(hass): """Test periodic tasks with wrong input.""" specific_runs = [] now = dt_util.utcnow() with pytest.raises(ValueError): async_track_utc_time_change( hass, callback(lambda x: specific_runs.append(x)), hour="/two" ) async_fire_time_changed( hass, datetime(now.year + 1, 5, 2, 0, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 0 async def test_periodic_task_clock_rollback(hass): """Test periodic tasks with the time rolling backwards.""" specific_runs = [] now = dt_util.utcnow() time_that_will_not_match_right_away = datetime( now.year + 1, 5, 24, 21, 59, 55, tzinfo=dt_util.UTC ) with patch( "homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away ): unsub = async_track_utc_time_change( hass, callback(lambda x: specific_runs.append(x)), hour="/2", minute=0, second=0, ) async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 23, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC), fire_all=True, ) await hass.async_block_till_done() assert len(specific_runs) == 2 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 0, 0, 0, 999999, tzinfo=dt_util.UTC), fire_all=True, ) await hass.async_block_till_done() assert len(specific_runs) == 3 async_fire_time_changed( hass, datetime(now.year + 1, 5, 25, 2, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 4 unsub() async_fire_time_changed( hass, datetime(now.year + 1, 5, 25, 2, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 4 async def test_periodic_task_duplicate_time(hass): """Test periodic tasks not triggering on duplicate time.""" specific_runs = [] now = dt_util.utcnow() time_that_will_not_match_right_away = datetime( now.year + 1, 5, 24, 21, 59, 55, tzinfo=dt_util.UTC ) with patch( "homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away ): unsub = async_track_utc_time_change( hass, callback(lambda x: specific_runs.append(x)), hour="/2", minute=0, second=0, ) async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 24, 22, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, datetime(now.year + 1, 5, 25, 0, 0, 0, 999999, tzinfo=dt_util.UTC) ) await hass.async_block_till_done() assert len(specific_runs) == 2 unsub() async def test_periodic_task_entering_dst(hass): """Test periodic task behavior when entering dst.""" timezone = dt_util.get_time_zone("Europe/Vienna") dt_util.set_default_time_zone(timezone) specific_runs = [] now = dt_util.utcnow() time_that_will_not_match_right_away = timezone.localize( datetime(now.year + 1, 3, 25, 2, 31, 0) ) with patch( "homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away ): unsub = async_track_time_change( hass, callback(lambda x: specific_runs.append(x)), hour=2, minute=30, second=0, ) async_fire_time_changed( hass, timezone.localize(datetime(now.year + 1, 3, 25, 1, 50, 0, 999999)) ) await hass.async_block_till_done() assert len(specific_runs) == 0 async_fire_time_changed( hass, timezone.localize(datetime(now.year + 1, 3, 25, 3, 50, 0, 999999)) ) await hass.async_block_till_done() assert len(specific_runs) == 0 async_fire_time_changed( hass, timezone.localize(datetime(now.year + 1, 3, 26, 1, 50, 0, 999999)) ) await hass.async_block_till_done() assert len(specific_runs) == 0 async_fire_time_changed( hass, timezone.localize(datetime(now.year + 1, 3, 26, 2, 50, 0, 999999)) ) await hass.async_block_till_done() assert len(specific_runs) == 1 unsub() async def test_periodic_task_leaving_dst(hass): """Test periodic task behavior when leaving dst.""" timezone = dt_util.get_time_zone("Europe/Vienna") dt_util.set_default_time_zone(timezone) specific_runs = [] now = dt_util.utcnow() time_that_will_not_match_right_away = timezone.localize( datetime(now.year + 1, 10, 28, 2, 28, 0), is_dst=True ) with patch( "homeassistant.util.dt.utcnow", return_value=time_that_will_not_match_right_away ): unsub = async_track_time_change( hass, callback(lambda x: specific_runs.append(x)), hour=2, minute=30, second=0, ) async_fire_time_changed( hass, timezone.localize( datetime(now.year + 1, 10, 28, 2, 5, 0, 999999), is_dst=False ), ) await hass.async_block_till_done() assert len(specific_runs) == 0 async_fire_time_changed( hass, timezone.localize( datetime(now.year + 1, 10, 28, 2, 55, 0, 999999), is_dst=False ), ) await hass.async_block_till_done() assert len(specific_runs) == 1 async_fire_time_changed( hass, timezone.localize( datetime(now.year + 2, 10, 28, 2, 45, 0, 999999), is_dst=True ), ) await hass.async_block_till_done() assert len(specific_runs) == 2 async_fire_time_changed( hass, timezone.localize( datetime(now.year + 2, 10, 28, 2, 55, 0, 999999), is_dst=True ), ) await hass.async_block_till_done() assert len(specific_runs) == 2 async_fire_time_changed( hass, timezone.localize( datetime(now.year + 2, 10, 28, 2, 55, 0, 999999), is_dst=True ), ) await hass.async_block_till_done() assert len(specific_runs) == 2 unsub() async def test_call_later(hass): """Test calling an action later.""" def action(): pass now = datetime(2017, 12, 19, 15, 40, 0, tzinfo=dt_util.UTC) with patch( "homeassistant.helpers.event.async_track_point_in_utc_time" ) as mock, patch("homeassistant.util.dt.utcnow", return_value=now): async_call_later(hass, 3, action) assert len(mock.mock_calls) == 1 p_hass, p_action, p_point = mock.mock_calls[0][1] assert p_hass is hass assert p_action is action assert p_point == now + timedelta(seconds=3) async def test_async_call_later(hass): """Test calling an action later.""" def action(): pass now = datetime(2017, 12, 19, 15, 40, 0, tzinfo=dt_util.UTC) with patch( "homeassistant.helpers.event.async_track_point_in_utc_time" ) as mock, patch("homeassistant.util.dt.utcnow", return_value=now): remove = async_call_later(hass, 3, action) assert len(mock.mock_calls) == 1 p_hass, p_action, p_point = mock.mock_calls[0][1] assert p_hass is hass assert p_action is action assert p_point == now + timedelta(seconds=3) assert remove is mock() async def test_track_state_change_event_chain_multple_entity(hass): """Test that adding a new state tracker inside a tracker does not fire right away.""" tracker_called = [] chained_tracker_called = [] chained_tracker_unsub = [] tracker_unsub = [] @ha.callback def chained_single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") chained_tracker_called.append((old_state, new_state)) @ha.callback def single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") tracker_called.append((old_state, new_state)) chained_tracker_unsub.append( async_track_state_change_event( hass, ["light.bowl", "light.top"], chained_single_run_callback ) ) tracker_unsub.append( async_track_state_change_event( hass, ["light.bowl", "light.top"], single_run_callback ) ) hass.states.async_set("light.bowl", "on") hass.states.async_set("light.top", "on") await hass.async_block_till_done() assert len(tracker_called) == 2 assert len(chained_tracker_called) == 1 assert len(tracker_unsub) == 1 assert len(chained_tracker_unsub) == 2 hass.states.async_set("light.bowl", "off") await hass.async_block_till_done() assert len(tracker_called) == 3 assert len(chained_tracker_called) == 3 assert len(tracker_unsub) == 1 assert len(chained_tracker_unsub) == 3 async def test_track_state_change_event_chain_single_entity(hass): """Test that adding a new state tracker inside a tracker does not fire right away.""" tracker_called = [] chained_tracker_called = [] chained_tracker_unsub = [] tracker_unsub = [] @ha.callback def chained_single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") chained_tracker_called.append((old_state, new_state)) @ha.callback def single_run_callback(event): old_state = event.data.get("old_state") new_state = event.data.get("new_state") tracker_called.append((old_state, new_state)) chained_tracker_unsub.append( async_track_state_change_event( hass, "light.bowl", chained_single_run_callback ) ) tracker_unsub.append( async_track_state_change_event(hass, "light.bowl", single_run_callback) ) hass.states.async_set("light.bowl", "on") await hass.async_block_till_done() assert len(tracker_called) == 1 assert len(chained_tracker_called) == 0 assert len(tracker_unsub) == 1 assert len(chained_tracker_unsub) == 1 hass.states.async_set("light.bowl", "off") await hass.async_block_till_done() assert len(tracker_called) == 2 assert len(chained_tracker_called) == 1 assert len(tracker_unsub) == 1 assert len(chained_tracker_unsub) == 2 async def test_track_point_in_utc_time_cancel(hass): """Test cancel of async track point in time.""" times = [] @ha.callback def run_callback(utc_time): nonlocal times times.append(utc_time) def _setup_listeners(): """Ensure we test the non-async version.""" utc_now = dt_util.utcnow() with pytest.raises(TypeError): track_point_in_utc_time("nothass", run_callback, utc_now) unsub1 = hass.helpers.event.track_point_in_utc_time( run_callback, utc_now + timedelta(seconds=0.1) ) hass.helpers.event.track_point_in_utc_time( run_callback, utc_now + timedelta(seconds=0.1) ) unsub1() await hass.async_add_executor_job(_setup_listeners) await asyncio.sleep(0.2) assert len(times) == 1 assert times[0].tzinfo == dt_util.UTC async def test_async_track_point_in_time_cancel(hass): """Test cancel of async track point in time.""" times = [] hst_tz = dt_util.get_time_zone("US/Hawaii") dt_util.set_default_time_zone(hst_tz) @ha.callback def run_callback(local_time): nonlocal times times.append(local_time) utc_now = dt_util.utcnow() hst_now = utc_now.astimezone(hst_tz) unsub1 = hass.helpers.event.async_track_point_in_time( run_callback, hst_now + timedelta(seconds=0.1) ) hass.helpers.event.async_track_point_in_time( run_callback, hst_now + timedelta(seconds=0.1) ) unsub1() await asyncio.sleep(0.2) assert len(times) == 1 assert times[0].tzinfo.zone == "US/Hawaii" async def test_async_track_entity_registry_updated_event(hass): """Test tracking entity registry updates for an entity_id.""" entity_id = "switch.puppy_feeder" new_entity_id = "switch.dog_feeder" untracked_entity_id = "switch.kitty_feeder" hass.states.async_set(entity_id, "on") await hass.async_block_till_done() event_data = [] @ha.callback def run_callback(event): event_data.append(event.data) unsub1 = hass.helpers.event.async_track_entity_registry_updated_event( entity_id, run_callback ) unsub2 = hass.helpers.event.async_track_entity_registry_updated_event( new_entity_id, run_callback ) hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": entity_id} ) hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": untracked_entity_id}, ) await hass.async_block_till_done() hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, { "action": "update", "entity_id": new_entity_id, "old_entity_id": entity_id, "changes": {}, }, ) await hass.async_block_till_done() hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "remove", "entity_id": new_entity_id} ) await hass.async_block_till_done() unsub1() unsub2() hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": entity_id} ) hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": new_entity_id} ) await hass.async_block_till_done() assert event_data[0] == {"action": "create", "entity_id": "switch.puppy_feeder"} assert event_data[1] == { "action": "update", "changes": {}, "entity_id": "switch.dog_feeder", "old_entity_id": "switch.puppy_feeder", } assert event_data[2] == {"action": "remove", "entity_id": "switch.dog_feeder"} async def test_async_track_entity_registry_updated_event_with_a_callback_that_throws( hass, ): """Test tracking entity registry updates for an entity_id when one callback throws.""" entity_id = "switch.puppy_feeder" hass.states.async_set(entity_id, "on") await hass.async_block_till_done() event_data = [] @ha.callback def run_callback(event): event_data.append(event.data) @ha.callback def failing_callback(event): raise ValueError unsub1 = hass.helpers.event.async_track_entity_registry_updated_event( entity_id, failing_callback ) unsub2 = hass.helpers.event.async_track_entity_registry_updated_event( entity_id, run_callback ) hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": entity_id} ) await hass.async_block_till_done() unsub1() unsub2() assert event_data[0] == {"action": "create", "entity_id": "switch.puppy_feeder"} async def test_async_track_entity_registry_updated_event_with_empty_list(hass): """Test async_track_entity_registry_updated_event passing an empty list of entities.""" unsub_single = hass.helpers.event.async_track_entity_registry_updated_event( [], ha.callback(lambda event: None) ) unsub_single2 = hass.helpers.event.async_track_entity_registry_updated_event( [], ha.callback(lambda event: None) ) unsub_single2() unsub_single()