627 lines
21 KiB
Python
627 lines
21 KiB
Python
"""Media Source Implementation."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any, cast
|
|
|
|
from music_assistant_models.enums import MediaType as MASSMediaType
|
|
from music_assistant_models.media_items import MediaItemType, SearchResults
|
|
|
|
from homeassistant.components import media_source
|
|
from homeassistant.components.media_player import (
|
|
BrowseError,
|
|
BrowseMedia,
|
|
MediaClass,
|
|
MediaType,
|
|
SearchError,
|
|
SearchMedia,
|
|
SearchMediaQuery,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
from .const import DEFAULT_NAME, DOMAIN
|
|
|
|
if TYPE_CHECKING:
|
|
from music_assistant_client import MusicAssistantClient
|
|
|
|
MEDIA_TYPE_AUDIOBOOK = "audiobook"
|
|
MEDIA_TYPE_RADIO = "radio"
|
|
|
|
PLAYABLE_MEDIA_TYPES = [
|
|
MediaType.ALBUM,
|
|
MediaType.ARTIST,
|
|
MEDIA_TYPE_AUDIOBOOK,
|
|
MediaType.PLAYLIST,
|
|
MediaType.PODCAST,
|
|
MEDIA_TYPE_RADIO,
|
|
MediaType.PODCAST,
|
|
MediaType.TRACK,
|
|
]
|
|
|
|
LIBRARY_ARTISTS = "artists"
|
|
LIBRARY_ALBUMS = "albums"
|
|
LIBRARY_TRACKS = "tracks"
|
|
LIBRARY_PLAYLISTS = "playlists"
|
|
LIBRARY_RADIO = "radio"
|
|
LIBRARY_PODCASTS = "podcasts"
|
|
LIBRARY_AUDIOBOOKS = "audiobooks"
|
|
|
|
|
|
LIBRARY_TITLE_MAP = {
|
|
LIBRARY_ARTISTS: "Artists",
|
|
LIBRARY_ALBUMS: "Albums",
|
|
LIBRARY_TRACKS: "Tracks",
|
|
LIBRARY_PLAYLISTS: "Playlists",
|
|
LIBRARY_RADIO: "Radio stations",
|
|
LIBRARY_PODCASTS: "Podcasts",
|
|
LIBRARY_AUDIOBOOKS: "Audiobooks",
|
|
}
|
|
|
|
LIBRARY_MEDIA_CLASS_MAP = {
|
|
LIBRARY_ARTISTS: MediaClass.ARTIST,
|
|
LIBRARY_ALBUMS: MediaClass.ALBUM,
|
|
LIBRARY_TRACKS: MediaClass.TRACK,
|
|
LIBRARY_PLAYLISTS: MediaClass.PLAYLIST,
|
|
LIBRARY_RADIO: MediaClass.MUSIC, # radio is not accepted by HA
|
|
LIBRARY_PODCASTS: MediaClass.PODCAST,
|
|
LIBRARY_AUDIOBOOKS: MediaClass.DIRECTORY, # audiobook is not accepted by HA
|
|
}
|
|
|
|
MEDIA_CONTENT_TYPE_FLAC = "audio/flac"
|
|
THUMB_SIZE = 200
|
|
SORT_NAME_DESC = "sort_name_desc"
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def media_source_filter(item: BrowseMedia) -> bool:
|
|
"""Filter media sources."""
|
|
return item.media_content_type.startswith("audio/")
|
|
|
|
|
|
async def async_browse_media(
|
|
hass: HomeAssistant,
|
|
mass: MusicAssistantClient,
|
|
media_content_id: str | None,
|
|
media_content_type: str | None,
|
|
) -> BrowseMedia:
|
|
"""Browse media."""
|
|
if media_content_id is None:
|
|
return await build_main_listing(hass)
|
|
|
|
assert media_content_type is not None
|
|
|
|
if media_source.is_media_source_id(media_content_id):
|
|
return await media_source.async_browse_media(
|
|
hass, media_content_id, content_filter=media_source_filter
|
|
)
|
|
|
|
if media_content_id == LIBRARY_ARTISTS:
|
|
return await build_artists_listing(mass)
|
|
if media_content_id == LIBRARY_ALBUMS:
|
|
return await build_albums_listing(mass)
|
|
if media_content_id == LIBRARY_TRACKS:
|
|
return await build_tracks_listing(mass)
|
|
if media_content_id == LIBRARY_PLAYLISTS:
|
|
return await build_playlists_listing(mass)
|
|
if media_content_id == LIBRARY_RADIO:
|
|
return await build_radio_listing(mass)
|
|
if media_content_id == LIBRARY_PODCASTS:
|
|
return await build_podcasts_listing(mass)
|
|
if media_content_id == LIBRARY_AUDIOBOOKS:
|
|
return await build_audiobooks_listing(mass)
|
|
if "artist" in media_content_id:
|
|
return await build_artist_items_listing(mass, media_content_id)
|
|
if "album" in media_content_id:
|
|
return await build_album_items_listing(mass, media_content_id)
|
|
if "playlist" in media_content_id:
|
|
return await build_playlist_items_listing(mass, media_content_id)
|
|
raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
|
|
|
|
|
|
async def build_main_listing(hass: HomeAssistant) -> BrowseMedia:
|
|
"""Build main browse listing."""
|
|
children: list[BrowseMedia] = []
|
|
for library, media_class in LIBRARY_MEDIA_CLASS_MAP.items():
|
|
child_source = BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=library,
|
|
media_content_type=DOMAIN,
|
|
title=LIBRARY_TITLE_MAP[library],
|
|
children_media_class=media_class,
|
|
can_play=False,
|
|
can_expand=True,
|
|
)
|
|
children.append(child_source)
|
|
|
|
try:
|
|
item = await media_source.async_browse_media(
|
|
hass, None, content_filter=media_source_filter
|
|
)
|
|
# If domain is None, it's overview of available sources
|
|
if item.domain is None and item.children is not None:
|
|
children.extend(item.children)
|
|
else:
|
|
children.append(item)
|
|
except media_source.BrowseError:
|
|
pass
|
|
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id="",
|
|
media_content_type=DOMAIN,
|
|
title=DEFAULT_NAME,
|
|
can_play=False,
|
|
can_expand=True,
|
|
children=children,
|
|
)
|
|
|
|
|
|
async def build_playlists_listing(mass: MusicAssistantClient) -> BrowseMedia:
|
|
"""Build Playlists browse listing."""
|
|
media_class = LIBRARY_MEDIA_CLASS_MAP[LIBRARY_PLAYLISTS]
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=LIBRARY_PLAYLISTS,
|
|
media_content_type=MediaType.PLAYLIST,
|
|
title=LIBRARY_TITLE_MAP[LIBRARY_PLAYLISTS],
|
|
can_play=False,
|
|
can_expand=True,
|
|
children_media_class=media_class,
|
|
children=[
|
|
build_item(mass, item, can_expand=True)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for item in await mass.music.get_library_playlists(
|
|
limit=500, order_by=SORT_NAME_DESC
|
|
)
|
|
if item.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_playlist_items_listing(
|
|
mass: MusicAssistantClient, identifier: str
|
|
) -> BrowseMedia:
|
|
"""Build Playlist items browse listing."""
|
|
playlist = await mass.music.get_item_by_uri(identifier)
|
|
if TYPE_CHECKING:
|
|
assert playlist.uri is not None
|
|
|
|
return BrowseMedia(
|
|
media_class=MediaClass.PLAYLIST,
|
|
media_content_id=playlist.uri,
|
|
media_content_type=MediaType.PLAYLIST,
|
|
title=playlist.name,
|
|
can_play=True,
|
|
can_expand=True,
|
|
children_media_class=MediaClass.TRACK,
|
|
children=[
|
|
build_item(mass, item, can_expand=False)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for item in await mass.music.get_playlist_tracks(
|
|
playlist.item_id, playlist.provider
|
|
)
|
|
if item.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_artists_listing(mass: MusicAssistantClient) -> BrowseMedia:
|
|
"""Build Albums browse listing."""
|
|
media_class = LIBRARY_MEDIA_CLASS_MAP[LIBRARY_ARTISTS]
|
|
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=LIBRARY_ARTISTS,
|
|
media_content_type=MediaType.ARTIST,
|
|
title=LIBRARY_TITLE_MAP[LIBRARY_ARTISTS],
|
|
can_play=False,
|
|
can_expand=True,
|
|
children_media_class=media_class,
|
|
children=[
|
|
build_item(mass, artist, can_expand=True)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for artist in await mass.music.get_library_artists(
|
|
limit=500, order_by=SORT_NAME_DESC
|
|
)
|
|
if artist.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_artist_items_listing(
|
|
mass: MusicAssistantClient, identifier: str
|
|
) -> BrowseMedia:
|
|
"""Build Artist items browse listing."""
|
|
artist = await mass.music.get_item_by_uri(identifier)
|
|
albums = await mass.music.get_artist_albums(artist.item_id, artist.provider)
|
|
|
|
if TYPE_CHECKING:
|
|
assert artist.uri is not None
|
|
|
|
return BrowseMedia(
|
|
media_class=MediaType.ARTIST,
|
|
media_content_id=artist.uri,
|
|
media_content_type=MediaType.ARTIST,
|
|
title=artist.name,
|
|
can_play=True,
|
|
can_expand=True,
|
|
children_media_class=MediaClass.ALBUM,
|
|
children=[
|
|
build_item(mass, album, can_expand=True)
|
|
for album in albums
|
|
if album.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_albums_listing(mass: MusicAssistantClient) -> BrowseMedia:
|
|
"""Build Albums browse listing."""
|
|
media_class = LIBRARY_MEDIA_CLASS_MAP[LIBRARY_ALBUMS]
|
|
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=LIBRARY_ALBUMS,
|
|
media_content_type=MediaType.ALBUM,
|
|
title=LIBRARY_TITLE_MAP[LIBRARY_ALBUMS],
|
|
can_play=False,
|
|
can_expand=True,
|
|
children_media_class=media_class,
|
|
children=[
|
|
build_item(mass, album, can_expand=True)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for album in await mass.music.get_library_albums(
|
|
limit=500, order_by=SORT_NAME_DESC
|
|
)
|
|
if album.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_album_items_listing(
|
|
mass: MusicAssistantClient, identifier: str
|
|
) -> BrowseMedia:
|
|
"""Build Album items browse listing."""
|
|
album = await mass.music.get_item_by_uri(identifier)
|
|
tracks = await mass.music.get_album_tracks(album.item_id, album.provider)
|
|
|
|
if TYPE_CHECKING:
|
|
assert album.uri is not None
|
|
|
|
return BrowseMedia(
|
|
media_class=MediaType.ALBUM,
|
|
media_content_id=album.uri,
|
|
media_content_type=MediaType.ALBUM,
|
|
title=album.name,
|
|
can_play=True,
|
|
can_expand=True,
|
|
children_media_class=MediaClass.TRACK,
|
|
children=[
|
|
build_item(mass, track, False) for track in tracks if track.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_tracks_listing(mass: MusicAssistantClient) -> BrowseMedia:
|
|
"""Build Tracks browse listing."""
|
|
media_class = LIBRARY_MEDIA_CLASS_MAP[LIBRARY_TRACKS]
|
|
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=LIBRARY_TRACKS,
|
|
media_content_type=MediaType.TRACK,
|
|
title=LIBRARY_TITLE_MAP[LIBRARY_TRACKS],
|
|
can_play=False,
|
|
can_expand=True,
|
|
children_media_class=media_class,
|
|
children=[
|
|
build_item(mass, track, can_expand=False)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for track in await mass.music.get_library_tracks(
|
|
limit=500, order_by=SORT_NAME_DESC
|
|
)
|
|
if track.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_podcasts_listing(mass: MusicAssistantClient) -> BrowseMedia:
|
|
"""Build Podcasts browse listing."""
|
|
media_class = LIBRARY_MEDIA_CLASS_MAP[LIBRARY_PODCASTS]
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=LIBRARY_PODCASTS,
|
|
media_content_type=MediaType.PODCAST,
|
|
title=LIBRARY_TITLE_MAP[LIBRARY_PODCASTS],
|
|
can_play=False,
|
|
can_expand=True,
|
|
children_media_class=media_class,
|
|
children=[
|
|
build_item(mass, podcast, can_expand=False)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for podcast in await mass.music.get_library_podcasts(
|
|
limit=500, order_by=SORT_NAME_DESC
|
|
)
|
|
if podcast.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_audiobooks_listing(mass: MusicAssistantClient) -> BrowseMedia:
|
|
"""Build Audiobooks browse listing."""
|
|
media_class = LIBRARY_MEDIA_CLASS_MAP[LIBRARY_AUDIOBOOKS]
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=LIBRARY_AUDIOBOOKS,
|
|
media_content_type=DOMAIN,
|
|
title=LIBRARY_TITLE_MAP[LIBRARY_AUDIOBOOKS],
|
|
can_play=False,
|
|
can_expand=True,
|
|
children_media_class=media_class,
|
|
children=[
|
|
build_item(mass, audiobook, can_expand=False)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for audiobook in await mass.music.get_library_audiobooks(
|
|
limit=500, order_by=SORT_NAME_DESC
|
|
)
|
|
if audiobook.available
|
|
],
|
|
)
|
|
|
|
|
|
async def build_radio_listing(mass: MusicAssistantClient) -> BrowseMedia:
|
|
"""Build Radio browse listing."""
|
|
media_class = LIBRARY_MEDIA_CLASS_MAP[LIBRARY_RADIO]
|
|
return BrowseMedia(
|
|
media_class=MediaClass.DIRECTORY,
|
|
media_content_id=LIBRARY_RADIO,
|
|
media_content_type=DOMAIN,
|
|
title=LIBRARY_TITLE_MAP[LIBRARY_RADIO],
|
|
can_play=False,
|
|
can_expand=True,
|
|
children_media_class=media_class,
|
|
children=[
|
|
build_item(mass, track, can_expand=False, media_class=media_class)
|
|
# we only grab the first page here because the
|
|
# HA media browser does not support paging
|
|
for track in await mass.music.get_library_radios(
|
|
limit=500, order_by=SORT_NAME_DESC
|
|
)
|
|
if track.available
|
|
],
|
|
)
|
|
|
|
|
|
def build_item(
|
|
mass: MusicAssistantClient,
|
|
item: MediaItemType,
|
|
can_expand: bool = True,
|
|
media_class: Any = None,
|
|
) -> BrowseMedia:
|
|
"""Return BrowseMedia for MediaItem."""
|
|
if artists := getattr(item, "artists", None):
|
|
title = f"{artists[0].name} - {item.name}"
|
|
else:
|
|
title = item.name
|
|
img_url = mass.get_media_item_image_url(item)
|
|
|
|
if TYPE_CHECKING:
|
|
assert item.uri is not None
|
|
|
|
return BrowseMedia(
|
|
media_class=media_class or item.media_type.value,
|
|
media_content_id=item.uri,
|
|
media_content_type=MediaType.MUSIC,
|
|
title=title,
|
|
can_play=True,
|
|
can_expand=can_expand,
|
|
thumbnail=img_url,
|
|
)
|
|
|
|
|
|
async def _search_within_album(
|
|
mass: MusicAssistantClient, album_uri: str, search_query: str, limit: int
|
|
) -> SearchMedia:
|
|
"""Search for tracks within a specific album."""
|
|
album = await mass.music.get_item_by_uri(album_uri)
|
|
tracks = await mass.music.get_album_tracks(album.item_id, album.provider)
|
|
|
|
# Filter tracks by search query
|
|
filtered_tracks = [
|
|
track
|
|
for track in tracks
|
|
if search_query.lower() in track.name.lower() and track.available
|
|
]
|
|
|
|
return SearchMedia(
|
|
result=[
|
|
build_item(mass, track, can_expand=False)
|
|
for track in filtered_tracks[:limit]
|
|
]
|
|
)
|
|
|
|
|
|
async def _search_within_artist(
|
|
mass: MusicAssistantClient, artist_uri: str, search_query: str, limit: int
|
|
) -> SearchResults:
|
|
"""Search for content within an artist's catalog."""
|
|
artist = await mass.music.get_item_by_uri(artist_uri)
|
|
search_query = f"{artist.name} - {search_query}"
|
|
return await mass.music.search(
|
|
search_query,
|
|
media_types=[MASSMediaType.ALBUM, MASSMediaType.TRACK],
|
|
limit=limit,
|
|
)
|
|
|
|
|
|
def _get_media_types_from_query(query: SearchMediaQuery) -> list[MASSMediaType]:
|
|
"""Map query to Music Assistant media types."""
|
|
media_types: list[MASSMediaType] = []
|
|
|
|
match query.media_content_type:
|
|
case MediaType.ARTIST:
|
|
media_types = [MASSMediaType.ARTIST]
|
|
case MediaType.ALBUM:
|
|
media_types = [MASSMediaType.ALBUM]
|
|
case MediaType.TRACK:
|
|
media_types = [MASSMediaType.TRACK]
|
|
case MediaType.PLAYLIST:
|
|
media_types = [MASSMediaType.PLAYLIST]
|
|
case "radio":
|
|
media_types = [MASSMediaType.RADIO]
|
|
case "audiobook":
|
|
media_types = [MASSMediaType.AUDIOBOOK]
|
|
case MediaType.PODCAST:
|
|
media_types = [MASSMediaType.PODCAST]
|
|
case _:
|
|
# No specific type selected
|
|
if query.media_filter_classes:
|
|
# Map MediaClass to search types
|
|
mapping = {
|
|
MediaClass.ARTIST: MASSMediaType.ARTIST,
|
|
MediaClass.ALBUM: MASSMediaType.ALBUM,
|
|
MediaClass.TRACK: MASSMediaType.TRACK,
|
|
MediaClass.PLAYLIST: MASSMediaType.PLAYLIST,
|
|
MediaClass.MUSIC: MASSMediaType.RADIO,
|
|
MediaClass.DIRECTORY: MASSMediaType.AUDIOBOOK,
|
|
MediaClass.PODCAST: MASSMediaType.PODCAST,
|
|
}
|
|
media_types = [
|
|
mapping[cls] for cls in query.media_filter_classes if cls in mapping
|
|
]
|
|
|
|
# Default to all types if none specified
|
|
if not media_types:
|
|
media_types = [
|
|
MASSMediaType.ARTIST,
|
|
MASSMediaType.ALBUM,
|
|
MASSMediaType.TRACK,
|
|
MASSMediaType.PLAYLIST,
|
|
MASSMediaType.RADIO,
|
|
MASSMediaType.AUDIOBOOK,
|
|
MASSMediaType.PODCAST,
|
|
]
|
|
|
|
return media_types
|
|
|
|
|
|
def _process_search_results(
|
|
mass: MusicAssistantClient,
|
|
search_results: SearchResults,
|
|
media_types: list[MASSMediaType],
|
|
) -> list[BrowseMedia]:
|
|
"""Process search results into BrowseMedia items."""
|
|
result: list[BrowseMedia] = []
|
|
|
|
# Process search results for each media type
|
|
for media_type in media_types:
|
|
# Get items for each media type using pattern matching
|
|
items: list[MediaItemType] = []
|
|
match media_type:
|
|
case MASSMediaType.ARTIST if search_results.artists:
|
|
# Cast to ensure type safety
|
|
items = cast(list[MediaItemType], search_results.artists)
|
|
case MASSMediaType.ALBUM if search_results.albums:
|
|
items = cast(list[MediaItemType], search_results.albums)
|
|
case MASSMediaType.TRACK if search_results.tracks:
|
|
items = cast(list[MediaItemType], search_results.tracks)
|
|
case MASSMediaType.PLAYLIST if search_results.playlists:
|
|
items = cast(list[MediaItemType], search_results.playlists)
|
|
case MASSMediaType.RADIO if search_results.radio:
|
|
items = cast(list[MediaItemType], search_results.radio)
|
|
case MASSMediaType.PODCAST if search_results.podcasts:
|
|
items = cast(list[MediaItemType], search_results.podcasts)
|
|
case MASSMediaType.AUDIOBOOK if search_results.audiobooks:
|
|
items = cast(list[MediaItemType], search_results.audiobooks)
|
|
case _:
|
|
continue
|
|
|
|
# Add available items to results
|
|
for item in items:
|
|
if not item.available:
|
|
continue
|
|
|
|
# Create browse item
|
|
# Convert to string to get the original value since we're using MASSMediaType enum
|
|
str_media_type = media_type.value.lower()
|
|
can_expand = _should_expand_media_type(str_media_type)
|
|
media_class = _get_media_class_for_type(str_media_type)
|
|
|
|
browse_item = build_item(
|
|
mass,
|
|
item,
|
|
can_expand=can_expand,
|
|
media_class=media_class,
|
|
)
|
|
result.append(browse_item)
|
|
|
|
return result
|
|
|
|
|
|
def _should_expand_media_type(media_type: str) -> bool:
|
|
"""Determine if a media type should be expandable."""
|
|
return media_type in ("artist", "album", "playlist", "podcast")
|
|
|
|
|
|
def _get_media_class_for_type(media_type: str) -> MediaClass | None:
|
|
"""Get the appropriate media class for a given media type."""
|
|
mapping = {
|
|
"artist": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_ARTISTS],
|
|
"album": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_ALBUMS],
|
|
"track": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_TRACKS],
|
|
"playlist": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_PLAYLISTS],
|
|
"radio": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_RADIO],
|
|
"podcast": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_PODCASTS],
|
|
"audiobook": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_AUDIOBOOKS],
|
|
}
|
|
return mapping.get(media_type)
|
|
|
|
|
|
async def async_search_media(
|
|
mass: MusicAssistantClient,
|
|
query: SearchMediaQuery,
|
|
) -> SearchMedia:
|
|
"""Search media."""
|
|
try:
|
|
search_query = query.search_query
|
|
limit = 5 # Default limit per media type
|
|
search_results: SearchResults | None = None
|
|
|
|
# Handle media_content_id if provided (for contextual searches)
|
|
if query.media_content_id:
|
|
if "album/" in query.media_content_id:
|
|
return await _search_within_album(
|
|
mass, query.media_content_id, search_query, limit
|
|
)
|
|
if "artist/" in query.media_content_id:
|
|
# For artists, we already run a search, so save the results
|
|
search_results = await _search_within_artist(
|
|
mass, query.media_content_id, search_query, limit
|
|
)
|
|
|
|
# Determine which media types to search
|
|
media_types = _get_media_types_from_query(query)
|
|
|
|
# Execute search using the Music Assistant API if we haven't already done so
|
|
if search_results is None:
|
|
search_results = await mass.music.search(
|
|
search_query, media_types=media_types, limit=limit
|
|
)
|
|
|
|
# Process the search results
|
|
result = _process_search_results(mass, search_results, media_types)
|
|
return SearchMedia(result=result)
|
|
|
|
except Exception as err:
|
|
LOGGER.debug(
|
|
"Search error details for %s: %s", query.search_query, err, exc_info=True
|
|
)
|
|
raise SearchError(f"Error searching for {query.search_query}") from err
|