Use built in queue log handlers to avoid formatting logs in the event loop (#35633)
* Use built in queue log handlers to avoid formatting logs in the event loop Logging is now formatted and written in another thread to ensure there is minimal impact on the event loop when a log message is processed. This change replaces the existing AsyncHandler log handler as python 3.7+ now offers an off the shelf solution * add a simple test * s/async_migrate_log_handlers_to_queue/async_activate_log_queue_handler/gpull/35700/head
parent
d3ae8a938c
commit
73616520c0
|
@ -14,7 +14,6 @@ import voluptuous as vol
|
|||
from homeassistant import config as conf_util, config_entries, core, loader
|
||||
from homeassistant.components import http
|
||||
from homeassistant.const import (
|
||||
EVENT_HOMEASSISTANT_CLOSE,
|
||||
EVENT_HOMEASSISTANT_STOP,
|
||||
REQUIRED_NEXT_PYTHON_DATE,
|
||||
REQUIRED_NEXT_PYTHON_VER,
|
||||
|
@ -22,7 +21,7 @@ from homeassistant.const import (
|
|||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.setup import DATA_SETUP, async_setup_component
|
||||
from homeassistant.util.logging import AsyncHandler
|
||||
from homeassistant.util.logging import async_activate_log_queue_handler
|
||||
from homeassistant.util.package import async_get_user_site, is_virtual_env
|
||||
from homeassistant.util.yaml import clear_secret_cache
|
||||
|
||||
|
@ -278,17 +277,8 @@ def async_enable_logging(
|
|||
err_handler.setLevel(logging.INFO if verbose else logging.WARNING)
|
||||
err_handler.setFormatter(logging.Formatter(fmt, datefmt=datefmt))
|
||||
|
||||
async_handler = AsyncHandler(hass.loop, err_handler)
|
||||
|
||||
async def async_stop_async_handler(_: Any) -> None:
|
||||
"""Cleanup async handler."""
|
||||
logging.getLogger("").removeHandler(async_handler) # type: ignore
|
||||
await async_handler.async_close(blocking=True)
|
||||
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, async_stop_async_handler)
|
||||
|
||||
logger = logging.getLogger("")
|
||||
logger.addHandler(async_handler) # type: ignore
|
||||
logger.addHandler(err_handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# Save the log file location for access by other components.
|
||||
|
@ -296,6 +286,8 @@ def async_enable_logging(
|
|||
else:
|
||||
_LOGGER.error("Unable to set up error log %s (access denied)", err_log_path)
|
||||
|
||||
async_activate_log_queue_handler(hass)
|
||||
|
||||
|
||||
async def async_mount_local_lib_path(config_dir: str) -> str:
|
||||
"""Add local library to Python Path.
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
"""Logging utilities."""
|
||||
import asyncio
|
||||
from asyncio.events import AbstractEventLoop
|
||||
from functools import partial, wraps
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
import logging.handlers
|
||||
import queue
|
||||
import traceback
|
||||
from typing import Any, Callable, Coroutine, Optional
|
||||
from typing import Any, Callable, Coroutine
|
||||
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
|
||||
|
||||
class HideSensitiveDataFilter(logging.Filter):
|
||||
|
@ -24,104 +27,51 @@ class HideSensitiveDataFilter(logging.Filter):
|
|||
return True
|
||||
|
||||
|
||||
class AsyncHandler:
|
||||
"""Logging handler wrapper to add an async layer."""
|
||||
class HomeAssistantQueueHandler(logging.handlers.QueueHandler):
|
||||
"""Process the log in another thread."""
|
||||
|
||||
def __init__(self, loop: AbstractEventLoop, handler: logging.Handler) -> None:
|
||||
"""Initialize async logging handler wrapper."""
|
||||
self.handler = handler
|
||||
self.loop = loop
|
||||
self._queue: asyncio.Queue = asyncio.Queue(loop=loop)
|
||||
self._thread = threading.Thread(target=self._process)
|
||||
|
||||
# Delegate from handler
|
||||
# pylint: disable=invalid-name
|
||||
self.setLevel = handler.setLevel
|
||||
self.setFormatter = handler.setFormatter
|
||||
self.addFilter = handler.addFilter
|
||||
self.removeFilter = handler.removeFilter
|
||||
self.filter = handler.filter
|
||||
self.flush = handler.flush
|
||||
self.handle = handler.handle
|
||||
self.handleError = handler.handleError
|
||||
self.format = handler.format
|
||||
|
||||
self._thread.start()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Wrap close to handler."""
|
||||
self.emit(None)
|
||||
|
||||
async def async_close(self, blocking: bool = False) -> None:
|
||||
"""Close the handler.
|
||||
|
||||
When blocking=True, will wait till closed.
|
||||
"""
|
||||
await self._queue.put(None)
|
||||
|
||||
if blocking:
|
||||
while self._thread.is_alive():
|
||||
await asyncio.sleep(0)
|
||||
|
||||
def emit(self, record: Optional[logging.LogRecord]) -> None:
|
||||
"""Process a record."""
|
||||
ident = self.loop.__dict__.get("_thread_ident")
|
||||
|
||||
# inside eventloop
|
||||
if ident is not None and ident == threading.get_ident():
|
||||
self._queue.put_nowait(record)
|
||||
# from a thread/executor
|
||||
else:
|
||||
self.loop.call_soon_threadsafe(self._queue.put_nowait, record)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""Return the string names."""
|
||||
return str(self.handler)
|
||||
|
||||
def _process(self) -> None:
|
||||
"""Process log in a thread."""
|
||||
def emit(self, record: logging.LogRecord) -> None:
|
||||
"""Emit a log record."""
|
||||
try:
|
||||
while True:
|
||||
record = asyncio.run_coroutine_threadsafe(
|
||||
self._queue.get(), self.loop
|
||||
).result()
|
||||
self.enqueue(record)
|
||||
except asyncio.CancelledError: # pylint: disable=try-except-raise
|
||||
raise
|
||||
except Exception: # pylint: disable=broad-except
|
||||
self.handleError(record)
|
||||
|
||||
if record is None:
|
||||
self.handler.close()
|
||||
return
|
||||
|
||||
self.handler.emit(record)
|
||||
except asyncio.CancelledError:
|
||||
self.handler.close()
|
||||
@callback
|
||||
def async_activate_log_queue_handler(hass: HomeAssistant) -> None:
|
||||
"""
|
||||
Migrate the existing log handlers to use the queue.
|
||||
|
||||
def createLock(self) -> None: # pylint: disable=invalid-name
|
||||
"""Ignore lock stuff."""
|
||||
This allows us to avoid blocking I/O and formatting messages
|
||||
in the event loop as log messages are written in another thread.
|
||||
"""
|
||||
simple_queue = queue.SimpleQueue() # type: ignore
|
||||
queue_handler = HomeAssistantQueueHandler(simple_queue)
|
||||
logging.root.addHandler(queue_handler)
|
||||
|
||||
def acquire(self) -> None:
|
||||
"""Ignore lock stuff."""
|
||||
migrated_handlers = []
|
||||
for handler in logging.root.handlers[:]:
|
||||
if handler is queue_handler:
|
||||
continue
|
||||
logging.root.removeHandler(handler)
|
||||
migrated_handlers.append(handler)
|
||||
|
||||
def release(self) -> None:
|
||||
"""Ignore lock stuff."""
|
||||
listener = logging.handlers.QueueListener(
|
||||
simple_queue, *migrated_handlers, respect_handler_level=False
|
||||
)
|
||||
|
||||
@property
|
||||
def level(self) -> int:
|
||||
"""Wrap property level to handler."""
|
||||
return self.handler.level
|
||||
listener.start()
|
||||
|
||||
@property
|
||||
def formatter(self) -> Optional[logging.Formatter]:
|
||||
"""Wrap property formatter to handler."""
|
||||
return self.handler.formatter
|
||||
@callback
|
||||
def _async_stop_queue_handler(_: Any) -> None:
|
||||
"""Cleanup handler."""
|
||||
logging.root.removeHandler(queue_handler)
|
||||
listener.stop()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Wrap property set_name to handler."""
|
||||
return self.handler.get_name() # type: ignore
|
||||
|
||||
@name.setter
|
||||
def name(self, name: str) -> None:
|
||||
"""Wrap property get_name to handler."""
|
||||
self.handler.set_name(name) # type: ignore
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, _async_stop_queue_handler)
|
||||
|
||||
|
||||
def log_exception(format_err: Callable[..., Any], *args: Any) -> None:
|
||||
|
|
|
@ -37,6 +37,15 @@ async def test_home_assistant_core_config_validation(hass):
|
|||
assert result is None
|
||||
|
||||
|
||||
async def test_async_enable_logging(hass):
|
||||
"""Test to ensure logging is migrated to the queue handlers."""
|
||||
with patch("logging.getLogger"), patch(
|
||||
"homeassistant.bootstrap.async_activate_log_queue_handler"
|
||||
) as mock_async_activate_log_queue_handler:
|
||||
bootstrap.async_enable_logging(hass)
|
||||
mock_async_activate_log_queue_handler.assert_called_once()
|
||||
|
||||
|
||||
async def test_load_hassio(hass):
|
||||
"""Test that we load Hass.io component."""
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
"""Test Home Assistant logging util methods."""
|
||||
import asyncio
|
||||
import logging
|
||||
import threading
|
||||
import queue
|
||||
|
||||
import pytest
|
||||
|
||||
import homeassistant.util.logging as logging_util
|
||||
|
||||
from tests.async_mock import patch
|
||||
|
||||
|
||||
def test_sensitive_data_filter():
|
||||
"""Test the logging sensitive data filter."""
|
||||
|
@ -21,50 +23,40 @@ def test_sensitive_data_filter():
|
|||
assert sensitive_record.msg == "******* log"
|
||||
|
||||
|
||||
async def test_async_handler_loop_log(loop):
|
||||
"""Test logging data inside from inside the event loop."""
|
||||
loop._thread_ident = threading.get_ident()
|
||||
async def test_logging_with_queue_handler():
|
||||
"""Test logging with HomeAssistantQueueHandler."""
|
||||
|
||||
queue = asyncio.Queue(loop=loop)
|
||||
base_handler = logging.handlers.QueueHandler(queue)
|
||||
handler = logging_util.AsyncHandler(loop, base_handler)
|
||||
|
||||
# Test passthrough props and noop functions
|
||||
assert handler.createLock() is None
|
||||
assert handler.acquire() is None
|
||||
assert handler.release() is None
|
||||
assert handler.formatter is base_handler.formatter
|
||||
assert handler.name is base_handler.get_name()
|
||||
handler.name = "mock_name"
|
||||
assert base_handler.get_name() == "mock_name"
|
||||
simple_queue = queue.SimpleQueue() # type: ignore
|
||||
handler = logging_util.HomeAssistantQueueHandler(simple_queue)
|
||||
|
||||
log_record = logging.makeLogRecord({"msg": "Test Log Record"})
|
||||
|
||||
handler.emit(log_record)
|
||||
await handler.async_close(True)
|
||||
assert queue.get_nowait().msg == "Test Log Record"
|
||||
assert queue.empty()
|
||||
|
||||
|
||||
async def test_async_handler_thread_log(loop):
|
||||
"""Test logging data from a thread."""
|
||||
loop._thread_ident = threading.get_ident()
|
||||
|
||||
queue = asyncio.Queue(loop=loop)
|
||||
base_handler = logging.handlers.QueueHandler(queue)
|
||||
handler = logging_util.AsyncHandler(loop, base_handler)
|
||||
|
||||
log_record = logging.makeLogRecord({"msg": "Test Log Record"})
|
||||
|
||||
def add_log():
|
||||
"""Emit a mock log."""
|
||||
with pytest.raises(asyncio.CancelledError), patch.object(
|
||||
handler, "enqueue", side_effect=asyncio.CancelledError
|
||||
):
|
||||
handler.emit(log_record)
|
||||
handler.close()
|
||||
|
||||
await loop.run_in_executor(None, add_log)
|
||||
await handler.async_close(True)
|
||||
with patch.object(handler, "enqueue", side_effect=OSError), patch.object(
|
||||
handler, "handleError"
|
||||
) as mock_handle_error:
|
||||
handler.emit(log_record)
|
||||
mock_handle_error.assert_called_once()
|
||||
|
||||
assert queue.get_nowait().msg == "Test Log Record"
|
||||
assert queue.empty()
|
||||
handler.close()
|
||||
|
||||
assert simple_queue.get_nowait().msg == "Test Log Record"
|
||||
assert simple_queue.empty()
|
||||
|
||||
|
||||
async def test_migrate_log_handler(hass):
|
||||
"""Test migrating log handlers."""
|
||||
|
||||
logging_util.async_activate_log_queue_handler(hass)
|
||||
|
||||
assert len(logging.root.handlers) == 1
|
||||
assert isinstance(logging.root.handlers[0], logging_util.HomeAssistantQueueHandler)
|
||||
|
||||
|
||||
@pytest.mark.no_fail_on_log_exception
|
||||
|
|
Loading…
Reference in New Issue