core/tests/common.py

626 lines
18 KiB
Python
Raw Normal View History

2016-03-09 09:25:50 +00:00
"""Test the helper method for writing tests."""
import asyncio
import functools as ft
import os
import sys
from unittest.mock import patch, MagicMock, Mock
from io import StringIO
import logging
import threading
from contextlib import contextmanager
from aiohttp import web
2015-09-01 07:18:26 +00:00
from homeassistant import core as ha, loader
2017-06-04 01:51:29 +00:00
from homeassistant.setup import setup_component, async_setup_component
from homeassistant.config import async_process_component_config
from homeassistant.helpers import (
intent, dispatcher, entity, restore_state, entity_registry,
entity_platform)
2016-08-09 03:42:25 +00:00
from homeassistant.util.unit_system import METRIC_SYSTEM
import homeassistant.util.dt as date_util
import homeassistant.util.yaml as yaml
2015-04-30 05:26:54 +00:00
from homeassistant.const import (
2015-05-01 04:03:01 +00:00
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED,
2015-09-12 16:15:28 +00:00
EVENT_STATE_CHANGED, EVENT_PLATFORM_DISCOVERED, ATTR_SERVICE,
ATTR_DISCOVERED, SERVER_PORT, EVENT_HOMEASSISTANT_CLOSE)
from homeassistant.components import mqtt, recorder
from homeassistant.components.http.auth import auth_middleware
from homeassistant.components.http.const import (
KEY_USE_X_FORWARDED_FOR, KEY_BANS_ENABLED, KEY_TRUSTED_NETWORKS)
from homeassistant.util.async import (
run_callback_threadsafe, run_coroutine_threadsafe)
2014-11-25 08:20:36 +00:00
# Last port handed out by get_test_instance_port(); it starts at SERVER_PORT
# and is incremented on every call.
_TEST_INSTANCE_PORT = SERVER_PORT
_LOGGER = logging.getLogger(__name__)
# Instances created by async_test_home_assistant(); an instance is removed
# again when its EVENT_HOMEASSISTANT_CLOSE event fires.
INSTANCES = []
2014-11-25 08:20:36 +00:00
def threadsafe_callback_factory(func):
    """Wrap a callback so it can be invoked from outside the event loop.

    The wrapped callback must receive `hass` as its first positional
    argument; the callback is executed on `hass.loop`.
    """
    @ft.wraps(func)
    def threadsafe(*args, **kwargs):
        """Schedule func on the hass loop and block until it has run."""
        loop = args[0].loop
        bound = ft.partial(func, *args, **kwargs)
        future = run_callback_threadsafe(loop, bound)
        return future.result()

    return threadsafe
2017-06-04 01:51:29 +00:00
def threadsafe_coroutine_factory(func):
    """Wrap a coroutine function so it can be called from another thread.

    The wrapped coroutine function must receive `hass` as its first
    positional argument; the coroutine is executed on `hass.loop`.
    """
    @ft.wraps(func)
    def threadsafe(*args, **kwargs):
        """Run the coroutine on the hass loop and wait for its result."""
        hass = args[0]
        coro = func(*args, **kwargs)
        future = run_coroutine_threadsafe(coro, hass.loop)
        return future.result()

    return threadsafe
