"""Provide frame helper for finding the current frame context.""" from __future__ import annotations import asyncio from collections.abc import Callable from contextlib import suppress from dataclasses import dataclass import functools from functools import cached_property import linecache import logging import sys from types import FrameType from typing import Any, cast from homeassistant.core import HomeAssistant, async_get_hass from homeassistant.exceptions import HomeAssistantError from homeassistant.loader import async_suggest_report_issue _LOGGER = logging.getLogger(__name__) # Keep track of integrations already reported to prevent flooding _REPORTED_INTEGRATIONS: set[str] = set() @dataclass(kw_only=True) class IntegrationFrame: """Integration frame container.""" custom_integration: bool integration: str module: str | None relative_filename: str _frame: FrameType @cached_property def line_number(self) -> int: """Return the line number of the frame.""" return self._frame.f_lineno @cached_property def filename(self) -> str: """Return the filename of the frame.""" return self._frame.f_code.co_filename @cached_property def line(self) -> str: """Return the line of the frame.""" return (linecache.getline(self.filename, self.line_number) or "?").strip() def get_integration_logger(fallback_name: str) -> logging.Logger: """Return a logger by checking the current integration frame. If Python is unable to access the sources files, the call stack frame will be missing information, so let's guard by requiring a fallback name. https://github.com/home-assistant/core/issues/24982 """ try: integration_frame = get_integration_frame() except MissingIntegrationFrame: return logging.getLogger(fallback_name) if integration_frame.custom_integration: logger_name = f"custom_components.{integration_frame.integration}" else: logger_name = f"homeassistant.components.{integration_frame.integration}" return logging.getLogger(logger_name) def get_current_frame(depth: int = 0) -> FrameType: """Return the current frame.""" # Add one to depth since get_current_frame is included return sys._getframe(depth + 1) # noqa: SLF001 def get_integration_frame(exclude_integrations: set | None = None) -> IntegrationFrame: """Return the frame, integration and integration path of the current stack frame.""" found_frame = None if not exclude_integrations: exclude_integrations = set() frame: FrameType | None = get_current_frame() while frame is not None: filename = frame.f_code.co_filename for path in ("custom_components/", "homeassistant/components/"): try: index = filename.index(path) start = index + len(path) end = filename.index("/", start) integration = filename[start:end] if integration not in exclude_integrations: found_frame = frame break except ValueError: continue if found_frame is not None: break frame = frame.f_back if found_frame is None: raise MissingIntegrationFrame found_module: str | None = None for module, module_obj in dict(sys.modules).items(): if not hasattr(module_obj, "__file__"): continue if module_obj.__file__ == found_frame.f_code.co_filename: found_module = module break return IntegrationFrame( custom_integration=path == "custom_components/", integration=integration, module=found_module, relative_filename=found_frame.f_code.co_filename[index:], _frame=found_frame, ) class MissingIntegrationFrame(HomeAssistantError): """Raised when no integration is found in the frame.""" def report( what: str, exclude_integrations: set | None = None, error_if_core: bool = True, level: int = logging.WARNING, log_custom_component_only: bool = False, error_if_integration: bool = False, ) -> None: """Report incorrect usage. Async friendly. """ try: integration_frame = get_integration_frame( exclude_integrations=exclude_integrations ) except MissingIntegrationFrame as err: msg = f"Detected code that {what}. Please report this issue." if error_if_core: raise RuntimeError(msg) from err if not log_custom_component_only: _LOGGER.warning(msg, stack_info=True) return if ( error_if_integration or not log_custom_component_only or integration_frame.custom_integration ): _report_integration(what, integration_frame, level, error_if_integration) def _report_integration( what: str, integration_frame: IntegrationFrame, level: int = logging.WARNING, error: bool = False, ) -> None: """Report incorrect usage in an integration. Async friendly. """ # Keep track of integrations already reported to prevent flooding key = f"{integration_frame.filename}:{integration_frame.line_number}" if not error and key in _REPORTED_INTEGRATIONS: return _REPORTED_INTEGRATIONS.add(key) hass: HomeAssistant | None = None with suppress(HomeAssistantError): hass = async_get_hass() report_issue = async_suggest_report_issue( hass, integration_domain=integration_frame.integration, module=integration_frame.module, ) integration_type = "custom " if integration_frame.custom_integration else "" _LOGGER.log( level, "Detected that %sintegration '%s' %s at %s, line %s: %s, please %s", integration_type, integration_frame.integration, what, integration_frame.relative_filename, integration_frame.line_number, integration_frame.line, report_issue, ) if not error: return raise RuntimeError( f"Detected that {integration_type}integration " f"'{integration_frame.integration}' {what} at " f"{integration_frame.relative_filename}, line " f"{integration_frame.line_number}: {integration_frame.line}. " f"Please {report_issue}." ) def warn_use[_CallableT: Callable](func: _CallableT, what: str) -> _CallableT: """Mock a function to warn when it was about to be used.""" if asyncio.iscoroutinefunction(func): @functools.wraps(func) async def report_use(*args: Any, **kwargs: Any) -> None: report(what) else: @functools.wraps(func) def report_use(*args: Any, **kwargs: Any) -> None: report(what) return cast(_CallableT, report_use)