core/tests/test_runner.py

166 lines
5.4 KiB
Python

"""Test the runner."""
import asyncio
import threading
from unittest.mock import patch
import py
import pytest
from homeassistant import core, runner
from homeassistant.core import HomeAssistant
from homeassistant.util import executor, thread
# https://github.com/home-assistant/supervisor/blob/main/supervisor/docker/homeassistant.py
SUPERVISOR_HARD_TIMEOUT = 220
TIMEOUT_SAFETY_MARGIN = 10
async def test_cumulative_shutdown_timeout_less_than_supervisor() -> None:
"""Verify the cumulative shutdown timeout is at least 10s less than the supervisor."""
assert (
core.STAGE_1_SHUTDOWN_TIMEOUT
+ core.STAGE_2_SHUTDOWN_TIMEOUT
+ core.STAGE_3_SHUTDOWN_TIMEOUT
+ executor.EXECUTOR_SHUTDOWN_TIMEOUT
+ thread.THREADING_SHUTDOWN_TIMEOUT
+ TIMEOUT_SAFETY_MARGIN
<= SUPERVISOR_HARD_TIMEOUT
)
async def test_setup_and_run_hass(hass: HomeAssistant, tmpdir: py.path.local) -> None:
"""Test we can setup and run."""
test_dir = tmpdir.mkdir("config")
default_config = runner.RuntimeConfig(test_dir)
with patch("homeassistant.bootstrap.async_setup_hass", return_value=hass), patch(
"threading._shutdown"
), patch("homeassistant.core.HomeAssistant.async_run") as mock_run:
await runner.setup_and_run_hass(default_config)
assert threading._shutdown == thread.deadlock_safe_shutdown
assert mock_run.called
def test_run(hass: HomeAssistant, tmpdir: py.path.local) -> None:
"""Test we can run."""
test_dir = tmpdir.mkdir("config")
default_config = runner.RuntimeConfig(test_dir)
with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), patch(
"homeassistant.bootstrap.async_setup_hass", return_value=hass
), patch("threading._shutdown"), patch(
"homeassistant.core.HomeAssistant.async_run"
) as mock_run:
runner.run(default_config)
assert mock_run.called
def test_run_executor_shutdown_throws(
hass: HomeAssistant, tmpdir: py.path.local
) -> None:
"""Test we can run and we still shutdown if the executor shutdown throws."""
test_dir = tmpdir.mkdir("config")
default_config = runner.RuntimeConfig(test_dir)
with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), pytest.raises(
RuntimeError
), patch("homeassistant.bootstrap.async_setup_hass", return_value=hass), patch(
"threading._shutdown"
), patch(
"homeassistant.runner.InterruptibleThreadPoolExecutor.shutdown",
side_effect=RuntimeError,
) as mock_shutdown, patch(
"homeassistant.core.HomeAssistant.async_run"
) as mock_run:
runner.run(default_config)
assert mock_shutdown.called
assert mock_run.called
def test_run_does_not_block_forever_with_shielded_task(
hass: HomeAssistant, tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
) -> None:
"""Test we can shutdown and not block forever."""
test_dir = tmpdir.mkdir("config")
default_config = runner.RuntimeConfig(test_dir)
tasks = []
async def _async_create_tasks(*_):
async def async_raise(*_):
try:
await asyncio.sleep(2)
except asyncio.CancelledError:
raise Exception
async def async_shielded(*_):
try:
await asyncio.sleep(2)
except asyncio.CancelledError:
await asyncio.sleep(2)
tasks.append(asyncio.ensure_future(asyncio.shield(async_shielded())))
tasks.append(asyncio.ensure_future(asyncio.sleep(2)))
tasks.append(asyncio.ensure_future(async_raise()))
await asyncio.sleep(0.1)
return 0
with patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1), patch(
"homeassistant.bootstrap.async_setup_hass", return_value=hass
), patch("threading._shutdown"), patch(
"homeassistant.core.HomeAssistant.async_run", _async_create_tasks
):
runner.run(default_config)
assert len(tasks) == 3
assert (
"Task could not be canceled and was still running after shutdown" in caplog.text
)
async def test_unhandled_exception_traceback(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test an unhandled exception gets a traceback in debug mode."""
raised = asyncio.Event()
async def _unhandled_exception():
raised.set()
raise Exception("This is unhandled")
try:
hass.loop.set_debug(True)
task = asyncio.create_task(_unhandled_exception())
await raised.wait()
# Delete it without checking result to trigger unhandled exception
del task
finally:
hass.loop.set_debug(False)
assert "Task exception was never retrieved" in caplog.text
assert "This is unhandled" in caplog.text
assert "_unhandled_exception" in caplog.text
def test__enable_posix_spawn() -> None:
"""Test that we can enable posix_spawn on Alpine."""
def _mock_alpine_exists(path):
return path == "/etc/alpine-release"
with patch.object(runner.subprocess, "_USE_POSIX_SPAWN", False), patch.object(
runner.os.path, "exists", _mock_alpine_exists
):
runner._enable_posix_spawn()
assert runner.subprocess._USE_POSIX_SPAWN is True
with patch.object(runner.subprocess, "_USE_POSIX_SPAWN", False), patch.object(
runner.os.path, "exists", return_value=False
):
runner._enable_posix_spawn()
assert runner.subprocess._USE_POSIX_SPAWN is False