"""Support for media browsing.""" from __future__ import annotations from collections.abc import Callable from contextlib import suppress from functools import partial import logging from typing import cast from soco.data_structures import DidlObject from soco.ms_data_structures import MusicServiceItem from soco.music_library import MusicLibrary from homeassistant.components import media_source, plex, spotify from homeassistant.components.media_player import ( BrowseError, BrowseMedia, MediaClass, MediaType, ) from homeassistant.core import HomeAssistant from homeassistant.helpers.network import is_internal_request from .const import ( DOMAIN, EXPANDABLE_MEDIA_TYPES, LIBRARY_TITLES_MAPPING, MEDIA_TYPES_TO_SONOS, PLAYABLE_MEDIA_TYPES, SONOS_ALBUM, SONOS_ALBUM_ARTIST, SONOS_GENRE, SONOS_TO_MEDIA_CLASSES, SONOS_TO_MEDIA_TYPES, SONOS_TRACKS, SONOS_TYPES_MAPPING, ) from .exception import UnknownMediaType from .favorites import SonosFavorites from .speaker import SonosMedia, SonosSpeaker _LOGGER = logging.getLogger(__name__) GetBrowseImageUrlType = Callable[[str, str, "str | None"], str] def get_thumbnail_url_full( media: SonosMedia, is_internal: bool, get_browse_image_url: GetBrowseImageUrlType, media_content_type: str, media_content_id: str, media_image_id: str | None = None, ) -> str | None: """Get thumbnail URL.""" if is_internal: item = get_media( media.library, media_content_id, media_content_type, ) return getattr(item, "album_art_uri", None) return get_browse_image_url( media_content_type, media_content_id, media_image_id, ) def media_source_filter(item: BrowseMedia) -> bool: """Filter media sources.""" return item.media_content_type.startswith("audio/") async def async_browse_media( hass: HomeAssistant, speaker: SonosSpeaker, media: SonosMedia, get_browse_image_url: GetBrowseImageUrlType, media_content_id: str | None, media_content_type: str | None, ) -> BrowseMedia: """Browse media.""" if media_content_id is None: return await root_payload( hass, speaker, media, get_browse_image_url, ) assert media_content_type is not None if media_source.is_media_source_id(media_content_id): return await media_source.async_browse_media( hass, media_content_id, content_filter=media_source_filter ) if plex.is_plex_media_id(media_content_id): return await plex.async_browse_media( hass, media_content_type, media_content_id, platform=DOMAIN ) if media_content_type == "plex": return await plex.async_browse_media(hass, None, None, platform=DOMAIN) if spotify.is_spotify_media_type(media_content_type): return await spotify.async_browse_media( hass, media_content_type, media_content_id, can_play_artist=False ) if media_content_type == "library": return await hass.async_add_executor_job( library_payload, media.library, partial( get_thumbnail_url_full, media, is_internal_request(hass), get_browse_image_url, ), ) if media_content_type == "favorites": return await hass.async_add_executor_job( favorites_payload, speaker.favorites, ) if media_content_type == "favorites_folder": return await hass.async_add_executor_job( favorites_folder_payload, speaker.favorites, media_content_id, ) payload = { "search_type": media_content_type, "idstring": media_content_id, } response = await hass.async_add_executor_job( build_item_response, media.library, payload, partial( get_thumbnail_url_full, media, is_internal_request(hass), get_browse_image_url, ), ) if response is None: raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}") return response def build_item_response( media_library: MusicLibrary, payload: dict[str, str], get_thumbnail_url=None ) -> BrowseMedia | None: """Create response payload for the provided media query.""" if payload["search_type"] == MediaType.ALBUM and payload["idstring"].startswith( ("A:GENRE", "A:COMPOSER") ): payload["idstring"] = "A:ALBUMARTIST/" + "/".join( payload["idstring"].split("/")[2:] ) try: search_type = MEDIA_TYPES_TO_SONOS[payload["search_type"]] except KeyError: _LOGGER.debug( "Unknown media type received when building item response: %s", payload["search_type"], ) return None media = media_library.browse_by_idstring( search_type, payload["idstring"], full_album_art_uri=True, max_items=0, ) if media is None: return None thumbnail = None title = None # Fetch album info for titles and thumbnails # Can't be extracted from track info if ( payload["search_type"] == MediaType.ALBUM and media[0].item_class == "object.item.audioItem.musicTrack" ): item = get_media(media_library, payload["idstring"], SONOS_ALBUM_ARTIST) title = getattr(item, "title", None) thumbnail = get_thumbnail_url(SONOS_ALBUM_ARTIST, payload["idstring"]) if not title: try: title = payload["idstring"].split("/")[1] except IndexError: title = LIBRARY_TITLES_MAPPING[payload["idstring"]] try: media_class = SONOS_TO_MEDIA_CLASSES[ MEDIA_TYPES_TO_SONOS[payload["search_type"]] ] except KeyError: _LOGGER.debug("Unknown media type received %s", payload["search_type"]) return None children = [] for item in media: with suppress(UnknownMediaType): children.append(item_payload(item, get_thumbnail_url)) return BrowseMedia( title=title, thumbnail=thumbnail, media_class=media_class, media_content_id=payload["idstring"], media_content_type=payload["search_type"], children=children, can_play=can_play(payload["search_type"]), can_expand=can_expand(payload["search_type"]), ) def item_payload(item: DidlObject, get_thumbnail_url=None) -> BrowseMedia: """ Create response payload for a single media item. Used by async_browse_media. """ media_type = get_media_type(item) try: media_class = SONOS_TO_MEDIA_CLASSES[media_type] except KeyError as err: _LOGGER.debug("Unknown media type received %s", media_type) raise UnknownMediaType from err content_id = get_content_id(item) thumbnail = None if getattr(item, "album_art_uri", None): thumbnail = get_thumbnail_url(media_class, content_id) return BrowseMedia( title=item.title, thumbnail=thumbnail, media_class=media_class, media_content_id=content_id, media_content_type=SONOS_TO_MEDIA_TYPES[media_type], can_play=can_play(item.item_class), can_expand=can_expand(item), ) async def root_payload( hass: HomeAssistant, speaker: SonosSpeaker, media: SonosMedia, get_browse_image_url: GetBrowseImageUrlType, ) -> BrowseMedia: """Return root payload for Sonos.""" children: list[BrowseMedia] = [] if speaker.favorites: children.append( BrowseMedia( title="Favorites", media_class=MediaClass.DIRECTORY, media_content_id="", media_content_type="favorites", thumbnail="https://brands.home-assistant.io/_/sonos/logo.png", can_play=False, can_expand=True, ) ) if await hass.async_add_executor_job( partial(media.library.browse_by_idstring, "tracks", "", max_items=1) ): children.append( BrowseMedia( title="Music Library", media_class=MediaClass.DIRECTORY, media_content_id="", media_content_type="library", thumbnail="https://brands.home-assistant.io/_/sonos/logo.png", can_play=False, can_expand=True, ) ) if "plex" in hass.config.components: children.append( BrowseMedia( title="Plex", media_class=MediaClass.APP, media_content_id="", media_content_type="plex", thumbnail="https://brands.home-assistant.io/_/plex/logo.png", can_play=False, can_expand=True, ) ) if "spotify" in hass.config.components: result = await spotify.async_browse_media(hass, None, None) if result.children: children.extend(result.children) try: item = await media_source.async_browse_media( hass, None, content_filter=media_source_filter ) # If domain is None, it's overview of available sources if item.domain is None and item.children is not None: children.extend(item.children) else: children.append(item) except media_source.BrowseError: pass if len(children) == 1: return await async_browse_media( hass, speaker, media, get_browse_image_url, children[0].media_content_id, children[0].media_content_type, ) return BrowseMedia( title="Sonos", media_class=MediaClass.DIRECTORY, media_content_id="", media_content_type="root", can_play=False, can_expand=True, children=children, ) def library_payload(media_library: MusicLibrary, get_thumbnail_url=None) -> BrowseMedia: """ Create response payload to describe contents of a specific library. Used by async_browse_media. """ children = [] for item in media_library.browse(): with suppress(UnknownMediaType): children.append(item_payload(item, get_thumbnail_url)) return BrowseMedia( title="Music Library", media_class=MediaClass.DIRECTORY, media_content_id="library", media_content_type="library", can_play=False, can_expand=True, children=children, ) def favorites_payload(favorites: SonosFavorites) -> BrowseMedia: """ Create response payload to describe contents of a specific library. Used by async_browse_media. """ children: list[BrowseMedia] = [] group_types: set[str] = {fav.reference.item_class for fav in favorites} for group_type in sorted(group_types): try: media_content_type = SONOS_TYPES_MAPPING[group_type] media_class = SONOS_TO_MEDIA_CLASSES[group_type] except KeyError: _LOGGER.debug("Unknown media type or class received %s", group_type) continue children.append( BrowseMedia( title=media_content_type.title(), media_class=media_class, media_content_id=group_type, media_content_type="favorites_folder", can_play=False, can_expand=True, ) ) return BrowseMedia( title="Favorites", media_class=MediaClass.DIRECTORY, media_content_id="", media_content_type="favorites", can_play=False, can_expand=True, children=children, ) def favorites_folder_payload( favorites: SonosFavorites, media_content_id: str ) -> BrowseMedia: """Create response payload to describe all items of a type of favorite. Used by async_browse_media. """ children: list[BrowseMedia] = [] content_type = SONOS_TYPES_MAPPING[media_content_id] for favorite in favorites: if favorite.reference.item_class != media_content_id: continue children.append( BrowseMedia( title=favorite.title, media_class=SONOS_TO_MEDIA_CLASSES[favorite.reference.item_class], media_content_id=favorite.item_id, media_content_type="favorite_item_id", can_play=True, can_expand=False, thumbnail=getattr(favorite, "album_art_uri", None), ) ) return BrowseMedia( title=content_type.title(), media_class=MediaClass.DIRECTORY, media_content_id="", media_content_type="favorites", can_play=False, can_expand=True, children=children, ) def get_media_type(item: DidlObject) -> str: """Extract media type of item.""" if item.item_class == "object.item.audioItem.musicTrack": return SONOS_TRACKS if ( item.item_class == "object.container.album.musicAlbum" and SONOS_TYPES_MAPPING.get(item.item_id.split("/")[0]) in [ SONOS_ALBUM_ARTIST, SONOS_GENRE, ] ): return SONOS_TYPES_MAPPING[item.item_class] return SONOS_TYPES_MAPPING.get(item.item_id.split("/")[0], item.item_class) def can_play(item: DidlObject) -> bool: """ Test if playable. Used by async_browse_media. """ return SONOS_TO_MEDIA_TYPES.get(item) in PLAYABLE_MEDIA_TYPES def can_expand(item: DidlObject) -> bool: """ Test if expandable. Used by async_browse_media. """ if isinstance(item, str): return SONOS_TYPES_MAPPING.get(item) in EXPANDABLE_MEDIA_TYPES if SONOS_TO_MEDIA_TYPES.get(item.item_class) in EXPANDABLE_MEDIA_TYPES: return True return SONOS_TYPES_MAPPING.get(item.item_id) in EXPANDABLE_MEDIA_TYPES def get_content_id(item: DidlObject) -> str: """Extract content id or uri.""" if item.item_class == "object.item.audioItem.musicTrack": return cast(str, item.get_uri()) return cast(str, item.item_id) def get_media( media_library: MusicLibrary, item_id: str, search_type: str ) -> MusicServiceItem: """Fetch media/album.""" search_type = MEDIA_TYPES_TO_SONOS.get(search_type, search_type) if not item_id.startswith("A:ALBUM") and search_type == SONOS_ALBUM: item_id = "A:ALBUMARTIST/" + "/".join(item_id.split("/")[2:]) search_term = item_id.split("/")[-1] matches = media_library.get_music_library_information( search_type, search_term=search_term, full_album_art_uri=True ) if len(matches) > 0: return matches[0]