Catch API errors in cast media_player service handlers (#113839)

* Catch API errors in cast media_player service handlers

* Remove left over debug code

* Fix wrapping of coroutine function with api_error
pull/103573/head^2
Erik Montnemery 2024-03-20 13:10:35 +01:00 committed by GitHub
parent eafb4190ef
commit afa9517716
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 61 additions and 7 deletions

View File

@ -5,9 +5,10 @@ from __future__ import annotations
from collections.abc import Callable
from contextlib import suppress
from datetime import datetime
from functools import wraps
import json
import logging
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar
import pychromecast
from pychromecast.controllers.homeassistant import HomeAssistantController
@ -19,6 +20,7 @@ from pychromecast.controllers.media import (
)
from pychromecast.controllers.multizone import MultizoneManager
from pychromecast.controllers.receiver import VOLUME_CONTROL_TYPE_FIXED
from pychromecast.error import PyChromecastError
from pychromecast.quick_play import quick_play
from pychromecast.socket_client import (
CONNECTION_STATUS_CONNECTED,
@ -84,6 +86,34 @@ APP_IDS_UNRELIABLE_MEDIA_INFO = ("Netflix",)
CAST_SPLASH = "https://www.home-assistant.io/images/cast/splash.png"
_CastDeviceT = TypeVar("_CastDeviceT", bound="CastDevice")
_R = TypeVar("_R")
_P = ParamSpec("_P")
_FuncType = Callable[Concatenate[_CastDeviceT, _P], _R]
_ReturnFuncType = Callable[Concatenate[_CastDeviceT, _P], _R]
def api_error(
func: _FuncType[_CastDeviceT, _P, _R],
) -> _ReturnFuncType[_CastDeviceT, _P, _R]:
"""Handle PyChromecastError and reraise a HomeAssistantError."""
@wraps(func)
def wrapper(self: _CastDeviceT, *args: _P.args, **kwargs: _P.kwargs) -> _R:
"""Wrap a CastDevice method."""
try:
return_value = func(self, *args, **kwargs)
except PyChromecastError as err:
raise HomeAssistantError(
f"{self.__class__.__name__}.{func.__name__} Failed: {err}"
) from err
return return_value
return wrapper
@callback
def _async_create_cast_device(hass: HomeAssistant, info: ChromecastInfo):
"""Create a CastDevice entity or dynamic group from the chromecast object.
@ -478,6 +508,21 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
return media_controller
@api_error
def _quick_play(self, app_name: str, data: dict[str, Any]) -> None:
"""Launch the app `app_name` and start playing media defined by `data`."""
quick_play(self._get_chromecast(), app_name, data)
@api_error
def _quit_app(self) -> None:
"""Quit the currently running app."""
self._get_chromecast().quit_app()
@api_error
def _start_app(self, app_id: str) -> None:
"""Start an app."""
self._get_chromecast().start_app(app_id)
def turn_on(self) -> None:
"""Turn on the cast device."""
@ -488,52 +533,61 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
if chromecast.app_id is not None:
# Quit the previous app before starting splash screen or media player
chromecast.quit_app()
self._quit_app()
# The only way we can turn the Chromecast is on is by launching an app
if chromecast.cast_type == pychromecast.const.CAST_TYPE_CHROMECAST:
app_data = {"media_id": CAST_SPLASH, "media_type": "image/png"}
quick_play(chromecast, "default_media_receiver", app_data)
self._quick_play("default_media_receiver", app_data)
else:
chromecast.start_app(pychromecast.config.APP_MEDIA_RECEIVER)
self._start_app(pychromecast.config.APP_MEDIA_RECEIVER)
@api_error
def turn_off(self) -> None:
"""Turn off the cast device."""
self._get_chromecast().quit_app()
@api_error
def mute_volume(self, mute: bool) -> None:
"""Mute the volume."""
self._get_chromecast().set_volume_muted(mute)
@api_error
def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
self._get_chromecast().set_volume(volume)
@api_error
def media_play(self) -> None:
"""Send play command."""
media_controller = self._media_controller()
media_controller.play()
@api_error
def media_pause(self) -> None:
"""Send pause command."""
media_controller = self._media_controller()
media_controller.pause()
@api_error
def media_stop(self) -> None:
"""Send stop command."""
media_controller = self._media_controller()
media_controller.stop()
@api_error
def media_previous_track(self) -> None:
"""Send previous track command."""
media_controller = self._media_controller()
media_controller.queue_prev()
@api_error
def media_next_track(self) -> None:
"""Send next track command."""
media_controller = self._media_controller()
media_controller.queue_next()
@api_error
def media_seek(self, position: float) -> None:
"""Seek the media to a specific location."""
media_controller = self._media_controller()
@ -646,7 +700,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
if "app_id" in app_data:
app_id = app_data.pop("app_id")
_LOGGER.info("Starting Cast app by ID %s", app_id)
await self.hass.async_add_executor_job(chromecast.start_app, app_id)
await self.hass.async_add_executor_job(self._start_app, app_id)
if app_data:
_LOGGER.warning(
"Extra keys %s were ignored. Please use app_name to cast media",
@ -657,7 +711,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
app_name = app_data.pop("app_name")
try:
await self.hass.async_add_executor_job(
quick_play, chromecast, app_name, app_data
self._quick_play, app_name, app_data
)
except NotImplementedError:
_LOGGER.error("App %s not supported", app_name)
@ -727,7 +781,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity):
app_data,
)
await self.hass.async_add_executor_job(
quick_play, chromecast, "default_media_receiver", app_data
self._quick_play, "default_media_receiver", app_data
)
def _media_status(self):