454 lines
15 KiB
Python
454 lines
15 KiB
Python
"""Support for media browsing."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from pysqueezebox import Player
|
|
|
|
from homeassistant.components import media_source
|
|
from homeassistant.components.media_player import (
|
|
BrowseError,
|
|
BrowseMedia,
|
|
MediaClass,
|
|
MediaPlayerEntity,
|
|
MediaType,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.network import is_internal_request
|
|
|
|
from .const import DOMAIN, UNPLAYABLE_TYPES
|
|
|
|
# Top-level browse categories. library_payload() walks this list in order and
# adds one root entry for every category the player returns results for.
LIBRARY = [
    # Items that need per-item inspection to decide how they are presented
    "favorites",
    # Local music library views
    "artists",
    "albums",
    "tracks",
    "playlists",
    "genres",
    "new music",
    "album artists",
    # Categories whose children are registered dynamically while browsing
    "apps",
    "radios",
]
|
|
|
|
# Browse content type -> category name handed to ``player.async_browse``.
# Plural string keys are library listings; MediaType keys are single entities.
MEDIA_TYPE_TO_SQUEEZEBOX: dict[str | MediaType, str] = {
    # Library listings
    "favorites": "favorites",
    "artists": "artists",
    "albums": "albums",
    "tracks": "titles",
    "playlists": "playlists",
    "genres": "genres",
    "new music": "new music",
    "album artists": "album artists",
    # Single entities
    MediaType.ALBUM: "album",
    MediaType.ARTIST: "artist",
    MediaType.TRACK: "title",
    MediaType.PLAYLIST: "playlist",
    MediaType.GENRE: "genre",
    # Apps, radios and individual favorites
    MediaType.APPS: "apps",
    "radios": "radios",
    "favorite": "favorite",
}
|
|
|
|
# Browse content type -> name of the id field that is paired with the search
# id to form the ``browse_id`` tuple passed to ``player.async_browse``.
SQUEEZEBOX_ID_BY_TYPE: dict[str | MediaType, str] = {
    # Library entities: singular MediaType and plural listing share one id field
    MediaType.ALBUM: "album_id",
    "albums": "album_id",
    MediaType.ARTIST: "artist_id",
    "artists": "artist_id",
    MediaType.TRACK: "track_id",
    "tracks": "track_id",
    MediaType.PLAYLIST: "playlist_id",
    "playlists": "playlist_id",
    MediaType.GENRE: "genre_id",
    "genres": "genre_id",
    # Favorites, apps and radios are all addressed through "item_id"
    "favorite": "item_id",
    "favorites": "item_id",
    MediaType.APPS: "item_id",
    "app": "item_id",
    "radios": "item_id",
    "radio": "item_id",
}
|
|
|
|
# Browse content type -> media classes used when building BrowseMedia nodes:
#   "item"     is the class of the node itself,
#   "children" is the class reported for its children ("" when it has none).
CONTENT_TYPE_MEDIA_CLASS: dict[str | MediaType, dict[str, MediaClass | str]] = {
    # Favorites and radios
    "favorites": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
    "favorite": {"item": "favorite", "children": ""},
    "radios": {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
    "radio": {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
    # Library listings
    "artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST},
    "albums": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM},
    "tracks": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
    "playlists": {"item": MediaClass.DIRECTORY, "children": MediaClass.PLAYLIST},
    "genres": {"item": MediaClass.DIRECTORY, "children": MediaClass.GENRE},
    "new music": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM},
    "album artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST},
    # Single entities
    MediaType.ALBUM: {"item": MediaClass.ALBUM, "children": MediaClass.TRACK},
    MediaType.ARTIST: {"item": MediaClass.ARTIST, "children": MediaClass.ALBUM},
    MediaType.TRACK: {"item": MediaClass.TRACK, "children": ""},
    MediaType.GENRE: {"item": MediaClass.GENRE, "children": MediaClass.ARTIST},
    MediaType.PLAYLIST: {"item": MediaClass.PLAYLIST, "children": MediaClass.TRACK},
    # Apps
    MediaType.APP: {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
    MediaType.APPS: {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
}
|
|
|
|
# Browse content type -> content type assigned to each of its children.
# ``None`` means the child type is not known up front.
CONTENT_TYPE_TO_CHILD_TYPE: dict[
    str | MediaType,
    str | MediaType | None,
] = {
    MediaType.ALBUM: MediaType.TRACK,
    # NOTE(review): a playlist's children are typed as playlists here, while
    # CONTENT_TYPE_MEDIA_CLASS reports MediaClass.TRACK for playlist children
    # -- confirm this asymmetry is intended.
    MediaType.PLAYLIST: MediaType.PLAYLIST,
    MediaType.ARTIST: MediaType.ALBUM,
    MediaType.GENRE: MediaType.ARTIST,
    "artists": MediaType.ARTIST,
    "albums": MediaType.ALBUM,
    "tracks": MediaType.TRACK,
    "playlists": MediaType.PLAYLIST,
    "genres": MediaType.GENRE,
    "favorites": None,  # can only be determined after inspecting the item
    # This table holds content types, so use MediaType (not MediaClass) like
    # the MediaType.APPS entry below.
    "radios": MediaType.APP,
    "new music": MediaType.ALBUM,
    "album artists": MediaType.ARTIST,
    MediaType.APPS: MediaType.APP,
    MediaType.APP: MediaType.TRACK,
    "favorite": None,
}
|
|
|
|
|
|
@dataclass
class BrowseData:
    """Mutable, per-instance copies of the browse lookup tables.

    Every map starts empty (or as passed in) and is then filled from the
    matching module-level table, so entries can be added later without
    touching the module-level defaults.
    """

    # content type -> content type of its children (None when not known up front)
    content_type_to_child_type: dict[
        str | MediaType,
        str | MediaType | None,
    ] = field(default_factory=dict)
    # content type -> {"item": ..., "children": ...} media classes
    content_type_media_class: dict[str | MediaType, dict[str, MediaClass | str]] = (
        field(default_factory=dict)
    )
    # content type -> id field name used when building a browse_id tuple
    squeezebox_id_by_type: dict[str | MediaType, str] = field(default_factory=dict)
    # content type -> category name passed to player.async_browse
    media_type_to_squeezebox: dict[str | MediaType, str] = field(default_factory=dict)
    # app/radio commands that have already been added to the maps above
    known_apps_radios: set[str] = field(default_factory=set)

    def __post_init__(self) -> None:
        """Seed each map with the module-level defaults."""
        seeds = (
            (self.content_type_media_class, CONTENT_TYPE_MEDIA_CLASS),
            (self.content_type_to_child_type, CONTENT_TYPE_TO_CHILD_TYPE),
            (self.squeezebox_id_by_type, SQUEEZEBOX_ID_BY_TYPE),
            (self.media_type_to_squeezebox, MEDIA_TYPE_TO_SQUEEZEBOX),
        )
        for target, defaults in seeds:
            target.update(defaults)
|
|
|
|
|
|
def _add_new_command_to_browse_data(
    browse_data: BrowseData, cmd: str | MediaType, type: str
) -> None:
    """Register a new app or radio command in every browse map.

    ``cmd`` is mapped to itself as the squeezebox category, ``type`` is stored
    as its id field name, and the entry is described as a directory whose
    children are tracks.
    """
    directory_of_tracks: dict[str, MediaClass | str] = {
        "item": MediaClass.DIRECTORY,
        "children": MediaClass.TRACK,
    }

    # The four assignments are independent of one another.
    browse_data.content_type_to_child_type[cmd] = MediaType.TRACK
    browse_data.content_type_media_class[cmd] = directory_of_tracks
    browse_data.squeezebox_id_by_type[cmd] = type
    browse_data.media_type_to_squeezebox[cmd] = cmd
|
|
|
|
|
|
def _build_response_apps_radios_category(
    browse_data: BrowseData, cmd: str | MediaType, item: dict[str, Any]
) -> BrowseMedia:
    """Build the expandable, non-playable entry for one app or radio category.

    ``cmd`` is used as the content type and must already be present in
    ``browse_data.content_type_media_class``.
    """
    content_id = item["id"]
    label = item["title"]
    node_class = browse_data.content_type_media_class[cmd]["item"]

    return BrowseMedia(
        title=label,
        media_class=node_class,
        media_content_id=content_id,
        media_content_type=cmd,
        can_play=False,
        can_expand=True,
    )
|
|
|
|
|
|
def _build_response_known_app(
    browse_data: BrowseData, search_type: str, item: dict[str, Any]
) -> BrowseMedia:
    """Build the browse entry for one item inside a known app or radio.

    The entry is playable only when the item is flagged as audio and carries
    a URL, and expandable when it reports having sub-items. Missing
    ``isaudio`` / ``hasitems`` keys are treated as false, matching how
    ``_build_response_favorites`` reads the same flags.

    Raises:
        KeyError: if ``item`` lacks ``id`` or ``title``, or ``search_type`` is
            not present in ``browse_data.content_type_media_class``.
    """
    return BrowseMedia(
        media_content_id=item["id"],
        title=item["title"],
        media_content_type=search_type,
        media_class=browse_data.content_type_media_class[search_type]["item"],
        can_play=bool(item.get("isaudio") and item.get("url")),
        # Coerce to a real bool; the raw flag is passed through from the
        # server response (presumably an int flag -- confirm).
        can_expand=bool(item.get("hasitems")),
    )
|
|
|
|
|
|
def _build_response_favorites(item: dict[str, Any]) -> BrowseMedia:
    """Build the browse entry for one favorites item.

    Three shapes are produced, checked in this order:

    * an item carrying ``album_id`` becomes a playable, expandable album;
    * an item with sub-items that is not audio becomes a favorites folder;
    * anything else becomes a single ``favorite``, playable only when it is
      flagged as audio and has a URL.

    Raises:
        KeyError: if ``item`` lacks ``title`` (or ``id`` for non-album items).
    """
    if "album_id" in item:
        return BrowseMedia(
            media_content_id=str(item["album_id"]),
            title=item["title"],
            media_content_type=MediaType.ALBUM,
            media_class=CONTENT_TYPE_MEDIA_CLASS[MediaType.ALBUM]["item"],
            can_expand=True,
            can_play=True,
        )

    has_items = bool(item.get("hasitems"))
    is_audio = item.get("isaudio")

    if has_items and not is_audio:
        return BrowseMedia(
            media_content_id=item["id"],
            title=item["title"],
            media_content_type="favorites",
            media_class=CONTENT_TYPE_MEDIA_CLASS["favorites"]["item"],
            can_expand=True,
            can_play=False,
        )

    return BrowseMedia(
        media_content_id=item["id"],
        title=item["title"],
        media_content_type="favorite",
        media_class=CONTENT_TYPE_MEDIA_CLASS[MediaType.TRACK]["item"],
        can_expand=has_items,
        # Read "isaudio" with .get(), like the folder check above, so an item
        # without the flag is simply not playable instead of raising KeyError.
        can_play=bool(is_audio and item.get("url")),
    )
|
|
|
|
|
|
def _get_item_thumbnail(
    item: dict[str, Any],
    player: Player,
    entity: MediaPlayerEntity,
    item_type: str | MediaType | None,
    search_type: str,
    internal_request: bool,
) -> str | None:
    """Construct path to thumbnail image.

    Resolution order:

    1. If a track id is available (``artwork_track_id``, or the item's own
       ``id`` when ``item_type`` is a track), use the player's direct image
       URL for internal requests, otherwise the entity's browse image URL
       (only when ``item_type`` is known).
    2. Otherwise, for ``apps`` / ``radios`` listings, use the item's icon
       when it has one.
    3. If nothing was produced, fall back to the item's ``image_url``.

    Returns:
        The thumbnail URL, or ``None`` when no image information is available.
    """
    item_thumbnail: str | None = None
    track_id = item.get("artwork_track_id") or (
        item.get("id") if item_type == "track" else None
    )

    if track_id:
        if internal_request:
            item_thumbnail = player.generate_image_url_from_track_id(track_id)
        elif item_type is not None:
            item_thumbnail = entity.get_browse_image_url(
                item_type, item["id"], track_id
            )
    elif search_type in ["apps", "radios"]:
        # Not every entry is guaranteed to carry an icon; without one, fall
        # through to image_url instead of raising KeyError.
        icon = item.get("icon")
        if icon:
            item_thumbnail = player.generate_image_url(icon)

    if item_thumbnail is None:
        item_thumbnail = item.get("image_url")  # will not be proxied by HA
    return item_thumbnail
|
|
|
|
|
|
async def build_item_response(
    entity: MediaPlayerEntity,
    player: Player,
    payload: dict[str, str | None],
    browse_limit: int,
    browse_data: BrowseData,
) -> BrowseMedia:
    """Create response payload for search described by payload.

    Args:
        entity: Media player entity; supplies ``hass`` and proxied image URLs.
        player: Player whose ``async_browse`` is queried.
        payload: Must contain ``search_id`` and ``search_type``; may contain
            ``search_query``.
        browse_limit: Maximum number of items requested from the player.
        browse_data: Lookup maps. New app/radio commands found while listing
            ``apps`` or ``radios`` are added to it.

    Returns:
        A node describing the searched item with one child per usable result.

    Raises:
        BrowseError: if the player returns no items for the search.
        KeyError: if ``search_type`` is not present in the browse maps.
    """
    internal_request = is_internal_request(entity.hass)

    search_id = payload["search_id"]
    search_type = payload["search_type"]
    search_query = payload.get("search_query")
    assert (
        search_type is not None
    )  # async_browse_media will not call this function if search_type is None
    media_class = browse_data.content_type_media_class[search_type]

    children = None

    # A search id equal to the type (or no id at all) means "list the whole
    # category", so no browse_id is sent.
    if search_id and search_id != search_type:
        browse_id = (browse_data.squeezebox_id_by_type[search_type], search_id)
    else:
        browse_id = None

    result = await player.async_browse(
        browse_data.media_type_to_squeezebox[search_type],
        limit=browse_limit,
        browse_id=browse_id,
        search_query=search_query,
    )

    if result is not None and result.get("items"):
        item_type = browse_data.content_type_to_child_type[search_type]

        children = []
        for item in result["items"]:
            # Force the item id to a string in case it's numeric from some lms
            item["id"] = str(item.get("id", ""))

            if search_type in ["favorites", "favorite"]:
                child_media = _build_response_favorites(item)

            elif search_type in ["apps", "radios"]:
                # item["cmd"] contains the name of the command to use with the
                # cli for the app; add the command to the dictionaries.
                if (
                    item.get("title") in ["Search", None]
                    or item.get("type") in UNPLAYABLE_TYPES
                    or not item.get("cmd")
                ):
                    # Skip searches in apps as they'd need UI, links that
                    # aren't to audio, and entries that cannot be addressed
                    # because they have no title or command.
                    continue
                app_cmd = "app-" + item["cmd"]

                if app_cmd not in browse_data.known_apps_radios:
                    browse_data.known_apps_radios.add(app_cmd)
                    _add_new_command_to_browse_data(browse_data, app_cmd, "item_id")

                child_media = _build_response_apps_radios_category(
                    browse_data=browse_data, cmd=app_cmd, item=item
                )

            elif search_type in browse_data.known_apps_radios:
                if (
                    item.get("title") in ["Search", None]
                    or item.get("type") in UNPLAYABLE_TYPES
                ):
                    # Skip searches in apps as they'd need UI
                    continue

                child_media = _build_response_known_app(browse_data, search_type, item)

            elif item_type:
                child_media = BrowseMedia(
                    media_content_id=item["id"],
                    title=item["title"],
                    media_content_type=item_type,
                    media_class=CONTENT_TYPE_MEDIA_CLASS[item_type]["item"],
                    can_expand=bool(CONTENT_TYPE_MEDIA_CLASS[item_type]["children"]),
                    can_play=True,
                )

            else:
                # No builder applies. Skip the item so the previous
                # iteration's child is not appended again (and child_media is
                # never read while unbound).
                continue

            assert child_media.media_class is not None

            child_media.thumbnail = _get_item_thumbnail(
                item=item,
                player=player,
                entity=entity,
                item_type=item_type,
                search_type=search_type,
                internal_request=internal_request,
            )

            children.append(child_media)

    if children is None:
        raise BrowseError(
            translation_domain=DOMAIN,
            translation_key="browse_media_not_found",
            translation_placeholders={
                "type": str(search_type),
                "id": str(search_id),
            },
        )

    assert media_class["item"] is not None
    if not search_id:
        search_id = search_type

    return BrowseMedia(
        title=result.get("title"),
        media_class=media_class["item"],
        children_media_class=media_class["children"],
        media_content_id=search_id,
        media_content_type=search_type,
        can_play=any(child.can_play for child in children),
        children=children,
        can_expand=True,
    )
|
|
|
|
|
|
async def library_payload(
    hass: HomeAssistant,
    player: Player,
    browse_media: BrowseData,
) -> BrowseMedia:
    """Create response payload to describe contents of library.

    Each category in ``LIBRARY`` is probed with a one-item browse and listed
    when the player returns an ``items`` entry for it. Audio media sources
    are appended afterwards; a media source ``BrowseError`` is ignored.
    """
    children: list[BrowseMedia] = []

    for category in LIBRARY:
        media_class = browse_media.content_type_media_class[category]

        probe = await player.async_browse(
            browse_media.media_type_to_squeezebox[category],
            limit=1,
        )
        if probe is None or probe.get("items") is None:
            continue

        assert media_class["children"] is not None
        children.append(
            BrowseMedia(
                title=category.title(),
                media_class=media_class["children"],
                media_content_id=category,
                media_content_type=category,
                can_play=category not in ["favorites", "apps", "radios"],
                can_expand=True,
            )
        )

    with contextlib.suppress(media_source.BrowseError):
        browse = await media_source.async_browse_media(
            hass, None, content_filter=media_source_content_filter
        )
        # If domain is None, it's overview of available sources
        if browse.domain is None:
            children.extend(browse.children)
        else:
            children.append(browse)

    return BrowseMedia(
        title="Music Library",
        media_class=MediaClass.DIRECTORY,
        media_content_id="library",
        media_content_type="library",
        can_play=False,
        can_expand=True,
        children=children,
    )
|
|
|
|
|
|
def media_source_content_filter(item: BrowseMedia) -> bool:
    """Return whether a media source item has an ``audio/`` content type."""
    content_type = item.media_content_type
    return content_type.startswith("audio/")
|
|
|
|
|
|
async def generate_playlist(
    player: Player,
    payload: dict[str, str],
    browse_limit: int,
    browse_media: BrowseData,
) -> list | None:
    """Generate playlist from browsing payload.

    ``payload`` supplies ``search_type`` and ``search_id``. Types starting
    with ``app-`` are browsed under their own name; every other type is
    browsed as ``titles``.

    Raises:
        BrowseError: if the type has no known id field, or the player's
            response is empty or has no ``items`` key.
    """
    media_type = payload["search_type"]
    media_id = payload["search_id"]

    id_fields = browse_media.squeezebox_id_by_type
    if media_type not in id_fields:
        raise BrowseError(
            translation_domain=DOMAIN,
            translation_key="browse_media_type_not_supported",
            translation_placeholders={
                "media_type": str(media_type),
            },
        )

    browse_id = (id_fields[media_type], media_id)
    category = media_type if media_type.startswith("app-") else "titles"

    result = await player.async_browse(
        category, limit=browse_limit, browse_id=browse_id
    )
    if not result or "items" not in result:
        raise BrowseError(
            translation_domain=DOMAIN,
            translation_key="browse_media_not_found",
            translation_placeholders={
                "type": str(media_type),
                "id": str(media_id),
            },
        )

    tracks: list = result["items"]
    return tracks
|