Core Async improvements (#4087)
* Clean up HomeAssistant.start * Add missing pieces to remote HA constructor * Make HomeAssistant constructor async safe * Code cleanup * Init websession lazypull/4038/merge
parent
9d836a115a
commit
5a2b4a5376
|
@ -143,69 +143,33 @@ class HomeAssistant(object):
|
|||
self.config = Config() # type: Config
|
||||
self.state = CoreState.not_running
|
||||
self.exit_code = None
|
||||
self.websession = aiohttp.ClientSession(loop=self.loop)
|
||||
self._websession = None
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
"""Return if Home Assistant is running."""
|
||||
return self.state in (CoreState.starting, CoreState.running)
|
||||
|
||||
@property
|
||||
def websession(self):
|
||||
"""Return an aiohttp session to make web requests."""
|
||||
if self._websession is None:
|
||||
self._websession = aiohttp.ClientSession(loop=self.loop)
|
||||
|
||||
return self._websession
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start home assistant."""
|
||||
_LOGGER.info(
|
||||
"Starting Home Assistant (%d threads)", self.pool.worker_count)
|
||||
self.state = CoreState.starting
|
||||
|
||||
# Register the async start
|
||||
self.loop.create_task(self.async_start())
|
||||
|
||||
@callback
|
||||
def stop_homeassistant(*args):
|
||||
"""Stop Home Assistant."""
|
||||
self.exit_code = 0
|
||||
self.async_add_job(self.async_stop)
|
||||
|
||||
@callback
|
||||
def restart_homeassistant(*args):
|
||||
"""Restart Home Assistant."""
|
||||
self.exit_code = RESTART_EXIT_CODE
|
||||
self.async_add_job(self.async_stop)
|
||||
|
||||
# Register the restart/stop event
|
||||
self.loop.call_soon(
|
||||
self.services.async_register,
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_STOP, stop_homeassistant
|
||||
)
|
||||
self.loop.call_soon(
|
||||
self.services.async_register,
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_RESTART, restart_homeassistant
|
||||
)
|
||||
|
||||
# Setup signal handling
|
||||
if sys.platform != 'win32':
|
||||
try:
|
||||
self.loop.add_signal_handler(
|
||||
signal.SIGTERM,
|
||||
stop_homeassistant
|
||||
)
|
||||
except ValueError:
|
||||
_LOGGER.warning('Could not bind to SIGTERM.')
|
||||
|
||||
try:
|
||||
self.loop.add_signal_handler(
|
||||
signal.SIGHUP,
|
||||
restart_homeassistant
|
||||
)
|
||||
except ValueError:
|
||||
_LOGGER.warning('Could not bind to SIGHUP.')
|
||||
|
||||
# Run forever and catch keyboard interrupt
|
||||
try:
|
||||
# Block until stopped
|
||||
_LOGGER.info("Starting Home Assistant core loop")
|
||||
self.loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
self.loop.call_soon(stop_homeassistant)
|
||||
self.loop.call_soon(self._async_stop_handler)
|
||||
self.loop.run_forever()
|
||||
finally:
|
||||
self.loop.close()
|
||||
|
@ -216,6 +180,31 @@ class HomeAssistant(object):
|
|||
|
||||
This method is a coroutine.
|
||||
"""
|
||||
_LOGGER.info(
|
||||
"Starting Home Assistant (%d threads)", self.pool.worker_count)
|
||||
|
||||
self.state = CoreState.starting
|
||||
|
||||
# Register the restart/stop event
|
||||
self.services.async_register(
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_STOP, self._async_stop_handler)
|
||||
self.services.async_register(
|
||||
DOMAIN, SERVICE_HOMEASSISTANT_RESTART, self._async_restart_handler)
|
||||
|
||||
# Setup signal handling
|
||||
if sys.platform != 'win32':
|
||||
try:
|
||||
self.loop.add_signal_handler(
|
||||
signal.SIGTERM, self._async_stop_handler)
|
||||
except ValueError:
|
||||
_LOGGER.warning('Could not bind to SIGTERM.')
|
||||
|
||||
try:
|
||||
self.loop.add_signal_handler(
|
||||
signal.SIGHUP, self._async_restart_handler)
|
||||
except ValueError:
|
||||
_LOGGER.warning('Could not bind to SIGHUP.')
|
||||
|
||||
# pylint: disable=protected-access
|
||||
self.loop._thread_ident = threading.get_ident()
|
||||
_async_create_timer(self)
|
||||
|
@ -301,10 +290,7 @@ class HomeAssistant(object):
|
|||
# sleep in the loop executor, this forces execution back into
|
||||
# the event loop to avoid the block thread from starving the
|
||||
# async loop
|
||||
run_coroutine_threadsafe(
|
||||
sleep_wait(),
|
||||
self.loop
|
||||
).result()
|
||||
run_coroutine_threadsafe(sleep_wait(), self.loop).result()
|
||||
|
||||
complete.set()
|
||||
|
||||
|
@ -326,10 +312,13 @@ class HomeAssistant(object):
|
|||
yield from self.loop.run_in_executor(None, self.pool.block_till_done)
|
||||
yield from self.loop.run_in_executor(None, self.pool.stop)
|
||||
self.executor.shutdown()
|
||||
if self._websession is not None:
|
||||
yield from self._websession.close()
|
||||
self.state = CoreState.not_running
|
||||
self.loop.stop()
|
||||
|
||||
# pylint: disable=no-self-use
|
||||
@callback
|
||||
def _async_exception_handler(self, loop, context):
|
||||
"""Handle all exception inside the core loop."""
|
||||
message = context.get('message')
|
||||
|
@ -348,6 +337,18 @@ class HomeAssistant(object):
|
|||
exc_info=exc_info
|
||||
)
|
||||
|
||||
@callback
|
||||
def _async_stop_handler(self, *args):
|
||||
"""Stop Home Assistant."""
|
||||
self.exit_code = 0
|
||||
self.async_add_job(self.async_stop)
|
||||
|
||||
@callback
|
||||
def _async_restart_handler(self, *args):
|
||||
"""Restart Home Assistant."""
|
||||
self.exit_code = RESTART_EXIT_CODE
|
||||
self.async_add_job(self.async_stop)
|
||||
|
||||
|
||||
class EventOrigin(enum.Enum):
|
||||
"""Represent the origin of an event."""
|
||||
|
@ -877,10 +878,7 @@ class ServiceRegistry(object):
|
|||
self._bus = bus
|
||||
self._loop = loop
|
||||
self._cur_id = 0
|
||||
run_callback_threadsafe(
|
||||
loop,
|
||||
bus.async_listen, EVENT_CALL_SERVICE, self._event_to_service_call,
|
||||
)
|
||||
self._async_unsub_call_event = None
|
||||
|
||||
@property
|
||||
def services(self):
|
||||
|
@ -947,6 +945,10 @@ class ServiceRegistry(object):
|
|||
else:
|
||||
self._services[domain] = {service: service_obj}
|
||||
|
||||
if self._async_unsub_call_event is None:
|
||||
self._async_unsub_call_event = self._bus.async_listen(
|
||||
EVENT_CALL_SERVICE, self._event_to_service_call)
|
||||
|
||||
self._bus.async_fire(
|
||||
EVENT_SERVICE_REGISTERED,
|
||||
{ATTR_DOMAIN: domain, ATTR_SERVICE: service}
|
||||
|
|
|
@ -8,6 +8,7 @@ For more details about the Python API, please refer to the documentation at
|
|||
https://home-assistant.io/developers/python_api/
|
||||
"""
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import datetime
|
||||
import enum
|
||||
import json
|
||||
|
@ -124,14 +125,18 @@ class HomeAssistant(ha.HomeAssistant):
|
|||
self.remote_api = remote_api
|
||||
|
||||
self.loop = loop or asyncio.get_event_loop()
|
||||
self.executor = ThreadPoolExecutor(max_workers=5)
|
||||
self.loop.set_default_executor(self.executor)
|
||||
self.loop.set_exception_handler(self._async_exception_handler)
|
||||
self.pool = ha.create_worker_pool()
|
||||
|
||||
self.bus = EventBus(remote_api, self)
|
||||
self.services = ha.ServiceRegistry(self.bus, self.add_job, self.loop)
|
||||
self.states = StateMachine(self.bus, self.loop, self.remote_api)
|
||||
self.config = ha.Config()
|
||||
self.state = ha.CoreState.not_running
|
||||
self._websession = None
|
||||
|
||||
self.state = ha.CoreState.not_running
|
||||
self.config.api = local_api
|
||||
|
||||
def start(self):
|
||||
|
|
|
@ -92,26 +92,21 @@ def async_test_home_assistant(loop):
|
|||
"""Return a Home Assistant object pointing at test config dir."""
|
||||
loop._thread_ident = threading.get_ident()
|
||||
|
||||
def get_hass():
|
||||
"""Temp while we migrate core HASS over to be async constructors."""
|
||||
hass = ha.HomeAssistant(loop)
|
||||
hass = ha.HomeAssistant(loop)
|
||||
|
||||
hass.config.location_name = 'test home'
|
||||
hass.config.config_dir = get_test_config_dir()
|
||||
hass.config.latitude = 32.87336
|
||||
hass.config.longitude = -117.22743
|
||||
hass.config.elevation = 0
|
||||
hass.config.time_zone = date_util.get_time_zone('US/Pacific')
|
||||
hass.config.units = METRIC_SYSTEM
|
||||
hass.config.skip_pip = True
|
||||
hass.config.location_name = 'test home'
|
||||
hass.config.config_dir = get_test_config_dir()
|
||||
hass.config.latitude = 32.87336
|
||||
hass.config.longitude = -117.22743
|
||||
hass.config.elevation = 0
|
||||
hass.config.time_zone = date_util.get_time_zone('US/Pacific')
|
||||
hass.config.units = METRIC_SYSTEM
|
||||
hass.config.skip_pip = True
|
||||
|
||||
if 'custom_components.test' not in loader.AVAILABLE_COMPONENTS:
|
||||
loader.prepare(hass)
|
||||
if 'custom_components.test' not in loader.AVAILABLE_COMPONENTS:
|
||||
yield from loop.run_in_executor(None, loader.prepare, hass)
|
||||
|
||||
hass.state = ha.CoreState.running
|
||||
return hass
|
||||
|
||||
hass = yield from loop.run_in_executor(None, get_hass)
|
||||
hass.state = ha.CoreState.running
|
||||
|
||||
return hass
|
||||
|
||||
|
|
Loading…
Reference in New Issue