def get_test_config_dir(*add_path):
    """Return a path inside the `testing_config` directory of this package.

    Any positional arguments are appended as further path components.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, 'testing_config', *add_path)
def get_test_home_assistant():
    """Return a Home Assistant object whose loop runs in its own thread.

    The instance points at the test config directory. Its `start` and
    `stop` attributes are replaced with blocking helpers that are meant
    to be called from the (synchronous) test thread.
    """
    loop = (asyncio.ProactorEventLoop() if sys.platform == "win32"
            else asyncio.new_event_loop())

    hass = loop.run_until_complete(async_test_home_assistant(loop))

    # Set by the loop thread once run_forever() has returned.
    loop_stopped = threading.Event()
    original_stop = hass.stop

    def run_loop():
        """Run the event loop until it is stopped."""
        # pylint: disable=protected-access
        loop._thread_ident = threading.get_ident()
        loop.run_forever()
        loop_stopped.set()

    def start_hass(*mocks):
        """Start hass and wait until startup has finished."""
        future = run_coroutine_threadsafe(hass.async_start(), loop=hass.loop)
        future.result()

    def stop_hass():
        """Stop hass, wait for the loop thread to finish, close the loop."""
        original_stop()
        loop_stopped.wait()
        loop.close()

    hass.start = start_hass
    hass.stop = stop_hass

    loop_thread = threading.Thread(
        name="LoopThread", target=run_loop, daemon=False)
    loop_thread.start()

    return hass
# pylint: disable=protected-access
@asyncio.coroutine
def async_test_home_assistant(loop):
    """Create a Home Assistant object configured for tests.

    The instance is registered in INSTANCES, uses the test config
    directory and is put directly into the running state.
    """
    hass = ha.HomeAssistant(loop)
    INSTANCES.append(hass)

    real_async_add_job = hass.async_add_job

    def async_add_job(target, *args):
        """Call Mock targets directly, delegate everything else."""
        if not isinstance(target, Mock):
            return real_async_add_job(target, *args)
        return mock_coro(target(*args))

    hass.async_add_job = async_add_job

    cfg = hass.config
    cfg.location_name = 'test home'
    cfg.config_dir = get_test_config_dir()
    cfg.latitude = 32.87336
    cfg.longitude = -117.22743
    cfg.elevation = 0
    cfg.time_zone = date_util.get_time_zone('US/Pacific')
    cfg.units = METRIC_SYSTEM
    cfg.skip_pip = True

    # Only prepare the loader when the test custom component is unknown.
    if 'custom_components.test' not in loader.AVAILABLE_COMPONENTS:
        yield from loop.run_in_executor(None, loader.prepare, hass)

    hass.state = ha.CoreState.running

    # Replace async_start with a version that patches the timer
    real_async_start = hass.async_start

    @asyncio.coroutine
    def mock_async_start():
        """Start hass with timer creation and task-tracking stop patched."""
        # We only mock time during tests and we want to track tasks
        timer_patch = patch('homeassistant.core._async_create_timer')
        track_patch = patch.object(hass, 'async_stop_track_tasks')
        with timer_patch, track_patch:
            yield from real_async_start()

    hass.async_start = mock_async_start

    @ha.callback
    def clear_instance(event):
        """Drop this instance from the global INSTANCES list."""
        INSTANCES.remove(hass)

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, clear_instance)

    return hass
def get_test_instance_port():
    """Return a port number that has not been handed out before.

    The socket that holds the default port does not get released when we stop
    HA in a different test case. Until that has been figured out, every call
    bumps the module-level counter so each test runs on a different port.
    """
    global _TEST_INSTANCE_PORT
    next_port = _TEST_INSTANCE_PORT + 1
    _TEST_INSTANCE_PORT = next_port
    return next_port
2017-06-25 17:53:15 +00:00
@ha.callback
def async_mock_service(hass, domain, service, schema=None):
    """Register a fake service and return the list its calls are logged to.

    Every call made to the registered service is appended to the
    returned list.
    """
    recorded = []

    @asyncio.coroutine
    def mock_service_log(call):  # pylint: disable=unnecessary-lambda
        """Record the service call."""
        recorded.append(call)

    services = hass.services
    services.async_register(domain, service, mock_service_log, schema=schema)

    return recorded
2017-06-25 17:53:15 +00:00
# Blocking variant of async_mock_service for callers outside the event loop.
mock_service = threadsafe_callback_factory(async_mock_service)
@ha.callback
def async_mock_intent(hass, intent_typ):
    """Register a fake handler for an intent type.

    Returns the list to which every handled intent is appended.
    """
    handled = []

    class MockIntentHandler(intent.IntentHandler):
        """Intent handler that only records what it receives."""

        intent_type = intent_typ

        @asyncio.coroutine
        def async_handle(self, intent):
            """Record the intent and answer with its default response."""
            handled.append(intent)
            return intent.create_response()

    handler = MockIntentHandler()
    intent.async_register(hass, handler)

    return handled
@ha.callback
def async_fire_mqtt_message(hass, topic, payload, qos=0):
    """Dispatch an MQTT message as if it had been received.

    A `str` payload is encoded to UTF-8 bytes first; any other payload is
    passed on unchanged.
    """
    raw = payload.encode('utf-8') if isinstance(payload, str) else payload
    dispatcher.async_dispatcher_send(
        hass, mqtt.SIGNAL_MQTT_MESSAGE_RECEIVED, topic, raw, qos)
2015-08-11 06:11:46 +00:00
# Blocking variant of async_fire_mqtt_message for callers outside the loop.
fire_mqtt_message = threadsafe_callback_factory(async_fire_mqtt_message)
@ha.callback
def async_fire_time_changed(hass, time):
    """Fire a time changed event carrying `time` as the current time."""
    event_data = {'now': time}
    hass.bus.async_fire(EVENT_TIME_CHANGED, event_data)
# Blocking variant of async_fire_time_changed for callers outside the loop.
fire_time_changed = threadsafe_callback_factory(async_fire_time_changed)
2015-08-03 15:57:12 +00:00
2015-09-12 16:15:28 +00:00
def fire_service_discovered(hass, service, info):
    """Fire a platform discovered event for a service and its info."""
    event_data = {
        ATTR_SERVICE: service,
        ATTR_DISCOVERED: info,
    }
    hass.bus.fire(EVENT_PLATFORM_DISCOVERED, event_data)
2015-04-30 05:26:54 +00:00
def load_fixture(filename):
    """Load a fixture.

    Returns the text content of `filename`, looked up in the `fixtures`
    directory that sits next to this module.
    """
    path = os.path.join(os.path.dirname(__file__), 'fixtures', filename)
    # Decode explicitly as UTF-8 so the result does not depend on the
    # default encoding of the platform the tests happen to run on.
    with open(path, encoding='utf-8') as fptr:
        return fptr.read()
2015-05-01 04:03:01 +00:00
def mock_state_change_event(hass, new_state, old_state=None):
    """Fire a state changed event for `new_state`.

    The old state is only included in the event data when it is truthy.
    """
    event_data = dict(entity_id=new_state.entity_id, new_state=new_state)

    if old_state:
        event_data.update(old_state=old_state)

    hass.bus.fire(EVENT_STATE_CHANGED, event_data)
def mock_http_component(hass, api_password=None):
    """Replace `hass.http` with a mock and mark the http component set up.

    Views registered through the mock end up in `hass.http.views`, keyed
    by the view name.
    """
    hass.http = MagicMock(api_password=api_password)
    mock_component(hass, 'http')
    hass.http.views = {}

    def mock_register_view(view):
        """Store the registered view, instantiating classes first."""
        instance = view() if isinstance(view, type) else view
        hass.http.views[instance.name] = instance

    hass.http.register_view = mock_register_view
2015-07-11 07:02:52 +00:00
def mock_http_component_app(hass, api_password=None):
    """Build an aiohttp.web.Application instance wired to hass for tests.

    The http component is mocked first when it is not set up yet. The app
    uses the auth middleware, with X-Forwarded-For and bans disabled and
    an empty trusted networks list.
    """
    http_is_setup = 'http' in hass.config.components
    if not http_is_setup:
        mock_http_component(hass, api_password)

    app = web.Application(middlewares=[auth_middleware])
    settings = (
        ('hass', hass),
        (KEY_USE_X_FORWARDED_FOR, False),
        (KEY_BANS_ENABLED, False),
        (KEY_TRUSTED_NETWORKS, []),
    )
    for key, value in settings:
        app[key] = value
    return app
2017-06-04 01:51:29 +00:00
@asyncio.coroutine
def async_mock_mqtt_component(hass):
    """Set up the MQTT component with its MQTT class patched out.

    Returns the mock that replaced `homeassistant.components.mqtt.MQTT`
    while the component was being set up.
    """
    mqtt_config = {
        mqtt.DOMAIN: {
            mqtt.CONF_BROKER: 'mock-broker',
        }
    }

    with patch('homeassistant.components.mqtt.MQTT') as mock_mqtt:
        # Make connecting succeed without talking to a broker.
        client = mock_mqtt()
        client.async_connect.return_value = mock_coro(True)

        yield from async_setup_component(hass, mqtt.DOMAIN, mqtt_config)
        return mock_mqtt
2015-08-11 06:11:46 +00:00
2017-06-04 01:51:29 +00:00
# Blocking variant of async_mock_mqtt_component for callers outside the loop.
mock_mqtt_component = threadsafe_coroutine_factory(async_mock_mqtt_component)
@ha.callback
def mock_component(hass, component):
    """Mock a component is setup.

    Adds `component` to `hass.config.components`.

    Raises AssertionError when the component is already marked as set up.
    """
    if component in hass.config.components:
        # The exception used to be created without being raised, which
        # silently disabled this guard.
        raise AssertionError(
            "Component {} is already setup".format(component))

    hass.config.components.add(component)
def mock_registry(hass, mock_entries=None):
    """Install an entity registry holding `mock_entries` and return it.

    A falsy `mock_entries` results in a registry with an empty dict.
    """
    entries = mock_entries or {}

    registry = entity_registry.EntityRegistry(hass)
    registry.entities = entries

    hass.data[entity_platform.DATA_REGISTRY] = registry
    return registry
class MockModule(object):
    """Stand-in for a component module."""

    # pylint: disable=invalid-name
    def __init__(self, domain=None, dependencies=None, setup=None,
                 requirements=None, config_schema=None, platform_schema=None,
                 async_setup=None):
        """Store the module level attributes of the fake component.

        CONFIG_SCHEMA, PLATFORM_SCHEMA and async_setup are only defined on
        the instance when a value other than None is passed for them.
        """
        self.DOMAIN = domain
        self.DEPENDENCIES = dependencies or []
        self.REQUIREMENTS = requirements or []
        self._setup = setup

        optional_attrs = (
            ('CONFIG_SCHEMA', config_schema),
            ('PLATFORM_SCHEMA', platform_schema),
            ('async_setup', async_setup),
        )
        for attr_name, attr_value in optional_attrs:
            if attr_value is not None:
                setattr(self, attr_name, attr_value)

    def setup(self, hass, config):
        """Set up the component.

        We always define this mock because MagicMock setups will be seen by the
        executor as a coroutine, raising an exception.

        Returns the result of the `setup` callable given to the
        constructor, or True when none was given.
        """
        handler = self._setup
        if handler is None:
            return True

        return handler(hass, config)
2016-01-31 02:55:52 +00:00
class MockPlatform(object):
    """Stand-in for a platform module."""

    # pylint: disable=invalid-name
    def __init__(self, setup_platform=None, dependencies=None,
                 platform_schema=None, async_setup_platform=None):
        """Store the module level attributes of the fake platform.

        PLATFORM_SCHEMA and async_setup_platform are only defined on the
        instance when a value other than None is passed for them.
        """
        self.DEPENDENCIES = dependencies or []
        self._setup_platform = setup_platform

        optional_attrs = (
            ('PLATFORM_SCHEMA', platform_schema),
            ('async_setup_platform', async_setup_platform),
        )
        for attr_name, attr_value in optional_attrs:
            if attr_value is not None:
                setattr(self, attr_name, attr_value)

    def setup_platform(self, hass, config, add_devices, discovery_info=None):
        """Forward to the configured setup callable, if there is one.

        The callable's return value is not passed on.
        """
        handler = self._setup_platform
        if handler is None:
            return

        handler(hass, config, add_devices, discovery_info)
class MockToggleDevice(entity.ToggleEntity):
    """Toggle entity that records every access in `calls`."""

    def __init__(self, name, state):
        """Initialize the mock device with a name and a state.

        A falsy name is replaced by DEVICE_DEFAULT_NAME.
        """
        self._name = name or DEVICE_DEFAULT_NAME
        self._state = state
        # Log of (method name, keyword arguments) tuples, oldest first.
        self.calls = []

    def _record(self, method, params):
        """Append one entry to the call log."""
        self.calls.append((method, params))

    @property
    def name(self):
        """Return the name of the device."""
        self._record('name', {})
        return self._name

    @property
    def state(self):
        """Return the state of the device."""
        self._record('state', {})
        return self._state

    @property
    def is_on(self):
        """Return true if device is on."""
        self._record('is_on', {})
        return self._state == STATE_ON

    def turn_on(self, **kwargs):
        """Turn the device on."""
        self._record('turn_on', kwargs)
        self._state = STATE_ON

    def turn_off(self, **kwargs):
        """Turn the device off."""
        self._record('turn_off', kwargs)
        self._state = STATE_OFF

    def last_call(self, method=None):
        """Return the most recent logged call.

        With `method` given, only calls to that method are considered.
        Returns None when there is no matching call.
        """
        if not self.calls:
            return None

        if method is None:
            return self.calls[-1]

        for call in reversed(self.calls):
            if call[0] == method:
                return call

        return None
def patch_yaml_files(files_dict, endswith=True):
    """Patch load_yaml with a dictionary of yaml files.

    files_dict maps file names to the text that should be read from them.
    A requested name is first looked up as an exact key. When `endswith`
    is true, keys are then also matched as suffixes of the requested name.
    Real files are only opened for paths containing
    'homeassistant/components'; anything else raises FileNotFoundError.

    Returns the patcher that replaces `open` in the yaml util module.
    """
    # Match using endswith, trying the longest key first so that a short
    # key can never shadow a longer, more specific key that also matches.
    matchlist = sorted(files_dict, key=len, reverse=True) if endswith else []

    def mock_file(content, fname):
        """Return an in-memory file with `content` that is named fname."""
        res = StringIO(content)
        setattr(res, 'name', fname)
        return res

    def mock_open_f(fname, **_):
        """Mock open() in the yaml module, used by load_yaml."""
        # Return the mocked file on full match
        if fname in files_dict:
            _LOGGER.debug("patch_yaml_files match %s", fname)
            return mock_file(files_dict[fname], fname)

        # Match using endswith
        for ends in matchlist:
            if fname.endswith(ends):
                _LOGGER.debug("patch_yaml_files end match %s: %s", ends, fname)
                return mock_file(files_dict[ends], fname)

        # Fallback for hass.components (i.e. services.yaml)
        if 'homeassistant/components' in fname:
            _LOGGER.debug("patch_yaml_files using real file: %s", fname)
            return open(fname, encoding='utf-8')

        # Not found
        raise FileNotFoundError("File not found: {}".format(fname))

    return patch.object(yaml, 'open', mock_open_f, create=True)
def mock_coro(return_value=None):
    """Return a coroutine object that resolves to `return_value`."""
    coro_func = mock_coro_func(return_value)
    return coro_func()
def mock_coro_func(return_value=None):
    """Return a coroutine function whose coroutines give `return_value`.

    The created function accepts and ignores any arguments.
    """
    def coro(*args, **kwargs):
        """Fake coroutine."""
        return return_value

    return asyncio.coroutine(coro)
@contextmanager
def assert_setup_component(count, domain=None):
    """Capture validated configuration and check the number of entries.

    - count: The amount of valid platforms that should be setup
    - domain: The domain to count is optional. It can be automatically
              determined most of the time

    Use as a context manager around setup.setup_component
        with assert_setup_component(0) as result_config:
            setup_component(hass, domain, start_config)
            # using result_config is optional
    """
    config = {}

    @ha.callback
    def mock_psc(hass, config_input, domain):
        """Run the real config processing and remember its result."""
        res = async_process_component_config(
            hass, config_input, domain)
        if res is None:
            config[domain] = None
        else:
            config[domain] = res.get(domain)
        _LOGGER.debug("Configuration for %s, Validated: %s, Original %s",
                      domain, config[domain], config_input.get(domain))
        return res

    assert isinstance(config, dict)
    config_patch = patch(
        'homeassistant.config.async_process_component_config', mock_psc)
    with config_patch:
        yield config

    # Without an explicit domain exactly one domain must have been captured.
    if domain is None:
        captured = list(config.keys())
        assert len(config) == 1, ('assert_setup_component requires DOMAIN: {}'
                                  .format(captured))
        domain = captured[0]

    res = config.get(domain)
    if res is None:
        res_len = 0
    else:
        res_len = len(res)
    assert res_len == count, 'setup_component failed, expected {} got {}: {}' \
        .format(count, res_len, res)
def init_recorder_component(hass, add_config=None):
    """Set up the recorder component against an in-memory database.

    `add_config` may hold extra recorder options; it is copied, and the
    database URL in the copy is always overwritten.
    """
    config = {}
    if add_config:
        config = dict(add_config)
    config[recorder.CONF_DB_URL] = 'sqlite://'  # In memory DB

    recorder_config = {recorder.DOMAIN: config}

    # Schema migration is patched out while the component is set up.
    with patch('homeassistant.components.recorder.migration.migrate_schema'):
        assert setup_component(hass, recorder.DOMAIN, recorder_config)
        assert recorder.DOMAIN in hass.config.components
    _LOGGER.info("In-memory recorder successfully started")
def mock_restore_cache(hass, states):
    """Fill the restore state cache with the given states.

    The cache is keyed by entity id, hass is put into the starting state
    and the recorder component is marked as set up.
    """
    key = restore_state.DATA_RESTORE_CACHE

    cache = {}
    for state in states:
        cache[state.entity_id] = state
    hass.data[key] = cache

    _LOGGER.debug('Restore cache: %s', hass.data[key])
    # Fewer cache entries than states means an entity id occurred twice.
    assert len(hass.data[key]) == len(states), \
        "Duplicate entity_id? {}".format(states)

    hass.state = ha.CoreState.starting
    mock_component(hass, recorder.DOMAIN)
class MockDependency:
    """Context manager and decorator to mock install a dependency.

    While active, `sys.modules` maps the root module name and every
    requested `root.submodule` name to MagicMock objects.
    """

    def __init__(self, root, *args):
        """Initialize decorator.

        root: name of the top level module to mock.
        args: dotted submodule paths, relative to root, to mock as well.
        """
        self.root = root
        self.submodules = args

    def __enter__(self):
        """Start mocking and return the mock that stands in for root."""
        def resolve(mock, path):
            """Follow the attribute path in `path` down from mock."""
            if not path:
                return mock

            return resolve(getattr(mock, path[0]), path[1:])

        base = MagicMock()
        # Submodules are attributes of the root mock, so that access via
        # the root and via sys.modules gives the same object.
        to_mock = {
            "{}.{}".format(self.root, tom): resolve(base, tom.split('.'))
            for tom in self.submodules
        }
        to_mock[self.root] = base

        self.patcher = patch.dict('sys.modules', to_mock)
        self.patcher.start()
        return base

    def __exit__(self, *exc):
        """Stop mocking; exceptions from the body are not suppressed."""
        self.patcher.stop()
        return False

    def __call__(self, func):
        """Apply decorator.

        The decorated function runs with the dependency mocked and gets
        the root mock appended as an extra positional argument. Its
        return value is handed back to the caller.
        """
        def run_mocked(*args, **kwargs):
            """Run with mocked dependencies."""
            with self as base:
                args = list(args) + [base]
                # Return the result instead of silently dropping it.
                return func(*args, **kwargs)

        return run_mocked
class MockEntity(entity.Entity):
    """Entity whose properties can be overridden via keyword arguments."""

    def __init__(self, **values):
        """Remember the overrides and apply `entity_id` when it is given."""
        self._values = values

        try:
            self.entity_id = values['entity_id']
        except KeyError:
            pass

    @property
    def name(self):
        """Return the name of the entity."""
        return self._handle('name')

    @property
    def should_poll(self):
        """Return whether the entity should be polled."""
        return self._handle('should_poll')

    @property
    def unique_id(self):
        """Return the unique ID of the entity."""
        return self._handle('unique_id')

    @property
    def available(self):
        """Return True if entity is available."""
        return self._handle('available')

    def _handle(self, attr):
        """Return the override for attr, else the base class value."""
        overrides = self._values
        return overrides[attr] if attr in overrides else getattr(super(), attr)