"""Test to verify that Home Assistant core works.""" # pylint: disable=protected-access,too-many-public-methods # pylint: disable=too-few-public-methods import asyncio import unittest from unittest.mock import patch, MagicMock from datetime import datetime, timedelta import pytz import homeassistant.core as ha from homeassistant.exceptions import InvalidEntityFormatError import homeassistant.util.dt as dt_util from homeassistant.util.unit_system import (METRIC_SYSTEM) from homeassistant.const import ( __version__, EVENT_STATE_CHANGED, ATTR_FRIENDLY_NAME, CONF_UNIT_SYSTEM) from tests.common import get_test_home_assistant PST = pytz.timezone('America/Los_Angeles') def test_split_entity_id(): """Test split_entity_id.""" assert ha.split_entity_id('domain.object_id') == ['domain', 'object_id'] def test_async_add_job_schedule_callback(): """Test that we schedule coroutines and add jobs to the job pool.""" hass = MagicMock() job = MagicMock() ha.HomeAssistant.async_add_job(hass, ha.callback(job)) assert len(hass.loop.call_soon.mock_calls) == 1 assert len(hass.loop.create_task.mock_calls) == 0 assert len(hass.add_job.mock_calls) == 0 @patch('asyncio.iscoroutinefunction', return_value=True) def test_async_add_job_schedule_coroutinefunction(mock_iscoro): """Test that we schedule coroutines and add jobs to the job pool.""" hass = MagicMock() job = MagicMock() ha.HomeAssistant.async_add_job(hass, job) assert len(hass.loop.call_soon.mock_calls) == 0 assert len(hass.loop.create_task.mock_calls) == 1 assert len(hass.add_job.mock_calls) == 0 @patch('asyncio.iscoroutinefunction', return_value=False) def test_async_add_job_add_threaded_job_to_pool(mock_iscoro): """Test that we schedule coroutines and add jobs to the job pool.""" hass = MagicMock() job = MagicMock() ha.HomeAssistant.async_add_job(hass, job) assert len(hass.loop.call_soon.mock_calls) == 0 assert len(hass.loop.create_task.mock_calls) == 0 assert len(hass.add_job.mock_calls) == 1 def test_async_run_job_calls_callback(): """Test that the callback annotation is respected.""" hass = MagicMock() calls = [] def job(): calls.append(1) ha.HomeAssistant.async_run_job(hass, ha.callback(job)) assert len(calls) == 1 assert len(hass.async_add_job.mock_calls) == 0 def test_async_run_job_delegates_non_async(): """Test that the callback annotation is respected.""" hass = MagicMock() calls = [] def job(): calls.append(1) ha.HomeAssistant.async_run_job(hass, job) assert len(calls) == 0 assert len(hass.async_add_job.mock_calls) == 1 class TestHomeAssistant(unittest.TestCase): """Test the Home Assistant core classes.""" def setUp(self): # pylint: disable=invalid-name """Setup things to be run when tests are started.""" self.hass = get_test_home_assistant(0) def tearDown(self): # pylint: disable=invalid-name """Stop everything that was started.""" self.hass.stop() # This test hangs on `loop.add_signal_handler` # def test_start_and_sigterm(self): # """Start the test.""" # calls = [] # self.hass.bus.listen_once(EVENT_HOMEASSISTANT_START, # lambda event: calls.append(1)) # self.hass.start() # self.assertEqual(1, len(calls)) # self.hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, # lambda event: calls.append(1)) # os.kill(os.getpid(), signal.SIGTERM) # self.hass.block_till_done() # self.assertEqual(1, len(calls)) class TestEvent(unittest.TestCase): """A Test Event class.""" def test_eq(self): """Test events.""" now = dt_util.utcnow() data = {'some': 'attr'} event1, event2 = [ ha.Event('some_type', data, time_fired=now) for _ in range(2) ] self.assertEqual(event1, event2) def test_repr(self): """Test that repr method works.""" self.assertEqual( "", str(ha.Event("TestEvent"))) self.assertEqual( "", str(ha.Event("TestEvent", {"beer": "nice"}, ha.EventOrigin.remote))) def test_as_dict(self): """Test as dictionary.""" event_type = 'some_type' now = dt_util.utcnow() data = {'some': 'attr'} event = ha.Event(event_type, data, ha.EventOrigin.local, now) expected = { 'event_type': event_type, 'data': data, 'origin': 'LOCAL', 'time_fired': now, } self.assertEqual(expected, event.as_dict()) class TestEventBus(unittest.TestCase): """Test EventBus methods.""" def setUp(self): # pylint: disable=invalid-name """Setup things to be run when tests are started.""" self.hass = get_test_home_assistant() self.bus = self.hass.bus self.bus.listen('test_event', lambda x: len) def tearDown(self): # pylint: disable=invalid-name """Stop down stuff we started.""" self.hass.stop() def test_add_remove_listener(self): """Test remove_listener method.""" old_count = len(self.bus.listeners) def listener(_): pass unsub = self.bus.listen('test', listener) self.assertEqual(old_count + 1, len(self.bus.listeners)) # Remove listener unsub() self.assertEqual(old_count, len(self.bus.listeners)) # Should do nothing now unsub() def test_unsubscribe_listener(self): """Test unsubscribe listener from returned function.""" calls = [] def listener(event): """Mock listener.""" calls.append(event) unsub = self.bus.listen('test', listener) self.bus.fire('test') self.hass.block_till_done() assert len(calls) == 1 unsub() self.bus.fire('event') self.hass.block_till_done() assert len(calls) == 1 def test_listen_once_event_with_callback(self): """Test listen_once_event method.""" runs = [] @ha.callback def event_handler(event): runs.append(event) self.bus.listen_once('test_event', event_handler) self.bus.fire('test_event') # Second time it should not increase runs self.bus.fire('test_event') self.hass.block_till_done() self.assertEqual(1, len(runs)) def test_listen_once_event_with_coroutine(self): """Test listen_once_event method.""" runs = [] @asyncio.coroutine def event_handler(event): runs.append(event) self.bus.listen_once('test_event', event_handler) self.bus.fire('test_event') # Second time it should not increase runs self.bus.fire('test_event') self.hass.block_till_done() self.assertEqual(1, len(runs)) def test_listen_once_event_with_thread(self): """Test listen_once_event method.""" runs = [] def event_handler(event): runs.append(event) self.bus.listen_once('test_event', event_handler) self.bus.fire('test_event') # Second time it should not increase runs self.bus.fire('test_event') self.hass.block_till_done() self.assertEqual(1, len(runs)) def test_thread_event_listener(self): """Test a event listener listeners.""" thread_calls = [] def thread_listener(event): thread_calls.append(event) self.bus.listen('test_thread', thread_listener) self.bus.fire('test_thread') self.hass.block_till_done() assert len(thread_calls) == 1 def test_callback_event_listener(self): """Test a event listener listeners.""" callback_calls = [] @ha.callback def callback_listener(event): callback_calls.append(event) self.bus.listen('test_callback', callback_listener) self.bus.fire('test_callback') self.hass.block_till_done() assert len(callback_calls) == 1 def test_coroutine_event_listener(self): """Test a event listener listeners.""" coroutine_calls = [] @asyncio.coroutine def coroutine_listener(event): coroutine_calls.append(event) self.bus.listen('test_coroutine', coroutine_listener) self.bus.fire('test_coroutine') self.hass.block_till_done() assert len(coroutine_calls) == 1 class TestState(unittest.TestCase): """Test State methods.""" def test_init(self): """Test state.init.""" self.assertRaises( InvalidEntityFormatError, ha.State, 'invalid_entity_format', 'test_state') def test_domain(self): """Test domain.""" state = ha.State('some_domain.hello', 'world') self.assertEqual('some_domain', state.domain) def test_object_id(self): """Test object ID.""" state = ha.State('domain.hello', 'world') self.assertEqual('hello', state.object_id) def test_name_if_no_friendly_name_attr(self): """Test if there is no friendly name.""" state = ha.State('domain.hello_world', 'world') self.assertEqual('hello world', state.name) def test_name_if_friendly_name_attr(self): """Test if there is a friendly name.""" name = 'Some Unique Name' state = ha.State('domain.hello_world', 'world', {ATTR_FRIENDLY_NAME: name}) self.assertEqual(name, state.name) def test_dict_conversion(self): """Test conversion of dict.""" state = ha.State('domain.hello', 'world', {'some': 'attr'}) self.assertEqual(state, ha.State.from_dict(state.as_dict())) def test_dict_conversion_with_wrong_data(self): """Test conversion with wrong data.""" self.assertIsNone(ha.State.from_dict(None)) self.assertIsNone(ha.State.from_dict({'state': 'yes'})) self.assertIsNone(ha.State.from_dict({'entity_id': 'yes'})) def test_repr(self): """Test state.repr.""" self.assertEqual("", str(ha.State( "happy.happy", "on", last_changed=datetime(1984, 12, 8, 12, 0, 0)))) self.assertEqual( "", str(ha.State("happy.happy", "on", {"brightness": 144}, datetime(1984, 12, 8, 12, 0, 0)))) class TestStateMachine(unittest.TestCase): """Test State machine methods.""" def setUp(self): # pylint: disable=invalid-name """Setup things to be run when tests are started.""" self.hass = get_test_home_assistant(0) self.states = self.hass.states self.states.set("light.Bowl", "on") self.states.set("switch.AC", "off") def tearDown(self): # pylint: disable=invalid-name """Stop down stuff we started.""" self.hass.stop() def test_is_state(self): """Test is_state method.""" self.assertTrue(self.states.is_state('light.Bowl', 'on')) self.assertFalse(self.states.is_state('light.Bowl', 'off')) self.assertFalse(self.states.is_state('light.Non_existing', 'on')) def test_is_state_attr(self): """Test is_state_attr method.""" self.states.set("light.Bowl", "on", {"brightness": 100}) self.assertTrue( self.states.is_state_attr('light.Bowl', 'brightness', 100)) self.assertFalse( self.states.is_state_attr('light.Bowl', 'friendly_name', 200)) self.assertFalse( self.states.is_state_attr('light.Bowl', 'friendly_name', 'Bowl')) self.assertFalse( self.states.is_state_attr('light.Non_existing', 'brightness', 100)) def test_entity_ids(self): """Test get_entity_ids method.""" ent_ids = self.states.entity_ids() self.assertEqual(2, len(ent_ids)) self.assertTrue('light.bowl' in ent_ids) self.assertTrue('switch.ac' in ent_ids) ent_ids = self.states.entity_ids('light') self.assertEqual(1, len(ent_ids)) self.assertTrue('light.bowl' in ent_ids) def test_all(self): """Test everything.""" states = sorted(state.entity_id for state in self.states.all()) self.assertEqual(['light.bowl', 'switch.ac'], states) def test_remove(self): """Test remove method.""" events = [] self.hass.bus.listen(EVENT_STATE_CHANGED, lambda event: events.append(event)) self.assertIn('light.bowl', self.states.entity_ids()) self.assertTrue(self.states.remove('light.bowl')) self.hass.block_till_done() self.assertNotIn('light.bowl', self.states.entity_ids()) self.assertEqual(1, len(events)) self.assertEqual('light.bowl', events[0].data.get('entity_id')) self.assertIsNotNone(events[0].data.get('old_state')) self.assertEqual('light.bowl', events[0].data['old_state'].entity_id) self.assertIsNone(events[0].data.get('new_state')) # If it does not exist, we should get False self.assertFalse(self.states.remove('light.Bowl')) self.hass.block_till_done() self.assertEqual(1, len(events)) def test_case_insensitivty(self): """Test insensitivty.""" runs = [] self.hass.bus.listen(EVENT_STATE_CHANGED, lambda event: runs.append(event)) self.states.set('light.BOWL', 'off') self.hass.block_till_done() self.assertTrue(self.states.is_state('light.bowl', 'off')) self.assertEqual(1, len(runs)) def test_last_changed_not_updated_on_same_state(self): """Test to not update the existing, same state.""" state = self.states.get('light.Bowl') future = dt_util.utcnow() + timedelta(hours=10) with patch('homeassistant.util.dt.utcnow', return_value=future): self.states.set("light.Bowl", "on", {'attr': 'triggers_change'}) self.hass.block_till_done() state2 = self.states.get('light.Bowl') assert state2 is not None assert state.last_changed == state2.last_changed def test_force_update(self): """Test force update option.""" events = [] self.hass.bus.listen(EVENT_STATE_CHANGED, lambda ev: events.append(ev)) self.states.set('light.bowl', 'on') self.hass.block_till_done() self.assertEqual(0, len(events)) self.states.set('light.bowl', 'on', None, True) self.hass.block_till_done() self.assertEqual(1, len(events)) class TestServiceCall(unittest.TestCase): """Test ServiceCall class.""" def test_repr(self): """Test repr method.""" self.assertEqual( "", str(ha.ServiceCall('homeassistant', 'start'))) self.assertEqual( "", str(ha.ServiceCall('homeassistant', 'start', {"fast": "yes"}))) class TestServiceRegistry(unittest.TestCase): """Test ServicerRegistry methods.""" def setUp(self): # pylint: disable=invalid-name """Setup things to be run when tests are started.""" self.hass = get_test_home_assistant() self.services = self.hass.services self.services.register("Test_Domain", "TEST_SERVICE", lambda x: None) def tearDown(self): # pylint: disable=invalid-name """Stop down stuff we started.""" self.hass.stop() def test_has_service(self): """Test has_service method.""" self.assertTrue( self.services.has_service("tesT_domaiN", "tesT_servicE")) self.assertFalse( self.services.has_service("test_domain", "non_existing")) self.assertFalse( self.services.has_service("non_existing", "test_service")) def test_services(self): """Test services.""" expected = { 'test_domain': {'test_service': {'description': '', 'fields': {}}} } self.assertEqual(expected, self.services.services) def test_call_with_blocking_done_in_time(self): """Test call with blocking.""" calls = [] def service_handler(call): """Service handler.""" calls.append(call) self.services.register("test_domain", "register_calls", service_handler) self.assertTrue( self.services.call('test_domain', 'REGISTER_CALLS', blocking=True)) self.assertEqual(1, len(calls)) def test_call_non_existing_with_blocking(self): """Test non-existing with blocking.""" prior = ha.SERVICE_CALL_LIMIT try: ha.SERVICE_CALL_LIMIT = 0.01 assert not self.services.call('test_domain', 'i_do_not_exist', blocking=True) finally: ha.SERVICE_CALL_LIMIT = prior def test_async_service(self): """Test registering and calling an async service.""" calls = [] @asyncio.coroutine def service_handler(call): """Service handler coroutine.""" calls.append(call) self.services.register('test_domain', 'register_calls', service_handler) self.assertTrue( self.services.call('test_domain', 'REGISTER_CALLS', blocking=True)) self.hass.block_till_done() self.assertEqual(1, len(calls)) def test_callback_service(self): """Test registering and calling an async service.""" calls = [] @ha.callback def service_handler(call): """Service handler coroutine.""" calls.append(call) self.services.register('test_domain', 'register_calls', service_handler) self.assertTrue( self.services.call('test_domain', 'REGISTER_CALLS', blocking=True)) self.hass.block_till_done() self.assertEqual(1, len(calls)) class TestConfig(unittest.TestCase): """Test configuration methods.""" def setUp(self): # pylint: disable=invalid-name """Setup things to be run when tests are started.""" self.config = ha.Config() self.assertIsNone(self.config.config_dir) def test_path_with_file(self): """Test get_config_path method.""" self.config.config_dir = '/tmp/ha-config' self.assertEqual("/tmp/ha-config/test.conf", self.config.path("test.conf")) def test_path_with_dir_and_file(self): """Test get_config_path method.""" self.config.config_dir = '/tmp/ha-config' self.assertEqual("/tmp/ha-config/dir/test.conf", self.config.path("dir", "test.conf")) def test_as_dict(self): """Test as dict.""" self.config.config_dir = '/tmp/ha-config' expected = { 'latitude': None, 'longitude': None, CONF_UNIT_SYSTEM: METRIC_SYSTEM.as_dict(), 'location_name': None, 'time_zone': 'UTC', 'components': [], 'config_dir': '/tmp/ha-config', 'version': __version__, } self.assertEqual(expected, self.config.as_dict()) class TestWorkerPool(unittest.TestCase): """Test WorkerPool methods.""" def test_exception_during_job(self): """Test exception during a job.""" pool = ha.create_worker_pool(1) def malicious_job(_): raise Exception("Test breaking worker pool") calls = [] def register_call(_): calls.append(1) pool.add_job(ha.JobPriority.EVENT_DEFAULT, (malicious_job, None)) pool.add_job(ha.JobPriority.EVENT_DEFAULT, (register_call, None)) pool.block_till_done() self.assertEqual(1, len(calls)) class TestWorkerPoolMonitor(object): """Test monitor_worker_pool.""" @patch('homeassistant.core._LOGGER.warning') def test_worker_pool_monitor(self, mock_warning, event_loop): """Test we log an error and increase threshold.""" hass = MagicMock() hass.pool.worker_count = 3 schedule_handle = MagicMock() hass.loop.call_later.return_value = schedule_handle ha._async_monitor_worker_pool(hass) assert hass.loop.call_later.called assert hass.bus.async_listen_once.called assert not schedule_handle.called check_threshold = hass.loop.call_later.mock_calls[0][1][1] hass.pool.queue_size = 8 check_threshold() assert not mock_warning.called hass.pool.queue_size = 9 check_threshold() assert mock_warning.called mock_warning.reset_mock() assert not mock_warning.called check_threshold() assert not mock_warning.called hass.pool.queue_size = 17 check_threshold() assert not mock_warning.called hass.pool.queue_size = 18 check_threshold() assert mock_warning.called hass.bus.async_listen_once.mock_calls[0][1][1](None) assert schedule_handle.cancel.called class TestAsyncCreateTimer(object): """Test create timer.""" @patch('homeassistant.core.asyncio.Event') @patch('homeassistant.core.dt_util.utcnow') def test_create_timer(self, mock_utcnow, mock_event, event_loop): """Test create timer fires correctly.""" hass = MagicMock() now = mock_utcnow() event = mock_event() now.second = 1 mock_utcnow.reset_mock() ha._async_create_timer(hass) assert len(hass.bus.async_listen_once.mock_calls) == 2 start_timer = hass.bus.async_listen_once.mock_calls[1][1][1] event_loop.run_until_complete(start_timer(None)) assert hass.loop.create_task.called timer = hass.loop.create_task.mock_calls[0][1][0] event.is_set.side_effect = False, False, True event_loop.run_until_complete(timer) assert len(mock_utcnow.mock_calls) == 1 assert hass.loop.call_soon.called event_type, event_data = hass.loop.call_soon.mock_calls[0][1][1:] assert ha.EVENT_TIME_CHANGED == event_type assert {ha.ATTR_NOW: now} == event_data stop_timer = hass.bus.async_listen_once.mock_calls[0][1][1] stop_timer(None) assert event.set.called