"""Base class for common speaker tasks.""" from __future__ import annotations import asyncio from collections.abc import Coroutine import contextlib import datetime from functools import partial import logging from typing import Any, Callable import urllib.parse import async_timeout from pysonos.core import MUSIC_SRC_LINE_IN, MUSIC_SRC_RADIO, MUSIC_SRC_TV, SoCo from pysonos.data_structures import DidlAudioBroadcast, DidlPlaylistContainer from pysonos.events_base import Event as SonosEvent, SubscriptionBase from pysonos.exceptions import SoCoException from pysonos.music_library import MusicLibrary from pysonos.plugins.sharelink import ShareLinkPlugin from pysonos.snapshot import Snapshot from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN from homeassistant.components.media_player import DOMAIN as MP_DOMAIN from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import entity_registry as ent_reg from homeassistant.helpers.dispatcher import ( async_dispatcher_send, dispatcher_connect, dispatcher_send, ) from homeassistant.util import dt as dt_util from .alarms import SonosAlarms from .const import ( BATTERY_SCAN_INTERVAL, DATA_SONOS, DOMAIN, PLATFORMS, SCAN_INTERVAL, SEEN_EXPIRE_TIME, SONOS_CREATE_ALARM, SONOS_CREATE_BATTERY, SONOS_CREATE_MEDIA_PLAYER, SONOS_ENTITY_CREATED, SONOS_GROUP_UPDATE, SONOS_POLL_UPDATE, SONOS_REBOOTED, SONOS_SEEN, SONOS_STATE_PLAYING, SONOS_STATE_TRANSITIONING, SONOS_STATE_UPDATED, SOURCE_LINEIN, SOURCE_TV, SUBSCRIPTION_TIMEOUT, ) from .favorites import SonosFavorites from .helpers import soco_error EVENT_CHARGING = { "CHARGING": True, "NOT_CHARGING": False, } SUBSCRIPTION_SERVICES = [ "alarmClock", "avTransport", "contentDirectory", "deviceProperties", "renderingControl", "zoneGroupTopology", ] UNAVAILABLE_VALUES = {"", "NOT_IMPLEMENTED", None} _LOGGER = logging.getLogger(__name__) def fetch_battery_info_or_none(soco: SoCo) -> dict[str, Any] | None: """Fetch battery_info from the given SoCo object. Returns None if the device doesn't support battery info or if the device is offline. """ with contextlib.suppress(ConnectionError, TimeoutError, SoCoException): return soco.get_battery_info() def _timespan_secs(timespan: str | None) -> None | float: """Parse a time-span into number of seconds.""" if timespan in UNAVAILABLE_VALUES: return None assert timespan is not None return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":")))) class SonosMedia: """Representation of the current Sonos media.""" def __init__(self, soco: SoCo) -> None: """Initialize a SonosMedia.""" self.library = MusicLibrary(soco) self.play_mode: str | None = None self.playback_status: str | None = None self.album_name: str | None = None self.artist: str | None = None self.channel: str | None = None self.duration: float | None = None self.image_url: str | None = None self.queue_position: int | None = None self.playlist_name: str | None = None self.source_name: str | None = None self.title: str | None = None self.uri: str | None = None self.position: float | None = None self.position_updated_at: datetime.datetime | None = None def clear(self) -> None: """Clear basic media info.""" self.album_name = None self.artist = None self.channel = None self.duration = None self.image_url = None self.playlist_name = None self.queue_position = None self.source_name = None self.title = None self.uri = None def clear_position(self) -> None: """Clear the position attributes.""" self.position = None self.position_updated_at = None class SonosSpeaker: """Representation of a Sonos speaker.""" def __init__( self, hass: HomeAssistant, soco: SoCo, speaker_info: dict[str, Any] ) -> None: """Initialize a SonosSpeaker.""" self.hass = hass self.soco = soco self.household_id: str = soco.household_id self.media = SonosMedia(soco) self._share_link_plugin: ShareLinkPlugin | None = None # Synchronization helpers self._is_ready: bool = False self._platforms_ready: set[str] = set() # Subscriptions and events self.subscriptions_failed: bool = False self._subscriptions: list[SubscriptionBase] = [] self._resubscription_lock: asyncio.Lock | None = None self._event_dispatchers: dict[str, Callable] = {} # Scheduled callback handles self._poll_timer: Callable | None = None self._seen_timer: Callable | None = None # Dispatcher handles self._entity_creation_dispatcher: Callable | None = None self._group_dispatcher: Callable | None = None self._reboot_dispatcher: Callable | None = None self._seen_dispatcher: Callable | None = None # Device information self.mac_address = speaker_info["mac_address"] self.model_name = speaker_info["model_name"] self.version = speaker_info["display_version"] self.zone_name = speaker_info["zone_name"] # Battery self.battery_info: dict[str, Any] = {} self._last_battery_event: datetime.datetime | None = None self._battery_poll_timer: Callable | None = None # Volume / Sound self.volume: int | None = None self.muted: bool | None = None self.night_mode: bool | None = None self.dialog_mode: bool | None = None # Grouping self.coordinator: SonosSpeaker | None = None self.sonos_group: list[SonosSpeaker] = [self] self.sonos_group_entities: list[str] = [] self.soco_snapshot: Snapshot | None = None self.snapshot_group: list[SonosSpeaker] | None = None def setup(self) -> None: """Run initial setup of the speaker.""" self.set_basic_info() self._entity_creation_dispatcher = dispatcher_connect( self.hass, f"{SONOS_ENTITY_CREATED}-{self.soco.uid}", self.async_handle_new_entity, ) self._group_dispatcher = dispatcher_connect( self.hass, SONOS_GROUP_UPDATE, self.async_update_groups, ) self._seen_dispatcher = dispatcher_connect( self.hass, f"{SONOS_SEEN}-{self.soco.uid}", self.async_seen ) self._reboot_dispatcher = dispatcher_connect( self.hass, f"{SONOS_REBOOTED}-{self.soco.uid}", self.async_rebooted ) if battery_info := fetch_battery_info_or_none(self.soco): self.battery_info = battery_info # Battery events can be infrequent, polling is still necessary self._battery_poll_timer = self.hass.helpers.event.track_time_interval( self.async_poll_battery, BATTERY_SCAN_INTERVAL ) dispatcher_send(self.hass, SONOS_CREATE_BATTERY, self) else: self._platforms_ready.update({BINARY_SENSOR_DOMAIN, SENSOR_DOMAIN}) if new_alarms := [ alarm.alarm_id for alarm in self.alarms if alarm.zone.uid == self.soco.uid ]: dispatcher_send(self.hass, SONOS_CREATE_ALARM, self, new_alarms) else: self._platforms_ready.add(SWITCH_DOMAIN) self._event_dispatchers = { "AlarmClock": self.async_dispatch_alarms, "AVTransport": self.async_dispatch_media_update, "ContentDirectory": self.async_dispatch_favorites, "DeviceProperties": self.async_dispatch_device_properties, "RenderingControl": self.async_update_volume, "ZoneGroupTopology": self.async_update_groups, } dispatcher_send(self.hass, SONOS_CREATE_MEDIA_PLAYER, self) # # Entity management # async def async_handle_new_entity(self, entity_type: str) -> None: """Listen to new entities to trigger first subscription.""" if self._platforms_ready == PLATFORMS: return self._platforms_ready.add(entity_type) if self._platforms_ready == PLATFORMS: self._resubscription_lock = asyncio.Lock() await self.async_subscribe() self._is_ready = True def write_entity_states(self) -> None: """Write states for associated SonosEntity instances.""" dispatcher_send(self.hass, f"{SONOS_STATE_UPDATED}-{self.soco.uid}") @callback def async_write_entity_states(self) -> None: """Write states for associated SonosEntity instances.""" async_dispatcher_send(self.hass, f"{SONOS_STATE_UPDATED}-{self.soco.uid}") def set_basic_info(self) -> None: """Set basic information when speaker is reconnected.""" self.media.play_mode = self.soco.play_mode self.update_volume() # # Properties # @property def available(self) -> bool: """Return whether this speaker is available.""" return self._seen_timer is not None @property def alarms(self) -> SonosAlarms: """Return the SonosAlarms instance for this household.""" return self.hass.data[DATA_SONOS].alarms[self.household_id] @property def favorites(self) -> SonosFavorites: """Return the SonosFavorites instance for this household.""" return self.hass.data[DATA_SONOS].favorites[self.household_id] @property def is_coordinator(self) -> bool: """Return true if player is a coordinator.""" return self.coordinator is None @property def share_link(self) -> ShareLinkPlugin: """Cache the ShareLinkPlugin instance for this speaker.""" if not self._share_link_plugin: self._share_link_plugin = ShareLinkPlugin(self.soco) return self._share_link_plugin @property def subscription_address(self) -> str | None: """Return the current subscription callback address if any.""" if self._subscriptions: addr, port = self._subscriptions[0].event_listener.address return ":".join([addr, str(port)]) return None # # Subscription handling and event dispatchers # async def async_subscribe(self) -> bool: """Initiate event subscriptions.""" _LOGGER.debug("Creating subscriptions for %s", self.zone_name) try: await self.hass.async_add_executor_job(self.set_basic_info) if self._subscriptions: raise RuntimeError( f"Attempted to attach subscriptions to player: {self.soco} " f"when existing subscriptions exist: {self._subscriptions}" ) subscriptions = [ self._subscribe(getattr(self.soco, service), self.async_dispatch_event) for service in SUBSCRIPTION_SERVICES ] await asyncio.gather(*subscriptions) return True except SoCoException as ex: _LOGGER.warning("Could not connect %s: %s", self.zone_name, ex) return False async def _subscribe( self, target: SubscriptionBase, sub_callback: Callable ) -> None: """Create a Sonos subscription.""" subscription = await target.subscribe( auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT ) subscription.callback = sub_callback subscription.auto_renew_fail = self.async_renew_failed self._subscriptions.append(subscription) async def async_unsubscribe(self) -> None: """Cancel all subscriptions.""" _LOGGER.debug("Unsubscribing from events for %s", self.zone_name) await asyncio.gather( *[subscription.unsubscribe() for subscription in self._subscriptions], return_exceptions=True, ) self._subscriptions = [] @callback def async_renew_failed(self, exception: Exception) -> None: """Handle a failed subscription renewal.""" self.hass.async_create_task(self.async_resubscribe(exception)) async def async_resubscribe(self, exception: Exception) -> None: """Attempt to resubscribe when a renewal failure is detected.""" async with self._resubscription_lock: if not self.available: return if getattr(exception, "status", None) == 412: _LOGGER.warning( "Subscriptions for %s failed, speaker may have lost power", self.zone_name, ) else: _LOGGER.error( "Subscription renewals for %s failed", self.zone_name, exc_info=exception, ) await self.async_unseen() @callback def async_dispatch_event(self, event: SonosEvent) -> None: """Handle callback event and route as needed.""" if self._poll_timer: _LOGGER.debug( "Received event, cancelling poll timer for %s", self.zone_name ) self._poll_timer() self._poll_timer = None dispatcher = self._event_dispatchers[event.service.service_type] dispatcher(event) @callback def async_dispatch_alarms(self, event: SonosEvent) -> None: """Add the soco instance associated with the event to the callback.""" if not (event_id := event.variables.get("alarm_list_version")): return self.alarms.async_handle_event(event_id, self.soco) @callback def async_dispatch_device_properties(self, event: SonosEvent) -> None: """Update device properties from an event.""" self.hass.async_create_task(self.async_update_device_properties(event)) async def async_update_device_properties(self, event: SonosEvent) -> None: """Update device properties from an event.""" if more_info := event.variables.get("more_info"): battery_dict = dict(x.split(":") for x in more_info.split(",")) if "BattChg" not in battery_dict: _LOGGER.debug( "Unknown device properties update for %s (%s), please report an issue: '%s'", self.zone_name, self.model_name, more_info, ) return await self.async_update_battery_info(battery_dict) self.async_write_entity_states() @callback def async_dispatch_favorites(self, event: SonosEvent) -> None: """Add the soco instance associated with the event to the callback.""" if not (event_id := event.variables.get("favorites_update_id")): return self.favorites.async_handle_event(event_id, self.soco) @callback def async_dispatch_media_update(self, event: SonosEvent) -> None: """Update information about currently playing media from an event.""" self.hass.async_add_executor_job(self.update_media, event) @callback def async_update_volume(self, event: SonosEvent) -> None: """Update information about currently volume settings.""" variables = event.variables if "volume" in variables: self.volume = int(variables["volume"]["Master"]) if "mute" in variables: self.muted = variables["mute"]["Master"] == "1" if "night_mode" in variables: self.night_mode = variables["night_mode"] == "1" if "dialog_level" in variables: self.dialog_mode = variables["dialog_level"] == "1" self.async_write_entity_states() # # Speaker availability methods # async def async_seen(self, soco: SoCo | None = None) -> None: """Record that this speaker was seen right now.""" if soco is not None: self.soco = soco was_available = self.available _LOGGER.debug("Async seen: %s, was_available: %s", self.soco, was_available) if self._seen_timer: self._seen_timer() self._seen_timer = self.hass.helpers.event.async_call_later( SEEN_EXPIRE_TIME.total_seconds(), self.async_unseen ) if was_available: self.async_write_entity_states() return self._poll_timer = self.hass.helpers.event.async_track_time_interval( partial( async_dispatcher_send, self.hass, f"{SONOS_POLL_UPDATE}-{self.soco.uid}", ), SCAN_INTERVAL, ) if self._is_ready and not self.subscriptions_failed: done = await self.async_subscribe() if not done: assert self._seen_timer is not None self._seen_timer() await self.async_unseen() self.async_write_entity_states() async def async_unseen( self, now: datetime.datetime | None = None, will_reconnect: bool = False ) -> None: """Make this player unavailable when it was not seen recently.""" self._share_link_plugin = None if self._seen_timer: self._seen_timer() self._seen_timer = None if self._poll_timer: self._poll_timer() self._poll_timer = None await self.async_unsubscribe() if not will_reconnect: self.hass.data[DATA_SONOS].ssdp_known.remove(self.soco.uid) self.async_write_entity_states() async def async_rebooted(self, soco: SoCo) -> None: """Handle a detected speaker reboot.""" _LOGGER.warning( "%s rebooted or lost network connectivity, reconnecting with %s", self.zone_name, soco, ) await self.async_unseen(will_reconnect=True) await self.async_seen(soco) # # Battery management # async def async_update_battery_info(self, battery_dict: dict[str, Any]) -> None: """Update battery info using the decoded SonosEvent.""" self._last_battery_event = dt_util.utcnow() is_charging = EVENT_CHARGING[battery_dict["BattChg"]] if not self._battery_poll_timer: # Battery info received for an S1 speaker new_battery = not self.battery_info self.battery_info.update( { "Level": int(battery_dict["BattPct"]), "PowerSource": "EXTERNAL" if is_charging else "BATTERY", } ) if new_battery: _LOGGER.warning( "S1 firmware detected on %s, battery info may update infrequently", self.zone_name, ) async_dispatcher_send(self.hass, SONOS_CREATE_BATTERY, self) return if is_charging == self.charging: self.battery_info.update({"Level": int(battery_dict["BattPct"])}) else: if battery_info := await self.hass.async_add_executor_job( fetch_battery_info_or_none, self.soco ): self.battery_info = battery_info @property def power_source(self) -> str | None: """Return the name of the current power source. Observed to be either BATTERY or SONOS_CHARGING_RING or USB_POWER. May be an empty dict if used with an S1 Move. """ return self.battery_info.get("PowerSource") @property def charging(self) -> bool | None: """Return the charging status of the speaker.""" if self.power_source: return self.power_source != "BATTERY" return None async def async_poll_battery(self, now: datetime.datetime | None = None) -> None: """Poll the device for the current battery state.""" if not self.available: return if ( self._last_battery_event and dt_util.utcnow() - self._last_battery_event < BATTERY_SCAN_INTERVAL ): return if battery_info := await self.hass.async_add_executor_job( fetch_battery_info_or_none, self.soco ): self.battery_info = battery_info self.async_write_entity_states() # # Group management # def update_groups(self, event: SonosEvent | None = None) -> None: """Handle callback for topology change event.""" coro = self.create_update_groups_coro(event) if coro: self.hass.add_job(coro) # type: ignore @callback def async_update_groups(self, event: SonosEvent | None = None) -> None: """Handle callback for topology change event.""" coro = self.create_update_groups_coro(event) if coro: self.hass.async_add_job(coro) # type: ignore def create_update_groups_coro( self, event: SonosEvent | None = None ) -> Coroutine | None: """Handle callback for topology change event.""" def _get_soco_group() -> list[str]: """Ask SoCo cache for existing topology.""" coordinator_uid = self.soco.uid slave_uids = [] with contextlib.suppress(OSError, SoCoException): if self.soco.group and self.soco.group.coordinator: coordinator_uid = self.soco.group.coordinator.uid slave_uids = [ p.uid for p in self.soco.group.members if p.uid != coordinator_uid ] return [coordinator_uid] + slave_uids async def _async_extract_group(event: SonosEvent) -> list[str]: """Extract group layout from a topology event.""" group = event and event.zone_player_uui_ds_in_group if group: assert isinstance(group, str) return group.split(",") return await self.hass.async_add_executor_job(_get_soco_group) @callback def _async_regroup(group: list[str]) -> None: """Rebuild internal group layout.""" entity_registry = ent_reg.async_get(self.hass) sonos_group = [] sonos_group_entities = [] for uid in group: speaker = self.hass.data[DATA_SONOS].discovered.get(uid) if speaker: sonos_group.append(speaker) entity_id = entity_registry.async_get_entity_id( MP_DOMAIN, DOMAIN, uid ) sonos_group_entities.append(entity_id) self.coordinator = None self.sonos_group = sonos_group self.sonos_group_entities = sonos_group_entities self.async_write_entity_states() for slave_uid in group[1:]: slave = self.hass.data[DATA_SONOS].discovered.get(slave_uid) if slave: slave.coordinator = self slave.sonos_group = sonos_group slave.sonos_group_entities = sonos_group_entities slave.async_write_entity_states() async def _async_handle_group_event(event: SonosEvent) -> None: """Get async lock and handle event.""" async with self.hass.data[DATA_SONOS].topology_condition: group = await _async_extract_group(event) if self.soco.uid == group[0]: _async_regroup(group) self.hass.data[DATA_SONOS].topology_condition.notify_all() if event and not hasattr(event, "zone_player_uui_ds_in_group"): return None return _async_handle_group_event(event) @soco_error() def join(self, slaves: list[SonosSpeaker]) -> list[SonosSpeaker]: """Form a group with other players.""" if self.coordinator: self.unjoin() group = [self] else: group = self.sonos_group.copy() for slave in slaves: if slave.soco.uid != self.soco.uid: slave.soco.join(self.soco) slave.coordinator = self if slave not in group: group.append(slave) return group @staticmethod async def join_multi( hass: HomeAssistant, master: SonosSpeaker, speakers: list[SonosSpeaker], ) -> None: """Form a group with other players.""" async with hass.data[DATA_SONOS].topology_condition: group: list[SonosSpeaker] = await hass.async_add_executor_job( master.join, speakers ) await SonosSpeaker.wait_for_groups(hass, [group]) @soco_error() def unjoin(self) -> None: """Unjoin the player from a group.""" self.soco.unjoin() self.coordinator = None @staticmethod async def unjoin_multi(hass: HomeAssistant, speakers: list[SonosSpeaker]) -> None: """Unjoin several players from their group.""" def _unjoin_all(speakers: list[SonosSpeaker]) -> None: """Sync helper.""" # Unjoin slaves first to prevent inheritance of queues coordinators = [s for s in speakers if s.is_coordinator] slaves = [s for s in speakers if not s.is_coordinator] for speaker in slaves + coordinators: speaker.unjoin() async with hass.data[DATA_SONOS].topology_condition: await hass.async_add_executor_job(_unjoin_all, speakers) await SonosSpeaker.wait_for_groups(hass, [[s] for s in speakers]) @soco_error() def snapshot(self, with_group: bool) -> None: """Snapshot the state of a player.""" self.soco_snapshot = Snapshot(self.soco) self.soco_snapshot.snapshot() if with_group: self.snapshot_group = self.sonos_group.copy() else: self.snapshot_group = None @staticmethod async def snapshot_multi( hass: HomeAssistant, speakers: list[SonosSpeaker], with_group: bool ) -> None: """Snapshot all the speakers and optionally their groups.""" def _snapshot_all(speakers: list[SonosSpeaker]) -> None: """Sync helper.""" for speaker in speakers: speaker.snapshot(with_group) # Find all affected players speakers_set = set(speakers) if with_group: for speaker in list(speakers_set): speakers_set.update(speaker.sonos_group) async with hass.data[DATA_SONOS].topology_condition: await hass.async_add_executor_job(_snapshot_all, speakers_set) @soco_error() def restore(self) -> None: """Restore a snapshotted state to a player.""" try: assert self.soco_snapshot is not None self.soco_snapshot.restore() except (TypeError, AssertionError, AttributeError, SoCoException) as ex: # Can happen if restoring a coordinator onto a current slave _LOGGER.warning("Error on restore %s: %s", self.zone_name, ex) self.soco_snapshot = None self.snapshot_group = None @staticmethod async def restore_multi( hass: HomeAssistant, speakers: list[SonosSpeaker], with_group: bool ) -> None: """Restore snapshots for all the speakers.""" def _restore_groups( speakers: list[SonosSpeaker], with_group: bool ) -> list[list[SonosSpeaker]]: """Pause all current coordinators and restore groups.""" for speaker in (s for s in speakers if s.is_coordinator): if speaker.media.playback_status == SONOS_STATE_PLAYING: speaker.soco.pause() groups = [] if with_group: # Unjoin slaves first to prevent inheritance of queues for speaker in [s for s in speakers if not s.is_coordinator]: if speaker.snapshot_group != speaker.sonos_group: speaker.unjoin() # Bring back the original group topology for speaker in (s for s in speakers if s.snapshot_group): assert speaker.snapshot_group is not None if speaker.snapshot_group[0] == speaker: speaker.join(speaker.snapshot_group) groups.append(speaker.snapshot_group.copy()) return groups def _restore_players(speakers: list[SonosSpeaker]) -> None: """Restore state of all players.""" for speaker in (s for s in speakers if not s.is_coordinator): speaker.restore() for speaker in (s for s in speakers if s.is_coordinator): speaker.restore() # Find all affected players speakers_set = {s for s in speakers if s.soco_snapshot} if with_group: for speaker in [s for s in speakers_set if s.snapshot_group]: assert speaker.snapshot_group is not None speakers_set.update(speaker.snapshot_group) async with hass.data[DATA_SONOS].topology_condition: groups = await hass.async_add_executor_job( _restore_groups, speakers_set, with_group ) await SonosSpeaker.wait_for_groups(hass, groups) await hass.async_add_executor_job(_restore_players, speakers_set) @staticmethod async def wait_for_groups( hass: HomeAssistant, groups: list[list[SonosSpeaker]] ) -> None: """Wait until all groups are present, or timeout.""" def _test_groups(groups: list[list[SonosSpeaker]]) -> bool: """Return whether all groups exist now.""" for group in groups: coordinator = group[0] # Test that coordinator is coordinating current_group = coordinator.sonos_group if coordinator != current_group[0]: return False # Test that slaves match if set(group[1:]) != set(current_group[1:]): return False return True try: with async_timeout.timeout(5): while not _test_groups(groups): await hass.data[DATA_SONOS].topology_condition.wait() except asyncio.TimeoutError: _LOGGER.warning("Timeout waiting for target groups %s", groups) for speaker in hass.data[DATA_SONOS].discovered.values(): speaker.soco._zgs_cache.clear() # pylint: disable=protected-access # # Media and playback state handlers # def update_volume(self) -> None: """Update information about current volume settings.""" self.volume = self.soco.volume self.muted = self.soco.mute self.night_mode = self.soco.night_mode self.dialog_mode = self.soco.dialog_mode def update_media(self, event: SonosEvent | None = None) -> None: """Update information about currently playing media.""" variables = event and event.variables if variables and "transport_state" in variables: # If the transport has an error then transport_state will # not be set new_status = variables["transport_state"] else: transport_info = self.soco.get_current_transport_info() new_status = transport_info["current_transport_state"] # Ignore transitions, we should get the target state soon if new_status == SONOS_STATE_TRANSITIONING: return self.media.clear() update_position = new_status != self.media.playback_status self.media.playback_status = new_status if variables and "transport_state" in variables: self.media.play_mode = variables["current_play_mode"] track_uri = ( variables["enqueued_transport_uri"] or variables["current_track_uri"] ) music_source = self.soco.music_source_from_uri(track_uri) if uri_meta_data := variables.get("enqueued_transport_uri_meta_data"): if isinstance(uri_meta_data, DidlPlaylistContainer): self.media.playlist_name = uri_meta_data.title else: self.media.play_mode = self.soco.play_mode music_source = self.soco.music_source if music_source == MUSIC_SRC_TV: self.update_media_linein(SOURCE_TV) elif music_source == MUSIC_SRC_LINE_IN: self.update_media_linein(SOURCE_LINEIN) else: track_info = self.soco.get_current_track_info() if not track_info["uri"]: self.media.clear_position() else: self.media.uri = track_info["uri"] self.media.artist = track_info.get("artist") self.media.album_name = track_info.get("album") self.media.title = track_info.get("title") if music_source == MUSIC_SRC_RADIO: self.update_media_radio(variables) else: self.update_media_music(track_info) self.update_media_position(update_position, track_info) self.write_entity_states() # Also update slaves speakers = self.hass.data[DATA_SONOS].discovered.values() for speaker in speakers: if speaker.coordinator == self: speaker.write_entity_states() def update_media_linein(self, source: str) -> None: """Update state when playing from line-in/tv.""" self.media.clear_position() self.media.title = source self.media.source_name = source def update_media_radio(self, variables: dict | None) -> None: """Update state when streaming radio.""" self.media.clear_position() try: album_art_uri = variables["current_track_meta_data"].album_art_uri self.media.image_url = self.media.library.build_album_art_full_uri( album_art_uri ) except (TypeError, KeyError, AttributeError): pass if not self.media.artist: try: self.media.artist = variables["current_track_meta_data"].creator except (TypeError, KeyError, AttributeError): pass # Radios without tagging can have part of the radio URI as title. # In this case we try to use the radio name instead. try: uri_meta_data = variables["enqueued_transport_uri_meta_data"] if isinstance(uri_meta_data, DidlAudioBroadcast) and ( self.soco.music_source_from_uri(self.media.title) == MUSIC_SRC_RADIO or ( isinstance(self.media.title, str) and isinstance(self.media.uri, str) and ( self.media.title in self.media.uri or self.media.title in urllib.parse.unquote(self.media.uri) ) ) ): self.media.title = uri_meta_data.title except (TypeError, KeyError, AttributeError): pass media_info = self.soco.get_current_media_info() self.media.channel = media_info["channel"] # Check if currently playing radio station is in favorites for fav in self.favorites: if fav.reference.get_uri() == media_info["uri"]: self.media.source_name = fav.title def update_media_music(self, track_info: dict) -> None: """Update state when playing music tracks.""" self.media.image_url = track_info.get("album_art") playlist_position = int(track_info.get("playlist_position")) # type: ignore if playlist_position > 0: self.media.queue_position = playlist_position - 1 def update_media_position( self, update_media_position: bool, track_info: dict ) -> None: """Update state when playing music tracks.""" self.media.duration = _timespan_secs(track_info.get("duration")) current_position = _timespan_secs(track_info.get("position")) if self.media.duration == 0: self.media.clear_position() return # player started reporting position? if current_position is not None and self.media.position is None: update_media_position = True # position jumped? if current_position is not None and self.media.position is not None: if self.media.playback_status == SONOS_STATE_PLAYING: assert self.media.position_updated_at is not None time_delta = dt_util.utcnow() - self.media.position_updated_at time_diff = time_delta.total_seconds() else: time_diff = 0 calculated_position = self.media.position + time_diff if abs(calculated_position - current_position) > 1.5: update_media_position = True if current_position is None: self.media.clear_position() elif update_media_position: self.media.position = current_position self.media.position_updated_at = dt_util.utcnow()