diff --git a/homeassistant/components/cast/media_player.py b/homeassistant/components/cast/media_player.py index 037e154eb96..eedbd0dd0b1 100644 --- a/homeassistant/components/cast/media_player.py +++ b/homeassistant/components/cast/media_player.py @@ -5,9 +5,10 @@ from __future__ import annotations from collections.abc import Callable from contextlib import suppress from datetime import datetime +from functools import wraps import json import logging -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar import pychromecast from pychromecast.controllers.homeassistant import HomeAssistantController @@ -19,6 +20,7 @@ from pychromecast.controllers.media import ( ) from pychromecast.controllers.multizone import MultizoneManager from pychromecast.controllers.receiver import VOLUME_CONTROL_TYPE_FIXED +from pychromecast.error import PyChromecastError from pychromecast.quick_play import quick_play from pychromecast.socket_client import ( CONNECTION_STATUS_CONNECTED, @@ -84,6 +86,34 @@ APP_IDS_UNRELIABLE_MEDIA_INFO = ("Netflix",) CAST_SPLASH = "https://www.home-assistant.io/images/cast/splash.png" +_CastDeviceT = TypeVar("_CastDeviceT", bound="CastDevice") +_R = TypeVar("_R") +_P = ParamSpec("_P") + +_FuncType = Callable[Concatenate[_CastDeviceT, _P], _R] +_ReturnFuncType = Callable[Concatenate[_CastDeviceT, _P], _R] + + +def api_error( + func: _FuncType[_CastDeviceT, _P, _R], +) -> _ReturnFuncType[_CastDeviceT, _P, _R]: + """Handle PyChromecastError and reraise a HomeAssistantError.""" + + @wraps(func) + def wrapper(self: _CastDeviceT, *args: _P.args, **kwargs: _P.kwargs) -> _R: + """Wrap a CastDevice method.""" + try: + return_value = func(self, *args, **kwargs) + except PyChromecastError as err: + raise HomeAssistantError( + f"{self.__class__.__name__}.{func.__name__} Failed: {err}" + ) from err + + return return_value + + return wrapper + + @callback def _async_create_cast_device(hass: HomeAssistant, info: ChromecastInfo): """Create a CastDevice entity or dynamic group from the chromecast object. @@ -478,6 +508,21 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): return media_controller + @api_error + def _quick_play(self, app_name: str, data: dict[str, Any]) -> None: + """Launch the app `app_name` and start playing media defined by `data`.""" + quick_play(self._get_chromecast(), app_name, data) + + @api_error + def _quit_app(self) -> None: + """Quit the currently running app.""" + self._get_chromecast().quit_app() + + @api_error + def _start_app(self, app_id: str) -> None: + """Start an app.""" + self._get_chromecast().start_app(app_id) + def turn_on(self) -> None: """Turn on the cast device.""" @@ -488,52 +533,61 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): if chromecast.app_id is not None: # Quit the previous app before starting splash screen or media player - chromecast.quit_app() + self._quit_app() # The only way we can turn the Chromecast is on is by launching an app if chromecast.cast_type == pychromecast.const.CAST_TYPE_CHROMECAST: app_data = {"media_id": CAST_SPLASH, "media_type": "image/png"} - quick_play(chromecast, "default_media_receiver", app_data) + self._quick_play("default_media_receiver", app_data) else: - chromecast.start_app(pychromecast.config.APP_MEDIA_RECEIVER) + self._start_app(pychromecast.config.APP_MEDIA_RECEIVER) + @api_error def turn_off(self) -> None: """Turn off the cast device.""" self._get_chromecast().quit_app() + @api_error def mute_volume(self, mute: bool) -> None: """Mute the volume.""" self._get_chromecast().set_volume_muted(mute) + @api_error def set_volume_level(self, volume: float) -> None: """Set volume level, range 0..1.""" self._get_chromecast().set_volume(volume) + @api_error def media_play(self) -> None: """Send play command.""" media_controller = self._media_controller() media_controller.play() + @api_error def media_pause(self) -> None: """Send pause command.""" media_controller = self._media_controller() media_controller.pause() + @api_error def media_stop(self) -> None: """Send stop command.""" media_controller = self._media_controller() media_controller.stop() + @api_error def media_previous_track(self) -> None: """Send previous track command.""" media_controller = self._media_controller() media_controller.queue_prev() + @api_error def media_next_track(self) -> None: """Send next track command.""" media_controller = self._media_controller() media_controller.queue_next() + @api_error def media_seek(self, position: float) -> None: """Seek the media to a specific location.""" media_controller = self._media_controller() @@ -646,7 +700,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): if "app_id" in app_data: app_id = app_data.pop("app_id") _LOGGER.info("Starting Cast app by ID %s", app_id) - await self.hass.async_add_executor_job(chromecast.start_app, app_id) + await self.hass.async_add_executor_job(self._start_app, app_id) if app_data: _LOGGER.warning( "Extra keys %s were ignored. Please use app_name to cast media", @@ -657,7 +711,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): app_name = app_data.pop("app_name") try: await self.hass.async_add_executor_job( - quick_play, chromecast, app_name, app_data + self._quick_play, app_name, app_data ) except NotImplementedError: _LOGGER.error("App %s not supported", app_name) @@ -727,7 +781,7 @@ class CastMediaPlayerEntity(CastDevice, MediaPlayerEntity): app_data, ) await self.hass.async_add_executor_job( - quick_play, chromecast, "default_media_receiver", app_data + self._quick_play, "default_media_receiver", app_data ) def _media_status(self):