679 lines
25 KiB
Python
679 lines
25 KiB
Python
"""Denon HEOS Media Player."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Awaitable, Callable, Coroutine, Sequence
|
|
from contextlib import suppress
|
|
import dataclasses
|
|
from datetime import datetime
|
|
from functools import reduce, wraps
|
|
import logging
|
|
from operator import ior
|
|
from typing import Any, Final
|
|
|
|
from pyheos import (
|
|
AddCriteriaType,
|
|
ControlType,
|
|
HeosError,
|
|
HeosPlayer,
|
|
MediaItem,
|
|
MediaMusicSource,
|
|
MediaType as HeosMediaType,
|
|
PlayState,
|
|
RepeatType,
|
|
const as heos_const,
|
|
)
|
|
from pyheos.util import mediauri as heos_source
|
|
|
|
from homeassistant.components import media_source
|
|
from homeassistant.components.media_player import (
|
|
ATTR_MEDIA_ENQUEUE,
|
|
BrowseError,
|
|
BrowseMedia,
|
|
MediaClass,
|
|
MediaPlayerEnqueue,
|
|
MediaPlayerEntity,
|
|
MediaPlayerEntityFeature,
|
|
MediaPlayerState,
|
|
MediaType,
|
|
RepeatMode,
|
|
async_process_play_media_url,
|
|
)
|
|
from homeassistant.components.media_source import BrowseMediaSource
|
|
from homeassistant.const import Platform
|
|
from homeassistant.core import HomeAssistant, ServiceResponse, callback
|
|
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
|
|
from homeassistant.helpers import entity_registry as er
|
|
from homeassistant.helpers.device_registry import DeviceInfo
|
|
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
|
from homeassistant.helpers.update_coordinator import CoordinatorEntity
|
|
from homeassistant.util.dt import utcnow
|
|
|
|
from . import services
|
|
from .const import DOMAIN
|
|
from .coordinator import HeosConfigEntry, HeosCoordinator
|
|
|
|
PARALLEL_UPDATES = 0
|
|
|
|
BROWSE_ROOT: Final = "heos://media"
|
|
|
|
BASE_SUPPORTED_FEATURES = (
|
|
MediaPlayerEntityFeature.VOLUME_MUTE
|
|
| MediaPlayerEntityFeature.VOLUME_SET
|
|
| MediaPlayerEntityFeature.VOLUME_STEP
|
|
| MediaPlayerEntityFeature.CLEAR_PLAYLIST
|
|
| MediaPlayerEntityFeature.SELECT_SOURCE
|
|
| MediaPlayerEntityFeature.PLAY_MEDIA
|
|
| MediaPlayerEntityFeature.GROUPING
|
|
| MediaPlayerEntityFeature.BROWSE_MEDIA
|
|
| MediaPlayerEntityFeature.MEDIA_ENQUEUE
|
|
)
|
|
|
|
PLAY_STATE_TO_STATE = {
|
|
None: MediaPlayerState.IDLE,
|
|
PlayState.UNKNOWN: MediaPlayerState.IDLE,
|
|
PlayState.PLAY: MediaPlayerState.PLAYING,
|
|
PlayState.STOP: MediaPlayerState.IDLE,
|
|
PlayState.PAUSE: MediaPlayerState.PAUSED,
|
|
}
|
|
|
|
CONTROL_TO_SUPPORT = {
|
|
ControlType.PLAY: MediaPlayerEntityFeature.PLAY,
|
|
ControlType.PAUSE: MediaPlayerEntityFeature.PAUSE,
|
|
ControlType.STOP: MediaPlayerEntityFeature.STOP,
|
|
ControlType.PLAY_PREVIOUS: MediaPlayerEntityFeature.PREVIOUS_TRACK,
|
|
ControlType.PLAY_NEXT: MediaPlayerEntityFeature.NEXT_TRACK,
|
|
}
|
|
|
|
HA_HEOS_ENQUEUE_MAP = {
|
|
None: AddCriteriaType.REPLACE_AND_PLAY,
|
|
MediaPlayerEnqueue.ADD: AddCriteriaType.ADD_TO_END,
|
|
MediaPlayerEnqueue.REPLACE: AddCriteriaType.REPLACE_AND_PLAY,
|
|
MediaPlayerEnqueue.NEXT: AddCriteriaType.PLAY_NEXT,
|
|
MediaPlayerEnqueue.PLAY: AddCriteriaType.PLAY_NOW,
|
|
}
|
|
|
|
HEOS_HA_REPEAT_TYPE_MAP = {
|
|
RepeatType.OFF: RepeatMode.OFF,
|
|
RepeatType.ON_ALL: RepeatMode.ALL,
|
|
RepeatType.ON_ONE: RepeatMode.ONE,
|
|
}
|
|
HA_HEOS_REPEAT_TYPE_MAP = {v: k for k, v in HEOS_HA_REPEAT_TYPE_MAP.items()}
|
|
|
|
HEOS_MEDIA_TYPE_TO_MEDIA_CLASS = {
|
|
HeosMediaType.ALBUM: MediaClass.ALBUM,
|
|
HeosMediaType.ARTIST: MediaClass.ARTIST,
|
|
HeosMediaType.CONTAINER: MediaClass.DIRECTORY,
|
|
HeosMediaType.GENRE: MediaClass.GENRE,
|
|
HeosMediaType.HEOS_SERVER: MediaClass.DIRECTORY,
|
|
HeosMediaType.HEOS_SERVICE: MediaClass.DIRECTORY,
|
|
HeosMediaType.MUSIC_SERVICE: MediaClass.DIRECTORY,
|
|
HeosMediaType.PLAYLIST: MediaClass.PLAYLIST,
|
|
HeosMediaType.SONG: MediaClass.TRACK,
|
|
HeosMediaType.STATION: MediaClass.TRACK,
|
|
}
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def async_setup_entry(
|
|
hass: HomeAssistant,
|
|
entry: HeosConfigEntry,
|
|
async_add_entities: AddConfigEntryEntitiesCallback,
|
|
) -> None:
|
|
"""Add media players for a config entry."""
|
|
services.register_media_player_services()
|
|
|
|
def add_entities_callback(players: Sequence[HeosPlayer]) -> None:
|
|
"""Add entities for each player."""
|
|
async_add_entities(
|
|
[HeosMediaPlayer(entry.runtime_data, player) for player in players]
|
|
)
|
|
|
|
coordinator = entry.runtime_data
|
|
coordinator.async_add_platform_callback(add_entities_callback)
|
|
add_entities_callback(list(coordinator.heos.players.values()))
|
|
|
|
|
|
type _FuncType[**_P, _R] = Callable[_P, Awaitable[_R]]
|
|
type _ReturnFuncType[**_P, _R] = Callable[_P, Coroutine[Any, Any, _R]]
|
|
|
|
|
|
def catch_action_error[**_P, _R](
|
|
action: str,
|
|
) -> Callable[[_FuncType[_P, _R]], _ReturnFuncType[_P, _R]]:
|
|
"""Return decorator that catches errors and raises HomeAssistantError."""
|
|
|
|
def decorator(func: _FuncType[_P, _R]) -> _ReturnFuncType[_P, _R]:
|
|
@wraps(func)
|
|
async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
|
|
try:
|
|
return await func(*args, **kwargs)
|
|
except (HeosError, ValueError) as ex:
|
|
raise HomeAssistantError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="action_error",
|
|
translation_placeholders={"action": action, "error": str(ex)},
|
|
) from ex
|
|
|
|
return wrapper
|
|
|
|
return decorator
|
|
|
|
|
|
class HeosMediaPlayer(CoordinatorEntity[HeosCoordinator], MediaPlayerEntity):
|
|
"""The HEOS player."""
|
|
|
|
_attr_media_content_type = MediaType.MUSIC
|
|
_attr_supported_features = BASE_SUPPORTED_FEATURES
|
|
_attr_media_image_remotely_accessible = True
|
|
_attr_has_entity_name = True
|
|
_attr_name = None
|
|
|
|
def __init__(self, coordinator: HeosCoordinator, player: HeosPlayer) -> None:
|
|
"""Initialize."""
|
|
self._media_position_updated_at: datetime | None = None
|
|
self._player: HeosPlayer = player
|
|
self._attr_unique_id = str(player.player_id)
|
|
model_parts = player.model.split(maxsplit=1)
|
|
manufacturer = model_parts[0] if len(model_parts) == 2 else "HEOS"
|
|
model = model_parts[1] if len(model_parts) == 2 else player.model
|
|
self._attr_device_info = DeviceInfo(
|
|
identifiers={(DOMAIN, str(player.player_id))},
|
|
manufacturer=manufacturer,
|
|
model=model,
|
|
name=player.name,
|
|
serial_number=player.serial, # Only available for some models
|
|
sw_version=player.version,
|
|
)
|
|
super().__init__(coordinator, context=player.player_id)
|
|
|
|
async def _player_update(self, event: str) -> None:
|
|
"""Handle player attribute updated."""
|
|
if event == heos_const.EVENT_PLAYER_NOW_PLAYING_PROGRESS:
|
|
self._media_position_updated_at = utcnow()
|
|
self._handle_coordinator_update()
|
|
|
|
@callback
|
|
def _handle_coordinator_update(self) -> None:
|
|
"""Handle updated data from the coordinator."""
|
|
self._update_attributes()
|
|
super()._handle_coordinator_update()
|
|
|
|
@callback
|
|
def _get_group_members(self) -> list[str] | None:
|
|
"""Get group member entity IDs for the group."""
|
|
if self._player.group_id is None:
|
|
return None
|
|
if not (group := self.coordinator.heos.groups.get(self._player.group_id)):
|
|
return None
|
|
player_ids = [group.lead_player_id, *group.member_player_ids]
|
|
# Resolve player_ids to entity_ids
|
|
entity_registry = er.async_get(self.hass)
|
|
entity_ids = [
|
|
entity_id
|
|
for member_id in player_ids
|
|
if (
|
|
entity_id := entity_registry.async_get_entity_id(
|
|
Platform.MEDIA_PLAYER, DOMAIN, str(member_id)
|
|
)
|
|
)
|
|
]
|
|
return entity_ids or None
|
|
|
|
@callback
|
|
def _update_attributes(self) -> None:
|
|
"""Update core attributes of the media player."""
|
|
self._attr_group_members = self._get_group_members()
|
|
self._attr_source_list = self.coordinator.async_get_source_list()
|
|
self._attr_source = self.coordinator.async_get_current_source(
|
|
self._player.now_playing_media
|
|
)
|
|
self._attr_repeat = HEOS_HA_REPEAT_TYPE_MAP[self._player.repeat]
|
|
controls = self._player.now_playing_media.supported_controls
|
|
current_support = [CONTROL_TO_SUPPORT[control] for control in controls]
|
|
self._attr_supported_features = reduce(
|
|
ior, current_support, BASE_SUPPORTED_FEATURES
|
|
)
|
|
if self.support_next_track and self.support_previous_track:
|
|
self._attr_supported_features |= (
|
|
MediaPlayerEntityFeature.REPEAT_SET
|
|
| MediaPlayerEntityFeature.SHUFFLE_SET
|
|
)
|
|
|
|
async def async_added_to_hass(self) -> None:
|
|
"""Device added to hass."""
|
|
# Update state when attributes of the player change
|
|
self._update_attributes()
|
|
self.async_on_remove(self._player.add_on_player_event(self._player_update))
|
|
await super().async_added_to_hass()
|
|
|
|
@catch_action_error("get queue")
|
|
async def async_get_queue(self) -> ServiceResponse:
|
|
"""Get the queue for the current player."""
|
|
queue = await self._player.get_queue()
|
|
return {"queue": [dataclasses.asdict(item) for item in queue]}
|
|
|
|
@catch_action_error("clear playlist")
|
|
async def async_clear_playlist(self) -> None:
|
|
"""Clear players playlist."""
|
|
await self._player.clear_queue()
|
|
|
|
@catch_action_error("pause")
|
|
async def async_media_pause(self) -> None:
|
|
"""Send pause command."""
|
|
await self._player.pause()
|
|
|
|
@catch_action_error("play")
|
|
async def async_media_play(self) -> None:
|
|
"""Send play command."""
|
|
await self._player.play()
|
|
|
|
@catch_action_error("move to previous track")
|
|
async def async_media_previous_track(self) -> None:
|
|
"""Send previous track command."""
|
|
await self._player.play_previous()
|
|
|
|
@catch_action_error("move to next track")
|
|
async def async_media_next_track(self) -> None:
|
|
"""Send next track command."""
|
|
await self._player.play_next()
|
|
|
|
@catch_action_error("stop")
|
|
async def async_media_stop(self) -> None:
|
|
"""Send stop command."""
|
|
await self._player.stop()
|
|
|
|
@catch_action_error("set mute")
|
|
async def async_mute_volume(self, mute: bool) -> None:
|
|
"""Mute the volume."""
|
|
await self._player.set_mute(mute)
|
|
|
|
@catch_action_error("play media")
|
|
async def async_play_media(
|
|
self, media_type: MediaType | str, media_id: str, **kwargs: Any
|
|
) -> None:
|
|
"""Play a piece of media."""
|
|
if heos_source.is_media_uri(media_id):
|
|
media, data = heos_source.from_media_uri(media_id)
|
|
if not isinstance(media, MediaItem):
|
|
raise ValueError(f"Invalid media id '{media_id}'")
|
|
await self._player.play_media(
|
|
media,
|
|
HA_HEOS_ENQUEUE_MAP[kwargs.get(ATTR_MEDIA_ENQUEUE)],
|
|
)
|
|
return
|
|
|
|
if media_source.is_media_source_id(media_id):
|
|
media_type = MediaType.URL
|
|
play_item = await media_source.async_resolve_media(
|
|
self.hass, media_id, self.entity_id
|
|
)
|
|
media_id = play_item.url
|
|
|
|
if media_type in {MediaType.URL, MediaType.MUSIC}:
|
|
media_id = async_process_play_media_url(self.hass, media_id)
|
|
|
|
await self._player.play_url(media_id)
|
|
return
|
|
|
|
if media_type == "quick_select":
|
|
# media_id may be an int or a str
|
|
selects = await self._player.get_quick_selects()
|
|
try:
|
|
index: int | None = int(media_id)
|
|
except ValueError:
|
|
# Try finding index by name
|
|
index = next(
|
|
(index for index, select in selects.items() if select == media_id),
|
|
None,
|
|
)
|
|
if index is None:
|
|
raise ValueError(f"Invalid quick select '{media_id}'")
|
|
await self._player.play_quick_select(index)
|
|
return
|
|
|
|
if media_type == MediaType.PLAYLIST:
|
|
playlists = await self.coordinator.heos.get_playlists()
|
|
playlist = next((p for p in playlists if p.name == media_id), None)
|
|
if not playlist:
|
|
raise ValueError(f"Invalid playlist '{media_id}'")
|
|
add_queue_option = HA_HEOS_ENQUEUE_MAP[kwargs.get(ATTR_MEDIA_ENQUEUE)]
|
|
await self._player.play_media(playlist, add_queue_option)
|
|
return
|
|
|
|
if media_type == "favorite":
|
|
# media_id may be an int or str
|
|
try:
|
|
index = int(media_id)
|
|
except ValueError:
|
|
# Try finding index by name
|
|
index = self.coordinator.async_get_favorite_index(media_id)
|
|
if index is None:
|
|
raise ValueError(f"Invalid favorite '{media_id}'")
|
|
await self._player.play_preset_station(index)
|
|
return
|
|
|
|
if media_type == "queue":
|
|
# media_id must be an int
|
|
try:
|
|
queue_id = int(media_id)
|
|
except ValueError:
|
|
raise ValueError(f"Invalid queue id '{media_id}'") from None
|
|
await self._player.play_queue(queue_id)
|
|
return
|
|
|
|
raise ValueError(f"Unsupported media type '{media_type}'")
|
|
|
|
@catch_action_error("select source")
|
|
async def async_select_source(self, source: str) -> None:
|
|
"""Select input source."""
|
|
# Favorite
|
|
if (index := self.coordinator.async_get_favorite_index(source)) is not None:
|
|
await self._player.play_preset_station(index)
|
|
return
|
|
# Input source
|
|
for input_source in self.coordinator.inputs:
|
|
if input_source.name == source:
|
|
await self._player.play_media(input_source)
|
|
return
|
|
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="unknown_source",
|
|
translation_placeholders={"source": source},
|
|
)
|
|
|
|
@catch_action_error("set repeat")
|
|
async def async_set_repeat(self, repeat: RepeatMode) -> None:
|
|
"""Set repeat mode."""
|
|
await self._player.set_play_mode(
|
|
HA_HEOS_REPEAT_TYPE_MAP[repeat], self._player.shuffle
|
|
)
|
|
|
|
@catch_action_error("set shuffle")
|
|
async def async_set_shuffle(self, shuffle: bool) -> None:
|
|
"""Enable/disable shuffle mode."""
|
|
await self._player.set_play_mode(self._player.repeat, shuffle)
|
|
|
|
@catch_action_error("set volume level")
|
|
async def async_set_volume_level(self, volume: float) -> None:
|
|
"""Set volume level, range 0..1."""
|
|
await self._player.set_volume(int(volume * 100))
|
|
|
|
@catch_action_error("set group volume level")
|
|
async def async_set_group_volume_level(self, volume_level: float) -> None:
|
|
"""Set group volume level."""
|
|
if self._player.group_id is None:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="entity_not_grouped",
|
|
translation_placeholders={"entity_id": self.entity_id},
|
|
)
|
|
await self.coordinator.heos.set_group_volume(
|
|
self._player.group_id, int(volume_level * 100)
|
|
)
|
|
|
|
@catch_action_error("group volume down")
|
|
async def async_group_volume_down(self) -> None:
|
|
"""Turn group volume down for media player."""
|
|
if self._player.group_id is None:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="entity_not_grouped",
|
|
translation_placeholders={"entity_id": self.entity_id},
|
|
)
|
|
await self.coordinator.heos.group_volume_down(self._player.group_id)
|
|
|
|
@catch_action_error("group volume up")
|
|
async def async_group_volume_up(self) -> None:
|
|
"""Turn group volume up for media player."""
|
|
if self._player.group_id is None:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="entity_not_grouped",
|
|
translation_placeholders={"entity_id": self.entity_id},
|
|
)
|
|
await self.coordinator.heos.group_volume_up(self._player.group_id)
|
|
|
|
@catch_action_error("join players")
|
|
async def async_join_players(self, group_members: list[str]) -> None:
|
|
"""Join `group_members` as a player group with the current player."""
|
|
player_ids: list[int] = [self._player.player_id]
|
|
# Resolve entity_ids to player_ids
|
|
entity_registry = er.async_get(self.hass)
|
|
for entity_id in group_members:
|
|
entity_entry = entity_registry.async_get(entity_id)
|
|
if entity_entry is None:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="entity_not_found",
|
|
translation_placeholders={"entity_id": entity_id},
|
|
)
|
|
if entity_entry.platform != DOMAIN:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="not_heos_media_player",
|
|
translation_placeholders={"entity_id": entity_id},
|
|
)
|
|
player_id = int(entity_entry.unique_id)
|
|
if player_id not in player_ids:
|
|
player_ids.append(player_id)
|
|
await self.coordinator.heos.set_group(player_ids)
|
|
|
|
@catch_action_error("unjoin player")
|
|
async def async_unjoin_player(self) -> None:
|
|
"""Remove this player from any group."""
|
|
for group in self.coordinator.heos.groups.values():
|
|
if group.lead_player_id == self._player.player_id:
|
|
# Player is the group leader, this effectively removes the group.
|
|
await self.coordinator.heos.set_group([self._player.player_id])
|
|
return
|
|
if self._player.player_id in group.member_player_ids:
|
|
# Player is a group member, update the group to exclude it
|
|
new_members = [group.lead_player_id, *group.member_player_ids]
|
|
new_members.remove(self._player.player_id)
|
|
await self.coordinator.heos.set_group(new_members)
|
|
return
|
|
|
|
async def async_remove_from_queue(self, queue_ids: list[int]) -> None:
|
|
"""Remove items from the queue."""
|
|
await self._player.remove_from_queue(queue_ids)
|
|
|
|
@catch_action_error("move queue item")
|
|
async def async_move_queue_item(
|
|
self, queue_ids: list[int], destination_position: int
|
|
) -> None:
|
|
"""Move items in the queue."""
|
|
await self._player.move_queue_item(queue_ids, destination_position)
|
|
|
|
@property
|
|
def available(self) -> bool:
|
|
"""Return True if the device is available."""
|
|
return self._player.available
|
|
|
|
@property
|
|
def extra_state_attributes(self) -> dict[str, Any]:
|
|
"""Get additional attribute about the state."""
|
|
return {
|
|
"media_album_id": self._player.now_playing_media.album_id,
|
|
"media_queue_id": self._player.now_playing_media.queue_id,
|
|
"media_source_id": self._player.now_playing_media.source_id,
|
|
"media_station": self._player.now_playing_media.station,
|
|
"media_type": self._player.now_playing_media.type,
|
|
}
|
|
|
|
@property
|
|
def is_volume_muted(self) -> bool:
|
|
"""Boolean if volume is currently muted."""
|
|
return self._player.is_muted
|
|
|
|
@property
|
|
def media_album_name(self) -> str | None:
|
|
"""Album name of current playing media, music track only."""
|
|
return self._player.now_playing_media.album
|
|
|
|
@property
|
|
def media_artist(self) -> str | None:
|
|
"""Artist of current playing media, music track only."""
|
|
return self._player.now_playing_media.artist
|
|
|
|
@property
|
|
def media_content_id(self) -> str | None:
|
|
"""Content ID of current playing media."""
|
|
return self._player.now_playing_media.media_id
|
|
|
|
@property
|
|
def media_duration(self) -> int | None:
|
|
"""Duration of current playing media in seconds."""
|
|
duration = self._player.now_playing_media.duration
|
|
if isinstance(duration, int):
|
|
return int(duration / 1000)
|
|
return None
|
|
|
|
@property
|
|
def media_position(self) -> int | None:
|
|
"""Position of current playing media in seconds."""
|
|
# Some media doesn't have duration but reports position, return None
|
|
if not self._player.now_playing_media.duration:
|
|
return None
|
|
if isinstance(self._player.now_playing_media.current_position, int):
|
|
return int(self._player.now_playing_media.current_position / 1000)
|
|
return None
|
|
|
|
@property
|
|
def media_position_updated_at(self) -> datetime | None:
|
|
"""When was the position of the current playing media valid."""
|
|
# Some media doesn't have duration but reports position, return None
|
|
if not self._player.now_playing_media.duration:
|
|
return None
|
|
return self._media_position_updated_at
|
|
|
|
@property
|
|
def media_image_url(self) -> str | None:
|
|
"""Image url of current playing media."""
|
|
# May be an empty string, if so, return None
|
|
image_url = self._player.now_playing_media.image_url
|
|
return image_url if image_url else None
|
|
|
|
@property
|
|
def media_title(self) -> str | None:
|
|
"""Title of current playing media."""
|
|
return self._player.now_playing_media.song
|
|
|
|
@property
|
|
def shuffle(self) -> bool:
|
|
"""Boolean if shuffle is enabled."""
|
|
return self._player.shuffle
|
|
|
|
@property
|
|
def state(self) -> MediaPlayerState:
|
|
"""State of the player."""
|
|
return PLAY_STATE_TO_STATE[self._player.state]
|
|
|
|
@property
|
|
def volume_level(self) -> float:
|
|
"""Volume level of the media player (0..1)."""
|
|
return self._player.volume / 100
|
|
|
|
async def _async_browse_media_root(self) -> BrowseMedia:
|
|
"""Return media browsing root."""
|
|
if not self.coordinator.heos.music_sources:
|
|
try:
|
|
await self.coordinator.heos.get_music_sources()
|
|
except HeosError as error:
|
|
_LOGGER.debug("Unable to load music sources: %s", error)
|
|
children: list[BrowseMedia] = [
|
|
_media_to_browse_media(source)
|
|
for source in self.coordinator.heos.music_sources.values()
|
|
if source.available or source.source_id == heos_const.MUSIC_SOURCE_TUNEIN
|
|
]
|
|
root = BrowseMedia(
|
|
title="Music Sources",
|
|
media_class=MediaClass.DIRECTORY,
|
|
children_media_class=MediaClass.DIRECTORY,
|
|
media_content_type="",
|
|
media_content_id=BROWSE_ROOT,
|
|
can_expand=True,
|
|
can_play=False,
|
|
children=children,
|
|
)
|
|
# Append media source items
|
|
with suppress(BrowseError):
|
|
browse = await self._async_browse_media_source()
|
|
# If domain is None, it's an overview of available sources
|
|
if browse.domain is None and browse.children:
|
|
children.extend(browse.children)
|
|
else:
|
|
children.append(browse)
|
|
return root
|
|
|
|
async def _async_browse_heos_media(self, media_content_id: str) -> BrowseMedia:
|
|
"""Browse a HEOS media item."""
|
|
media, data = heos_source.from_media_uri(media_content_id)
|
|
browse_media = _media_to_browse_media(media)
|
|
try:
|
|
browse_result = await self.coordinator.heos.browse_media(media)
|
|
except HeosError as error:
|
|
_LOGGER.debug("Unable to browse media %s: %s", media, error)
|
|
else:
|
|
browse_media.children = [
|
|
_media_to_browse_media(item)
|
|
for item in browse_result.items
|
|
if item.browsable or item.playable
|
|
]
|
|
return browse_media
|
|
|
|
async def _async_browse_media_source(
|
|
self, media_content_id: str | None = None
|
|
) -> BrowseMediaSource:
|
|
"""Browse a media source item."""
|
|
return await media_source.async_browse_media(
|
|
self.hass,
|
|
media_content_id,
|
|
content_filter=lambda item: item.media_content_type.startswith("audio/"),
|
|
)
|
|
|
|
async def async_browse_media(
|
|
self,
|
|
media_content_type: MediaType | str | None = None,
|
|
media_content_id: str | None = None,
|
|
) -> BrowseMedia:
|
|
"""Implement the websocket media browsing helper."""
|
|
if media_content_id in (None, BROWSE_ROOT):
|
|
return await self._async_browse_media_root()
|
|
assert media_content_id is not None
|
|
if heos_source.is_media_uri(media_content_id):
|
|
return await self._async_browse_heos_media(media_content_id)
|
|
if media_source.is_media_source_id(media_content_id):
|
|
return await self._async_browse_media_source(media_content_id)
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="unsupported_media_content_id",
|
|
translation_placeholders={"media_content_id": media_content_id},
|
|
)
|
|
|
|
|
|
def _media_to_browse_media(media: MediaItem | MediaMusicSource) -> BrowseMedia:
|
|
"""Convert a HEOS media item to a browse media item."""
|
|
can_expand = False
|
|
can_play = False
|
|
|
|
if isinstance(media, MediaMusicSource):
|
|
can_expand = (
|
|
media.source_id == heos_const.MUSIC_SOURCE_TUNEIN or media.available
|
|
)
|
|
else:
|
|
can_expand = media.browsable
|
|
can_play = media.playable
|
|
|
|
return BrowseMedia(
|
|
can_expand=can_expand,
|
|
can_play=can_play,
|
|
media_content_id=heos_source.to_media_uri(media),
|
|
media_content_type="",
|
|
media_class=HEOS_MEDIA_TYPE_TO_MEDIA_CLASS[media.type],
|
|
title=media.name,
|
|
thumbnail=media.image_url,
|
|
)
|