"""asyncio loop utilities.""" from __future__ import annotations from collections.abc import Callable import functools from functools import cache import linecache import logging import threading import traceback from typing import Any from homeassistant.core import async_get_hass_or_none from homeassistant.helpers.frame import ( MissingIntegrationFrame, get_current_frame, get_integration_frame, ) from homeassistant.loader import async_suggest_report_issue _LOGGER = logging.getLogger(__name__) def _get_line_from_cache(filename: str, lineno: int) -> str: """Get line from cache or read from file.""" return (linecache.getline(filename, lineno) or "?").strip() # Set of previously reported blocking calls # (integration, filename, lineno) _PREVIOUSLY_REPORTED: set[tuple[str | None, str, int | Any]] = set() def raise_for_blocking_call( func: Callable[..., Any], check_allowed: Callable[[dict[str, Any]], bool] | None = None, strict: bool = True, strict_core: bool = True, **mapped_args: Any, ) -> None: """Warn if called inside the event loop. Raise if `strict` is True.""" if check_allowed is not None and check_allowed(mapped_args): return found_frame = None offender_frame = get_current_frame(2) offender_filename = offender_frame.f_code.co_filename offender_lineno = offender_frame.f_lineno offender_line = _get_line_from_cache(offender_filename, offender_lineno) report_key: tuple[str | None, str, int | Any] try: integration_frame = get_integration_frame() except MissingIntegrationFrame: # Did not source from integration? Hard error. report_key = (None, offender_filename, offender_lineno) was_reported = report_key in _PREVIOUSLY_REPORTED _PREVIOUSLY_REPORTED.add(report_key) if not strict_core: if was_reported: _LOGGER.debug( "Detected blocking call to %s with args %s in %s, " "line %s: %s inside the event loop; " "This is causing stability issues. " "Please create a bug report at " "https://github.com/home-assistant/core/issues?q=is%%3Aopen+is%%3Aissue\n" "%s\n", func.__name__, mapped_args.get("args"), offender_filename, offender_lineno, offender_line, _dev_help_message(func.__name__), ) else: _LOGGER.warning( "Detected blocking call to %s with args %s in %s, " "line %s: %s inside the event loop; " "This is causing stability issues. " "Please create a bug report at " "https://github.com/home-assistant/core/issues?q=is%%3Aopen+is%%3Aissue\n" "%s\n" "Traceback (most recent call last):\n%s", func.__name__, mapped_args.get("args"), offender_filename, offender_lineno, offender_line, _dev_help_message(func.__name__), "".join(traceback.format_stack(f=offender_frame)), ) return if found_frame is None: raise RuntimeError( # noqa: TRY200 f"Caught blocking call to {func.__name__} with args {mapped_args.get("args")} " f"in {offender_filename}, line {offender_lineno}: {offender_line} " "inside the event loop; " "This is causing stability issues. " "Please create a bug report at " "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue\n" f"{_dev_help_message(func.__name__)}" ) report_key = (integration_frame.integration, offender_filename, offender_lineno) was_reported = report_key in _PREVIOUSLY_REPORTED _PREVIOUSLY_REPORTED.add(report_key) report_issue = async_suggest_report_issue( async_get_hass_or_none(), integration_domain=integration_frame.integration, module=integration_frame.module, ) if was_reported: _LOGGER.debug( "Detected blocking call to %s with args %s " "inside the event loop by %sintegration '%s' " "at %s, line %s: %s (offender: %s, line %s: %s), please %s\n" "%s\n", func.__name__, mapped_args.get("args"), "custom " if integration_frame.custom_integration else "", integration_frame.integration, integration_frame.relative_filename, integration_frame.line_number, integration_frame.line, offender_filename, offender_lineno, offender_line, report_issue, _dev_help_message(func.__name__), ) else: _LOGGER.warning( "Detected blocking call to %s with args %s " "inside the event loop by %sintegration '%s' " "at %s, line %s: %s (offender: %s, line %s: %s), please %s\n" "%s\n" "Traceback (most recent call last):\n%s", func.__name__, mapped_args.get("args"), "custom " if integration_frame.custom_integration else "", integration_frame.integration, integration_frame.relative_filename, integration_frame.line_number, integration_frame.line, offender_filename, offender_lineno, offender_line, report_issue, _dev_help_message(func.__name__), "".join(traceback.format_stack(f=integration_frame.frame)), ) if strict: raise RuntimeError( f"Caught blocking call to {func.__name__} with args " f"{mapped_args.get('args')} inside the event loop by " f"{'custom ' if integration_frame.custom_integration else ''}" f"integration '{integration_frame.integration}' at " f"{integration_frame.relative_filename}, line {integration_frame.line_number}:" f" {integration_frame.line}. (offender: {offender_filename}, line " f"{offender_lineno}: {offender_line}), please {report_issue}\n" f"{_dev_help_message(func.__name__)}" ) @cache def _dev_help_message(what: str) -> str: """Generate help message to guide developers.""" return ( "For developers, please see " "https://developers.home-assistant.io/docs/asyncio_blocking_operations/" f"#{what.replace('.', '')}" ) def protect_loop[**_P, _R]( func: Callable[_P, _R], loop_thread_id: int, strict: bool = True, strict_core: bool = True, check_allowed: Callable[[dict[str, Any]], bool] | None = None, ) -> Callable[_P, _R]: """Protect function from running in event loop.""" @functools.wraps(func) def protected_loop_func(*args: _P.args, **kwargs: _P.kwargs) -> _R: if threading.get_ident() == loop_thread_id: raise_for_blocking_call( func, strict=strict, strict_core=strict_core, check_allowed=check_allowed, args=args, kwargs=kwargs, ) return func(*args, **kwargs) return protected_loop_func