core/homeassistant/helpers/frame.py

133 lines
3.6 KiB
Python

"""Provide frame helper for finding the current frame context."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
import functools
import logging
from traceback import FrameSummary, extract_stack
from typing import Any, TypeVar, cast
from homeassistant.exceptions import HomeAssistantError
_LOGGER = logging.getLogger(__name__)
# Keep track of integrations already reported to prevent flooding
_REPORTED_INTEGRATIONS: set[str] = set()
_CallableT = TypeVar("_CallableT", bound=Callable)
def get_integration_frame(
exclude_integrations: set | None = None,
) -> tuple[FrameSummary, str, str]:
"""Return the frame, integration and integration path of the current stack frame."""
found_frame = None
if not exclude_integrations:
exclude_integrations = set()
for frame in reversed(extract_stack()):
for path in ("custom_components/", "homeassistant/components/"):
try:
index = frame.filename.index(path)
start = index + len(path)
end = frame.filename.index("/", start)
integration = frame.filename[start:end]
if integration not in exclude_integrations:
found_frame = frame
break
except ValueError:
continue
if found_frame is not None:
break
if found_frame is None:
raise MissingIntegrationFrame
return found_frame, integration, path
class MissingIntegrationFrame(HomeAssistantError):
"""Raised when no integration is found in the frame."""
def report(
what: str,
exclude_integrations: set | None = None,
error_if_core: bool = True,
level: int = logging.WARNING,
) -> None:
"""Report incorrect usage.
Async friendly.
"""
try:
integration_frame = get_integration_frame(
exclude_integrations=exclude_integrations
)
except MissingIntegrationFrame as err:
msg = f"Detected code that {what}. Please report this issue."
if error_if_core:
raise RuntimeError(msg) from err
_LOGGER.warning(msg, stack_info=True)
return
report_integration(what, integration_frame, level)
def report_integration(
what: str,
integration_frame: tuple[FrameSummary, str, str],
level: int = logging.WARNING,
) -> None:
"""Report incorrect usage in an integration.
Async friendly.
"""
found_frame, integration, path = integration_frame
# Keep track of integrations already reported to prevent flooding
key = f"{found_frame.filename}:{found_frame.lineno}"
if key in _REPORTED_INTEGRATIONS:
return
_REPORTED_INTEGRATIONS.add(key)
index = found_frame.filename.index(path)
if path == "custom_components/":
extra = " to the custom integration author"
else:
extra = ""
_LOGGER.log(
level,
(
"Detected integration that %s. "
"Please report issue%s for %s using this method at %s, line %s: %s"
),
what,
extra,
integration,
found_frame.filename[index:],
found_frame.lineno,
(found_frame.line or "?").strip(),
)
def warn_use(func: _CallableT, what: str) -> _CallableT:
"""Mock a function to warn when it was about to be used."""
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def report_use(*args: Any, **kwargs: Any) -> None:
report(what)
else:
@functools.wraps(func)
def report_use(*args: Any, **kwargs: Any) -> None:
report(what)
return cast(_CallableT, report_use)