Add musicassistant integration (#128919)

Co-authored-by: Marcel van der Veldt <m.vanderveldt@outlook.com>
pull/129501/head^2
Jozef Kruszynski 2024-10-30 14:57:01 +01:00 committed by GitHub
parent 2303521778
commit 568bdef61f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1319 additions and 0 deletions

View File

@ -324,6 +324,7 @@ homeassistant.components.moon.*
homeassistant.components.mopeka.*
homeassistant.components.motionmount.*
homeassistant.components.mqtt.*
homeassistant.components.music_assistant.*
homeassistant.components.my.*
homeassistant.components.mysensors.*
homeassistant.components.myuplink.*

View File

@ -954,6 +954,8 @@ build.json @home-assistant/supervisor
/homeassistant/components/msteams/ @peroyvind
/homeassistant/components/mullvad/ @meichthys
/tests/components/mullvad/ @meichthys
/homeassistant/components/music_assistant/ @music-assistant
/tests/components/music_assistant/ @music-assistant
/homeassistant/components/mutesync/ @currentoor
/tests/components/mutesync/ @currentoor
/homeassistant/components/my/ @home-assistant/core

View File

@ -0,0 +1,164 @@
"""Music Assistant (music-assistant.io) integration."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING
from music_assistant_client import MusicAssistantClient
from music_assistant_client.exceptions import CannotConnect, InvalidServerVersion
from music_assistant_models.enums import EventType
from music_assistant_models.errors import MusicAssistantError
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
from homeassistant.const import CONF_URL, EVENT_HOMEASSISTANT_STOP, Platform
from homeassistant.core import Event, HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.issue_registry import (
IssueSeverity,
async_create_issue,
async_delete_issue,
)
from .const import DOMAIN, LOGGER
if TYPE_CHECKING:
from music_assistant_models.event import MassEvent
type MusicAssistantConfigEntry = ConfigEntry[MusicAssistantEntryData]
PLATFORMS = [Platform.MEDIA_PLAYER]
CONNECT_TIMEOUT = 10
LISTEN_READY_TIMEOUT = 30
@dataclass
class MusicAssistantEntryData:
"""Hold Mass data for the config entry."""
mass: MusicAssistantClient
listen_task: asyncio.Task
async def async_setup_entry(
hass: HomeAssistant, entry: MusicAssistantConfigEntry
) -> bool:
"""Set up from a config entry."""
http_session = async_get_clientsession(hass, verify_ssl=False)
mass_url = entry.data[CONF_URL]
mass = MusicAssistantClient(mass_url, http_session)
try:
async with asyncio.timeout(CONNECT_TIMEOUT):
await mass.connect()
except (TimeoutError, CannotConnect) as err:
raise ConfigEntryNotReady(
f"Failed to connect to music assistant server {mass_url}"
) from err
except InvalidServerVersion as err:
async_create_issue(
hass,
DOMAIN,
"invalid_server_version",
is_fixable=False,
severity=IssueSeverity.ERROR,
translation_key="invalid_server_version",
)
raise ConfigEntryNotReady(f"Invalid server version: {err}") from err
except MusicAssistantError as err:
LOGGER.exception("Failed to connect to music assistant server", exc_info=err)
raise ConfigEntryNotReady(
f"Unknown error connecting to the Music Assistant server {mass_url}"
) from err
async_delete_issue(hass, DOMAIN, "invalid_server_version")
async def on_hass_stop(event: Event) -> None:
"""Handle incoming stop event from Home Assistant."""
await mass.disconnect()
entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, on_hass_stop)
)
# launch the music assistant client listen task in the background
# use the init_ready event to wait until initialization is done
init_ready = asyncio.Event()
listen_task = asyncio.create_task(_client_listen(hass, entry, mass, init_ready))
try:
async with asyncio.timeout(LISTEN_READY_TIMEOUT):
await init_ready.wait()
except TimeoutError as err:
listen_task.cancel()
raise ConfigEntryNotReady("Music Assistant client not ready") from err
entry.runtime_data = MusicAssistantEntryData(mass, listen_task)
# If the listen task is already failed, we need to raise ConfigEntryNotReady
if listen_task.done() and (listen_error := listen_task.exception()) is not None:
await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
try:
await mass.disconnect()
finally:
raise ConfigEntryNotReady(listen_error) from listen_error
# initialize platforms
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# register listener for removed players
async def handle_player_removed(event: MassEvent) -> None:
"""Handle Mass Player Removed event."""
if event.object_id is None:
return
dev_reg = dr.async_get(hass)
if hass_device := dev_reg.async_get_device({(DOMAIN, event.object_id)}):
dev_reg.async_update_device(
hass_device.id, remove_config_entry_id=entry.entry_id
)
entry.async_on_unload(
mass.subscribe(handle_player_removed, EventType.PLAYER_REMOVED)
)
return True
async def _client_listen(
hass: HomeAssistant,
entry: ConfigEntry,
mass: MusicAssistantClient,
init_ready: asyncio.Event,
) -> None:
"""Listen with the client."""
try:
await mass.start_listening(init_ready)
except MusicAssistantError as err:
if entry.state != ConfigEntryState.LOADED:
raise
LOGGER.error("Failed to listen: %s", err)
except Exception as err: # pylint: disable=broad-except
# We need to guard against unknown exceptions to not crash this task.
if entry.state != ConfigEntryState.LOADED:
raise
LOGGER.exception("Unexpected exception: %s", err)
if not hass.is_stopping:
LOGGER.debug("Disconnected from server. Reloading integration")
hass.async_create_task(hass.config_entries.async_reload(entry.entry_id))
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
mass_entry_data: MusicAssistantEntryData = entry.runtime_data
mass_entry_data.listen_task.cancel()
await mass_entry_data.mass.disconnect()
return unload_ok

View File

@ -0,0 +1,137 @@
"""Config flow for MusicAssistant integration."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from music_assistant_client import MusicAssistantClient
from music_assistant_client.exceptions import (
CannotConnect,
InvalidServerVersion,
MusicAssistantClientException,
)
from music_assistant_models.api import ServerInfoMessage
import voluptuous as vol
from homeassistant.components import zeroconf
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.helpers import aiohttp_client
from .const import DOMAIN, LOGGER
DEFAULT_URL = "http://mass.local:8095"
DEFAULT_TITLE = "Music Assistant"
def get_manual_schema(user_input: dict[str, Any]) -> vol.Schema:
"""Return a schema for the manual step."""
default_url = user_input.get(CONF_URL, DEFAULT_URL)
return vol.Schema(
{
vol.Required(CONF_URL, default=default_url): str,
}
)
async def get_server_info(hass: HomeAssistant, url: str) -> ServerInfoMessage:
"""Validate the user input allows us to connect."""
async with MusicAssistantClient(
url, aiohttp_client.async_get_clientsession(hass)
) as client:
if TYPE_CHECKING:
assert client.server_info is not None
return client.server_info
class MusicAssistantConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for MusicAssistant."""
VERSION = 1
def __init__(self) -> None:
"""Set up flow instance."""
self.server_info: ServerInfoMessage | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a manual configuration."""
errors: dict[str, str] = {}
if user_input is not None:
try:
self.server_info = await get_server_info(
self.hass, user_input[CONF_URL]
)
await self.async_set_unique_id(
self.server_info.server_id, raise_on_progress=False
)
self._abort_if_unique_id_configured(
updates={CONF_URL: self.server_info.base_url},
reload_on_update=True,
)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidServerVersion:
errors["base"] = "invalid_server_version"
except MusicAssistantClientException:
LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
return self.async_create_entry(
title=DEFAULT_TITLE,
data={
CONF_URL: self.server_info.base_url,
},
)
return self.async_show_form(
step_id="user", data_schema=get_manual_schema(user_input), errors=errors
)
return self.async_show_form(step_id="user", data_schema=get_manual_schema({}))
async def async_step_zeroconf(
self, discovery_info: zeroconf.ZeroconfServiceInfo
) -> ConfigFlowResult:
"""Handle a discovered Mass server.
This flow is triggered by the Zeroconf component. It will check if the
host is already configured and delegate to the import step if not.
"""
# abort if discovery info is not what we expect
if "server_id" not in discovery_info.properties:
return self.async_abort(reason="missing_server_id")
# abort if we already have exactly this server_id
# reload the integration if the host got updated
self.server_info = ServerInfoMessage.from_dict(discovery_info.properties)
await self.async_set_unique_id(self.server_info.server_id)
self._abort_if_unique_id_configured(
updates={CONF_URL: self.server_info.base_url},
reload_on_update=True,
)
try:
await get_server_info(self.hass, self.server_info.base_url)
except CannotConnect:
return self.async_abort(reason="cannot_connect")
return await self.async_step_discovery_confirm()
async def async_step_discovery_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle user-confirmation of discovered server."""
if TYPE_CHECKING:
assert self.server_info is not None
if user_input is not None:
return self.async_create_entry(
title=DEFAULT_TITLE,
data={
CONF_URL: self.server_info.base_url,
},
)
self._set_confirm_only()
return self.async_show_form(
step_id="discovery_confirm",
description_placeholders={"url": self.server_info.base_url},
)

View File

@ -0,0 +1,18 @@
"""Constants for Music Assistant Component."""
import logging
DOMAIN = "music_assistant"
DOMAIN_EVENT = f"{DOMAIN}_event"
DEFAULT_NAME = "Music Assistant"
ATTR_IS_GROUP = "is_group"
ATTR_GROUP_MEMBERS = "group_members"
ATTR_GROUP_PARENTS = "group_parents"
ATTR_MASS_PLAYER_TYPE = "mass_player_type"
ATTR_ACTIVE_QUEUE = "active_queue"
ATTR_STREAM_TITLE = "stream_title"
LOGGER = logging.getLogger(__package__)

View File

@ -0,0 +1,86 @@
"""Base entity model."""
from __future__ import annotations
from typing import TYPE_CHECKING
from music_assistant_models.enums import EventType
from music_assistant_models.event import MassEvent
from music_assistant_models.player import Player
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity
from .const import DOMAIN
if TYPE_CHECKING:
from music_assistant_client import MusicAssistantClient
class MusicAssistantEntity(Entity):
"""Base Entity from Music Assistant Player."""
_attr_has_entity_name = True
_attr_should_poll = False
def __init__(self, mass: MusicAssistantClient, player_id: str) -> None:
"""Initialize MediaPlayer entity."""
self.mass = mass
self.player_id = player_id
provider = self.mass.get_provider(self.player.provider)
if TYPE_CHECKING:
assert provider is not None
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, player_id)},
manufacturer=self.player.device_info.manufacturer or provider.name,
model=self.player.device_info.model or self.player.name,
name=self.player.display_name,
configuration_url=f"{mass.server_url}/#/settings/editplayer/{player_id}",
)
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
await self.async_on_update()
self.async_on_remove(
self.mass.subscribe(
self.__on_mass_update, EventType.PLAYER_UPDATED, self.player_id
)
)
self.async_on_remove(
self.mass.subscribe(
self.__on_mass_update,
EventType.QUEUE_UPDATED,
)
)
@property
def player(self) -> Player:
"""Return the Mass Player attached to this HA entity."""
return self.mass.players[self.player_id]
@property
def unique_id(self) -> str | None:
"""Return unique id for entity."""
_base = self.player_id
if hasattr(self, "entity_description"):
return f"{_base}_{self.entity_description.key}"
return _base
@property
def available(self) -> bool:
"""Return availability of entity."""
return self.player.available and bool(self.mass.connection.connected)
async def __on_mass_update(self, event: MassEvent) -> None:
"""Call when we receive an event from MusicAssistant."""
if event.event == EventType.QUEUE_UPDATED and event.object_id not in (
self.player.active_source,
self.player.active_group,
self.player.player_id,
):
return
await self.async_on_update()
self.async_write_ha_state()
async def async_on_update(self) -> None:
"""Handle player updates."""

View File

@ -0,0 +1,13 @@
{
"domain": "music_assistant",
"name": "Music Assistant",
"after_dependencies": ["media_source", "media_player"],
"codeowners": ["@music-assistant"],
"config_flow": true,
"documentation": "https://music-assistant.io",
"iot_class": "local_push",
"issue_tracker": "https://github.com/music-assistant/hass-music-assistant/issues",
"loggers": ["music_assistant"],
"requirements": ["music-assistant-client==1.0.3"],
"zeroconf": ["_mass._tcp.local."]
}

View File

@ -0,0 +1,557 @@
"""MediaPlayer platform for Music Assistant integration."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable, Coroutine, Mapping
from contextlib import suppress
import functools
import os
from typing import TYPE_CHECKING, Any
from music_assistant_models.enums import (
EventType,
MediaType,
PlayerFeature,
QueueOption,
RepeatMode as MassRepeatMode,
)
from music_assistant_models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant_models.event import MassEvent
from music_assistant_models.media_items import ItemMapping, MediaItemType, Track
from homeassistant.components import media_source
from homeassistant.components.media_player import (
ATTR_MEDIA_EXTRA,
BrowseMedia,
MediaPlayerDeviceClass,
MediaPlayerEnqueue,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType as HAMediaType,
RepeatMode,
async_process_play_media_url,
)
from homeassistant.const import STATE_OFF
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util.dt import utc_from_timestamp
from . import MusicAssistantConfigEntry
from .const import ATTR_ACTIVE_QUEUE, ATTR_MASS_PLAYER_TYPE, DOMAIN
from .entity import MusicAssistantEntity
if TYPE_CHECKING:
from music_assistant_client import MusicAssistantClient
from music_assistant_models.player import Player
from music_assistant_models.player_queue import PlayerQueue
SUPPORTED_FEATURES = (
MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.VOLUME_SET
| MediaPlayerEntityFeature.STOP
| MediaPlayerEntityFeature.PREVIOUS_TRACK
| MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.SHUFFLE_SET
| MediaPlayerEntityFeature.REPEAT_SET
| MediaPlayerEntityFeature.TURN_ON
| MediaPlayerEntityFeature.TURN_OFF
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.VOLUME_STEP
| MediaPlayerEntityFeature.CLEAR_PLAYLIST
| MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.MEDIA_ENQUEUE
| MediaPlayerEntityFeature.MEDIA_ANNOUNCE
| MediaPlayerEntityFeature.SEEK
)
QUEUE_OPTION_MAP = {
# map from HA enqueue options to MA enqueue options
# which are the same but just in case
MediaPlayerEnqueue.ADD: QueueOption.ADD,
MediaPlayerEnqueue.NEXT: QueueOption.NEXT,
MediaPlayerEnqueue.PLAY: QueueOption.PLAY,
MediaPlayerEnqueue.REPLACE: QueueOption.REPLACE,
}
ATTR_RADIO_MODE = "radio_mode"
ATTR_MEDIA_ID = "media_id"
ATTR_MEDIA_TYPE = "media_type"
ATTR_ARTIST = "artist"
ATTR_ALBUM = "album"
ATTR_URL = "url"
ATTR_USE_PRE_ANNOUNCE = "use_pre_announce"
ATTR_ANNOUNCE_VOLUME = "announce_volume"
ATTR_SOURCE_PLAYER = "source_player"
ATTR_AUTO_PLAY = "auto_play"
def catch_musicassistant_error[_R, **P](
func: Callable[..., Awaitable[_R]],
) -> Callable[..., Coroutine[Any, Any, _R | None]]:
"""Check and log commands to players."""
@functools.wraps(func)
async def wrapper(
self: MusicAssistantPlayer, *args: P.args, **kwargs: P.kwargs
) -> _R | None:
"""Catch Music Assistant errors and convert to Home Assistant error."""
try:
return await func(self, *args, **kwargs)
except MusicAssistantError as err:
error_msg = str(err) or err.__class__.__name__
raise HomeAssistantError(error_msg) from err
return wrapper
async def async_setup_entry(
hass: HomeAssistant,
entry: MusicAssistantConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Music Assistant MediaPlayer(s) from Config Entry."""
mass = entry.runtime_data.mass
added_ids = set()
async def handle_player_added(event: MassEvent) -> None:
"""Handle Mass Player Added event."""
if TYPE_CHECKING:
assert event.object_id is not None
if event.object_id in added_ids:
return
added_ids.add(event.object_id)
async_add_entities([MusicAssistantPlayer(mass, event.object_id)])
# register listener for new players
entry.async_on_unload(mass.subscribe(handle_player_added, EventType.PLAYER_ADDED))
mass_players = []
# add all current players
for player in mass.players:
added_ids.add(player.player_id)
mass_players.append(MusicAssistantPlayer(mass, player.player_id))
async_add_entities(mass_players)
class MusicAssistantPlayer(MusicAssistantEntity, MediaPlayerEntity):
"""Representation of MediaPlayerEntity from Music Assistant Player."""
_attr_name = None
_attr_media_image_remotely_accessible = True
_attr_media_content_type = HAMediaType.MUSIC
def __init__(self, mass: MusicAssistantClient, player_id: str) -> None:
"""Initialize MediaPlayer entity."""
super().__init__(mass, player_id)
self._attr_icon = self.player.icon.replace("mdi-", "mdi:")
self._attr_supported_features = SUPPORTED_FEATURES
if PlayerFeature.SYNC in self.player.supported_features:
self._attr_supported_features |= MediaPlayerEntityFeature.GROUPING
self._attr_device_class = MediaPlayerDeviceClass.SPEAKER
self._prev_time: float = 0
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
await super().async_added_to_hass()
# we subscribe to player queue time update but we only
# accept a state change on big time jumps (e.g. seeking)
async def queue_time_updated(event: MassEvent) -> None:
if event.object_id != self.player.active_source:
return
if abs((self._prev_time or 0) - event.data) > 5:
await self.async_on_update()
self.async_write_ha_state()
self._prev_time = event.data
self.async_on_remove(
self.mass.subscribe(
queue_time_updated,
EventType.QUEUE_TIME_UPDATED,
)
)
@property
def active_queue(self) -> PlayerQueue | None:
"""Return the active queue for this player (if any)."""
if not self.player.active_source:
return None
return self.mass.player_queues.get(self.player.active_source)
@property
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return additional state attributes."""
return {
ATTR_MASS_PLAYER_TYPE: self.player.type.value,
ATTR_ACTIVE_QUEUE: (
self.active_queue.queue_id if self.active_queue else None
),
}
async def async_on_update(self) -> None:
"""Handle player updates."""
if not self.available:
return
player = self.player
active_queue = self.active_queue
# update generic attributes
if player.powered and active_queue is not None:
self._attr_state = MediaPlayerState(active_queue.state.value)
if player.powered and player.state is not None:
self._attr_state = MediaPlayerState(player.state.value)
else:
self._attr_state = MediaPlayerState(STATE_OFF)
group_members_entity_ids: list[str] = []
if player.group_childs:
# translate MA group_childs to HA group_members as entity id's
entity_registry = er.async_get(self.hass)
group_members_entity_ids = [
entity_id
for child_id in player.group_childs
if (
entity_id := entity_registry.async_get_entity_id(
self.platform.domain, DOMAIN, child_id
)
)
]
self._attr_group_members = group_members_entity_ids
self._attr_volume_level = (
player.volume_level / 100 if player.volume_level is not None else None
)
self._attr_is_volume_muted = player.volume_muted
self._update_media_attributes(player, active_queue)
self._update_media_image_url(player, active_queue)
@catch_musicassistant_error
async def async_media_play(self) -> None:
"""Send play command to device."""
await self.mass.players.player_command_play(self.player_id)
@catch_musicassistant_error
async def async_media_pause(self) -> None:
"""Send pause command to device."""
await self.mass.players.player_command_pause(self.player_id)
@catch_musicassistant_error
async def async_media_stop(self) -> None:
"""Send stop command to device."""
await self.mass.players.player_command_stop(self.player_id)
@catch_musicassistant_error
async def async_media_next_track(self) -> None:
"""Send next track command to device."""
await self.mass.players.player_command_next_track(self.player_id)
@catch_musicassistant_error
async def async_media_previous_track(self) -> None:
"""Send previous track command to device."""
await self.mass.players.player_command_previous_track(self.player_id)
@catch_musicassistant_error
async def async_media_seek(self, position: float) -> None:
"""Send seek command."""
position = int(position)
await self.mass.players.player_command_seek(self.player_id, position)
@catch_musicassistant_error
async def async_mute_volume(self, mute: bool) -> None:
"""Mute the volume."""
await self.mass.players.player_command_volume_mute(self.player_id, mute)
@catch_musicassistant_error
async def async_set_volume_level(self, volume: float) -> None:
"""Send new volume_level to device."""
volume = int(volume * 100)
await self.mass.players.player_command_volume_set(self.player_id, volume)
@catch_musicassistant_error
async def async_volume_up(self) -> None:
"""Send new volume_level to device."""
await self.mass.players.player_command_volume_up(self.player_id)
@catch_musicassistant_error
async def async_volume_down(self) -> None:
"""Send new volume_level to device."""
await self.mass.players.player_command_volume_down(self.player_id)
@catch_musicassistant_error
async def async_turn_on(self) -> None:
"""Turn on device."""
await self.mass.players.player_command_power(self.player_id, True)
@catch_musicassistant_error
async def async_turn_off(self) -> None:
"""Turn off device."""
await self.mass.players.player_command_power(self.player_id, False)
@catch_musicassistant_error
async def async_set_shuffle(self, shuffle: bool) -> None:
"""Set shuffle state."""
if not self.active_queue:
return
await self.mass.player_queues.queue_command_shuffle(
self.active_queue.queue_id, shuffle
)
@catch_musicassistant_error
async def async_set_repeat(self, repeat: RepeatMode) -> None:
"""Set repeat state."""
if not self.active_queue:
return
await self.mass.player_queues.queue_command_repeat(
self.active_queue.queue_id, MassRepeatMode(repeat)
)
@catch_musicassistant_error
async def async_clear_playlist(self) -> None:
"""Clear players playlist."""
if TYPE_CHECKING:
assert self.player.active_source is not None
if queue := self.mass.player_queues.get(self.player.active_source):
await self.mass.player_queues.queue_command_clear(queue.queue_id)
@catch_musicassistant_error
async def async_play_media(
self,
media_type: MediaType | str,
media_id: str,
enqueue: MediaPlayerEnqueue | None = None,
announce: bool | None = None,
**kwargs: Any,
) -> None:
"""Send the play_media command to the media player."""
if media_source.is_media_source_id(media_id):
# Handle media_source
sourced_media = await media_source.async_resolve_media(
self.hass, media_id, self.entity_id
)
media_id = sourced_media.url
media_id = async_process_play_media_url(self.hass, media_id)
if announce:
await self._async_handle_play_announcement(
media_id,
use_pre_announce=kwargs[ATTR_MEDIA_EXTRA].get("use_pre_announce"),
announce_volume=kwargs[ATTR_MEDIA_EXTRA].get("announce_volume"),
)
return
# forward to our advanced play_media handler
await self._async_handle_play_media(
media_id=[media_id],
enqueue=enqueue,
media_type=media_type,
radio_mode=kwargs[ATTR_MEDIA_EXTRA].get(ATTR_RADIO_MODE),
)
@catch_musicassistant_error
async def async_join_players(self, group_members: list[str]) -> None:
"""Join `group_members` as a player group with the current player."""
player_ids: list[str] = []
for child_entity_id in group_members:
# resolve HA entity_id to MA player_id
if (hass_state := self.hass.states.get(child_entity_id)) is None:
continue
if (mass_player_id := hass_state.attributes.get("mass_player_id")) is None:
continue
player_ids.append(mass_player_id)
await self.mass.players.player_command_sync_many(self.player_id, player_ids)
@catch_musicassistant_error
async def async_unjoin_player(self) -> None:
"""Remove this player from any group."""
await self.mass.players.player_command_unsync(self.player_id)
@catch_musicassistant_error
async def _async_handle_play_media(
self,
media_id: list[str],
enqueue: MediaPlayerEnqueue | QueueOption | None = None,
radio_mode: bool | None = None,
media_type: str | None = None,
) -> None:
"""Send the play_media command to the media player."""
media_uris: list[str] = []
item: MediaItemType | ItemMapping | None = None
# work out (all) uri(s) to play
for media_id_str in media_id:
# URL or URI string
if "://" in media_id_str:
media_uris.append(media_id_str)
continue
# try content id as library id
if media_type and media_id_str.isnumeric():
with suppress(MediaNotFoundError):
item = await self.mass.music.get_item(
MediaType(media_type), media_id_str, "library"
)
if isinstance(item, MediaItemType | ItemMapping) and item.uri:
media_uris.append(item.uri)
continue
# try local accessible filename
elif await asyncio.to_thread(os.path.isfile, media_id_str):
media_uris.append(media_id_str)
continue
if not media_uris:
raise HomeAssistantError(
f"Could not resolve {media_id} to playable media item"
)
# determine active queue to send the play request to
if TYPE_CHECKING:
assert self.player.active_source is not None
if queue := self.mass.player_queues.get(self.player.active_source):
queue_id = queue.queue_id
else:
queue_id = self.player_id
await self.mass.player_queues.play_media(
queue_id,
media=media_uris,
option=self._convert_queueoption_to_media_player_enqueue(enqueue),
radio_mode=radio_mode if radio_mode else False,
)
@catch_musicassistant_error
async def _async_handle_play_announcement(
self,
url: str,
use_pre_announce: bool | None = None,
announce_volume: int | None = None,
) -> None:
"""Send the play_announcement command to the media player."""
await self.mass.players.play_announcement(
self.player_id, url, use_pre_announce, announce_volume
)
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Implement the websocket media browsing helper."""
return await media_source.async_browse_media(
self.hass,
media_content_id,
content_filter=lambda item: item.media_content_type.startswith("audio/"),
)
def _update_media_image_url(
self, player: Player, queue: PlayerQueue | None
) -> None:
"""Update image URL for the active queue item."""
if queue is None or queue.current_item is None:
self._attr_media_image_url = None
return
if image_url := self.mass.get_media_item_image_url(queue.current_item):
self._attr_media_image_remotely_accessible = (
self.mass.server_url not in image_url
)
self._attr_media_image_url = image_url
return
self._attr_media_image_url = None
def _update_media_attributes(
self, player: Player, queue: PlayerQueue | None
) -> None:
"""Update media attributes for the active queue item."""
# pylint: disable=too-many-statements
self._attr_media_artist = None
self._attr_media_album_artist = None
self._attr_media_album_name = None
self._attr_media_title = None
self._attr_media_content_id = None
self._attr_media_duration = None
self._attr_media_position = None
self._attr_media_position_updated_at = None
if queue is None and player.current_media:
# player has some external source active
self._attr_media_content_id = player.current_media.uri
self._attr_app_id = player.active_source
self._attr_media_title = player.current_media.title
self._attr_media_artist = player.current_media.artist
self._attr_media_album_name = player.current_media.album
self._attr_media_duration = player.current_media.duration
# shuffle and repeat are not (yet) supported for external sources
self._attr_shuffle = None
self._attr_repeat = None
if TYPE_CHECKING:
assert player.elapsed_time is not None
self._attr_media_position = int(player.elapsed_time)
self._attr_media_position_updated_at = (
utc_from_timestamp(player.elapsed_time_last_updated)
if player.elapsed_time_last_updated
else None
)
if TYPE_CHECKING:
assert player.elapsed_time is not None
self._prev_time = player.elapsed_time
return
if queue is None:
# player has no MA queue active
self._attr_source = player.active_source
self._attr_app_id = player.active_source
return
# player has an MA queue active (either its own queue or some group queue)
self._attr_app_id = DOMAIN
self._attr_shuffle = queue.shuffle_enabled
self._attr_repeat = queue.repeat_mode.value
if not (cur_item := queue.current_item):
# queue is empty
return
self._attr_media_content_id = queue.current_item.uri
self._attr_media_duration = queue.current_item.duration
self._attr_media_position = int(queue.elapsed_time)
self._attr_media_position_updated_at = utc_from_timestamp(
queue.elapsed_time_last_updated
)
self._prev_time = queue.elapsed_time
# handle stream title (radio station icy metadata)
if (stream_details := cur_item.streamdetails) and stream_details.stream_title:
self._attr_media_album_name = cur_item.name
if " - " in stream_details.stream_title:
stream_title_parts = stream_details.stream_title.split(" - ", 1)
self._attr_media_title = stream_title_parts[1]
self._attr_media_artist = stream_title_parts[0]
else:
self._attr_media_title = stream_details.stream_title
return
if not (media_item := cur_item.media_item):
# queue is not playing a regular media item (edge case?!)
self._attr_media_title = cur_item.name
return
# queue is playing regular media item
self._attr_media_title = media_item.name
# for tracks we can extract more info
if media_item.media_type == MediaType.TRACK:
if TYPE_CHECKING:
assert isinstance(media_item, Track)
self._attr_media_artist = media_item.artist_str
if media_item.version:
self._attr_media_title += f" ({media_item.version})"
if media_item.album:
self._attr_media_album_name = media_item.album.name
self._attr_media_album_artist = getattr(
media_item.album, "artist_str", None
)
def _convert_queueoption_to_media_player_enqueue(
self, queue_option: MediaPlayerEnqueue | QueueOption | None
) -> QueueOption | None:
"""Convert a QueueOption to a MediaPlayerEnqueue."""
if isinstance(queue_option, MediaPlayerEnqueue):
queue_option = QUEUE_OPTION_MAP.get(queue_option)
return queue_option

View File

@ -0,0 +1,51 @@
{
"config": {
"step": {
"init": {
"data": {
"url": "URL of the Music Assistant server"
}
},
"manual": {
"title": "Manually add Music Assistant Server",
"description": "Enter the URL to your already running Music Assistant Server. If you do not have the Music Assistant Server running, you should install it first.",
"data": {
"url": "URL of the Music Assistant server"
}
},
"discovery_confirm": {
"description": "Do you want to add the Music Assistant Server `{url}` to Home Assistant?",
"title": "Discovered Music Assistant Server"
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_server_version": "The Music Assistant server is not the correct version",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"already_in_progress": "Configuration flow is already in progress",
"reconfiguration_successful": "Successfully reconfigured the Music Assistant integration.",
"cannot_connect": "Failed to connect",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]"
}
},
"issues": {
"invalid_server_version": {
"title": "The Music Assistant server is not the correct version",
"description": "Check if there are updates available for the Music Assistant Server and/or integration."
}
},
"selector": {
"enqueue": {
"options": {
"play": "Play",
"next": "Play next",
"add": "Add to queue",
"replace": "Play now and clear queue",
"replace_next": "Play next and clear queue"
}
}
}
}

View File

@ -383,6 +383,7 @@ FLOWS = {
"mpd",
"mqtt",
"mullvad",
"music_assistant",
"mutesync",
"mysensors",
"mystrom",

View File

@ -3944,6 +3944,12 @@
"iot_class": "cloud_polling",
"single_config_entry": true
},
"music_assistant": {
"name": "Music Assistant",
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_push"
},
"mutesync": {
"name": "mutesync",
"integration_type": "hub",

View File

@ -639,6 +639,11 @@ ZEROCONF = {
},
},
],
"_mass._tcp.local.": [
{
"domain": "music_assistant",
},
],
"_matter._tcp.local.": [
{
"domain": "matter",

View File

@ -2995,6 +2995,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.music_assistant.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.my.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -1405,6 +1405,9 @@ mozart-api==4.1.1.116.0
# homeassistant.components.mullvad
mullvad-api==1.0.0
# homeassistant.components.music_assistant
music-assistant-client==1.0.3
# homeassistant.components.tts
mutagen==1.47.0

View File

@ -1174,6 +1174,9 @@ mozart-api==4.1.1.116.0
# homeassistant.components.mullvad
mullvad-api==1.0.0
# homeassistant.components.music_assistant
music-assistant-client==1.0.3
# homeassistant.components.tts
mutagen==1.47.0

View File

@ -0,0 +1 @@
"""The tests for the Music Assistant component."""

View File

@ -0,0 +1,35 @@
"""Music Assistant test fixtures."""
from collections.abc import Generator
from unittest.mock import patch
from music_assistant_models.api import ServerInfoMessage
import pytest
from homeassistant.components.music_assistant.config_flow import CONF_URL
from homeassistant.components.music_assistant.const import DOMAIN
from tests.common import AsyncMock, MockConfigEntry, load_fixture
@pytest.fixture
def mock_get_server_info() -> Generator[AsyncMock]:
"""Mock the function to get server info."""
with patch(
"homeassistant.components.music_assistant.config_flow.get_server_info"
) as mock_get_server_info:
mock_get_server_info.return_value = ServerInfoMessage.from_json(
load_fixture("server_info_message.json", DOMAIN)
)
yield mock_get_server_info
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Mock a config entry."""
return MockConfigEntry(
domain=DOMAIN,
title="Music Assistant",
data={CONF_URL: "http://localhost:8095"},
unique_id="1234",
)

View File

@ -0,0 +1,9 @@
{
"server_id": "1234",
"server_version": "0.0.0",
"schema_version": 23,
"min_supported_schema_version": 23,
"base_url": "http://localhost:8095",
"homeassistant_addon": false,
"onboard_done": false
}

View File

@ -0,0 +1,217 @@
"""Define tests for the Music Assistant Integration config flow."""
from copy import deepcopy
from ipaddress import ip_address
from unittest import mock
from unittest.mock import AsyncMock
from music_assistant_client.exceptions import (
CannotConnect,
InvalidServerVersion,
MusicAssistantClientException,
)
from music_assistant_models.api import ServerInfoMessage
import pytest
from homeassistant.components.music_assistant.config_flow import CONF_URL
from homeassistant.components.music_assistant.const import DEFAULT_NAME, DOMAIN
from homeassistant.components.zeroconf import ZeroconfServiceInfo
from homeassistant.config_entries import SOURCE_USER, SOURCE_ZEROCONF
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry, load_fixture
SERVER_INFO = {
"server_id": "1234",
"base_url": "http://localhost:8095",
"server_version": "0.0.0",
"schema_version": 23,
"min_supported_schema_version": 23,
"homeassistant_addon": True,
}
ZEROCONF_DATA = ZeroconfServiceInfo(
ip_address=ip_address("127.0.0.1"),
ip_addresses=[ip_address("127.0.0.1")],
hostname="mock_hostname",
port=None,
type=mock.ANY,
name=mock.ANY,
properties=SERVER_INFO,
)
async def test_full_flow(
hass: HomeAssistant,
mock_get_server_info: AsyncMock,
) -> None:
"""Test full flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_URL: "http://localhost:8095"},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == DEFAULT_NAME
assert result["data"] == {
CONF_URL: "http://localhost:8095",
}
assert result["result"].unique_id == "1234"
async def test_zero_conf_flow(
hass: HomeAssistant,
mock_get_server_info: AsyncMock,
) -> None:
"""Test zeroconf flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=ZEROCONF_DATA,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "discovery_confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == DEFAULT_NAME
assert result["data"] == {
CONF_URL: "http://localhost:8095",
}
assert result["result"].unique_id == "1234"
async def test_zero_conf_missing_server_id(
hass: HomeAssistant,
mock_get_server_info: AsyncMock,
) -> None:
"""Test zeroconf flow with missing server id."""
bad_zero_conf_data = deepcopy(ZEROCONF_DATA)
bad_zero_conf_data.properties.pop("server_id")
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=bad_zero_conf_data,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "missing_server_id"
async def test_duplicate_user(
hass: HomeAssistant,
mock_get_server_info: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test duplicate user flow."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_URL: "http://localhost:8095"},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_duplicate_zeroconf(
hass: HomeAssistant,
mock_get_server_info: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test duplicate zeroconf flow."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=ZEROCONF_DATA,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
@pytest.mark.parametrize(
("exception", "error_message"),
[
(InvalidServerVersion("invalid_server_version"), "invalid_server_version"),
(CannotConnect("cannot_connect"), "cannot_connect"),
(MusicAssistantClientException("unknown"), "unknown"),
],
)
async def test_flow_user_server_version_invalid(
hass: HomeAssistant,
mock_get_server_info: AsyncMock,
exception: MusicAssistantClientException,
error_message: str,
) -> None:
"""Test user flow when server url is invalid."""
mock_get_server_info.side_effect = exception
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_URL: "http://localhost:8095"},
)
await hass.async_block_till_done()
assert result["errors"] == {"base": error_message}
mock_get_server_info.side_effect = None
mock_get_server_info.return_value = ServerInfoMessage.from_json(
load_fixture("server_info_message.json", DOMAIN)
)
assert result["type"] is FlowResultType.FORM
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_URL: "http://localhost:8095"},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
async def test_flow_zeroconf_connect_issue(
hass: HomeAssistant,
mock_get_server_info: AsyncMock,
) -> None:
"""Test zeroconf flow when server connect be reached."""
mock_get_server_info.side_effect = CannotConnect("cannot_connect")
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=ZEROCONF_DATA,
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "cannot_connect"