core/tests/util/test_logging.py

152 lines
4.2 KiB
Python

"""Test Home Assistant logging util methods."""
import asyncio
from functools import partial
import logging
import queue
from unittest.mock import patch
import pytest
from homeassistant.core import (
HomeAssistant,
callback,
is_callback,
is_callback_check_partial,
)
import homeassistant.util.logging as logging_util
async def test_logging_with_queue_handler() -> None:
"""Test logging with HomeAssistantQueueHandler."""
simple_queue = queue.SimpleQueue() # type: ignore
handler = logging_util.HomeAssistantQueueHandler(simple_queue)
log_record = logging.makeLogRecord({"msg": "Test Log Record"})
handler.emit(log_record)
with (
pytest.raises(asyncio.CancelledError),
patch.object(handler, "enqueue", side_effect=asyncio.CancelledError),
):
handler.emit(log_record)
with patch.object(handler, "emit") as emit_mock:
handler.handle(log_record)
emit_mock.assert_called_once()
with (
patch.object(handler, "filter") as filter_mock,
patch.object(handler, "emit") as emit_mock,
):
filter_mock.return_value = False
handler.handle(log_record)
emit_mock.assert_not_called()
with (
patch.object(handler, "enqueue", side_effect=OSError),
patch.object(handler, "handleError") as mock_handle_error,
):
handler.emit(log_record)
mock_handle_error.assert_called_once()
handler.close()
assert simple_queue.get_nowait().msg == "Test Log Record"
assert simple_queue.empty()
async def test_migrate_log_handler(hass: HomeAssistant) -> None:
"""Test migrating log handlers."""
logging_util.async_activate_log_queue_handler(hass)
assert len(logging.root.handlers) == 1
assert isinstance(logging.root.handlers[0], logging_util.HomeAssistantQueueHandler)
# Test that the close hook shuts down the queue handler's thread
listener_thread = logging.root.handlers[0].listener._thread
assert listener_thread.is_alive()
logging.root.handlers[0].close()
assert not listener_thread.is_alive()
@pytest.mark.no_fail_on_log_exception
async def test_async_create_catching_coro(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test exception logging of wrapped coroutine."""
async def job():
raise Exception("This is a bad coroutine")
hass.async_create_task(logging_util.async_create_catching_coro(job()))
await hass.async_block_till_done()
assert "This is a bad coroutine" in caplog.text
assert "in test_async_create_catching_coro" in caplog.text
def test_catch_log_exception() -> None:
"""Test it is still a callback after wrapping including partial."""
async def async_meth():
pass
assert asyncio.iscoroutinefunction(
logging_util.catch_log_exception(partial(async_meth), lambda: None)
)
@callback
def callback_meth():
pass
assert is_callback_check_partial(
logging_util.catch_log_exception(partial(callback_meth), lambda: None)
)
def sync_meth():
pass
wrapped = logging_util.catch_log_exception(partial(sync_meth), lambda: None)
assert not is_callback(wrapped)
assert not asyncio.iscoroutinefunction(wrapped)
@pytest.mark.no_fail_on_log_exception
async def test_catch_log_exception_catches_and_logs() -> None:
"""Test it is still a callback after wrapping including partial."""
saved_args = []
def save_args(*args):
saved_args.append(args)
async def async_meth():
raise ValueError("failure async")
func = logging_util.catch_log_exception(async_meth, save_args)
await func("failure async passed")
assert saved_args == [("failure async passed",)]
saved_args.clear()
@callback
def callback_meth():
raise ValueError("failure callback")
func = logging_util.catch_log_exception(callback_meth, save_args)
func("failure callback passed")
assert saved_args == [("failure callback passed",)]
saved_args.clear()
def sync_meth():
raise ValueError("failure sync")
func = logging_util.catch_log_exception(sync_meth, save_args)
func("failure sync passed")
assert saved_args == [("failure sync passed",)]