core/homeassistant/components/teslemetry/media_player.py

310 lines
11 KiB
Python

"""Media player platform for Teslemetry integration."""
from __future__ import annotations
from tesla_fleet_api.const import Scope
from tesla_fleet_api.teslemetry import Vehicle
from homeassistant.components.media_player import (
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.restore_state import RestoreEntity
from . import TeslemetryConfigEntry
from .entity import (
TeslemetryRootEntity,
TeslemetryVehiclePollingEntity,
TeslemetryVehicleStreamEntity,
)
from .helpers import handle_vehicle_command
from .models import TeslemetryVehicleData
STATES = {
"Playing": MediaPlayerState.PLAYING,
"Paused": MediaPlayerState.PAUSED,
"Stopped": MediaPlayerState.IDLE,
"Off": MediaPlayerState.OFF,
}
DISPLAY_STATES = {
"On": MediaPlayerState.IDLE,
"Accessory": MediaPlayerState.IDLE,
"Charging": MediaPlayerState.OFF,
"Sentry": MediaPlayerState.OFF,
"Off": MediaPlayerState.OFF,
}
# Tesla uses 31 steps, in 0.333 increments up to 10.333
VOLUME_STEP = 1 / 31
VOLUME_FACTOR = 31 / 3 # 10.333
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,
entry: TeslemetryConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Teslemetry Media platform from a config entry."""
async_add_entities(
TeslemetryVehiclePollingMediaEntity(vehicle, entry.runtime_data.scopes)
if vehicle.api.pre2021 or vehicle.firmware < "2025.2.6"
else TeslemetryStreamingMediaEntity(vehicle, entry.runtime_data.scopes)
for vehicle in entry.runtime_data.vehicles
)
class TeslemetryMediaEntity(TeslemetryRootEntity, MediaPlayerEntity):
"""Base vehicle media player class."""
api: Vehicle
_attr_device_class = MediaPlayerDeviceClass.SPEAKER
_attr_volume_step = VOLUME_STEP
async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
self.raise_for_scope(Scope.VEHICLE_CMDS)
await handle_vehicle_command(self.api.adjust_volume(volume * VOLUME_FACTOR))
self._attr_volume_level = volume
self.async_write_ha_state()
async def async_media_play(self) -> None:
"""Send play command."""
if self.state != MediaPlayerState.PLAYING:
self.raise_for_scope(Scope.VEHICLE_CMDS)
await handle_vehicle_command(self.api.media_toggle_playback())
self._attr_state = MediaPlayerState.PLAYING
self.async_write_ha_state()
async def async_media_pause(self) -> None:
"""Send pause command."""
if self.state == MediaPlayerState.PLAYING:
self.raise_for_scope(Scope.VEHICLE_CMDS)
await handle_vehicle_command(self.api.media_toggle_playback())
self._attr_state = MediaPlayerState.PAUSED
self.async_write_ha_state()
async def async_media_next_track(self) -> None:
"""Send next track command."""
self.raise_for_scope(Scope.VEHICLE_CMDS)
await handle_vehicle_command(self.api.media_next_track())
async def async_media_previous_track(self) -> None:
"""Send previous track command."""
self.raise_for_scope(Scope.VEHICLE_CMDS)
await handle_vehicle_command(self.api.media_prev_track())
class TeslemetryVehiclePollingMediaEntity(
TeslemetryVehiclePollingEntity, TeslemetryMediaEntity
):
"""Polling vehicle media player class."""
def __init__(
self,
data: TeslemetryVehicleData,
scopes: list[Scope],
) -> None:
"""Initialize the media player entity."""
super().__init__(data, "media")
self._attr_supported_features = (
MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.PREVIOUS_TRACK
| MediaPlayerEntityFeature.VOLUME_SET
)
self.scoped = Scope.VEHICLE_CMDS in scopes
if not self.scoped:
self._attr_supported_features = MediaPlayerEntityFeature(0)
def _async_update_attrs(self) -> None:
"""Update entity attributes."""
state = self.get("vehicle_state_media_info_media_playback_status")
self._attr_state = STATES.get(state) if state else None
self._attr_volume_level = (
self.get("vehicle_state_media_info_audio_volume") or 0
) / VOLUME_FACTOR
duration = self.get("vehicle_state_media_info_now_playing_duration")
self._attr_media_duration = duration / 1000 if duration is not None else None
# Return media position only when a media duration is > 0.
elapsed = self.get("vehicle_state_media_info_now_playing_elapsed")
self._attr_media_position = (
elapsed / 1000 if duration and elapsed is not None else None
)
self._attr_media_title = self.get("vehicle_state_media_info_now_playing_title")
self._attr_media_artist = self.get(
"vehicle_state_media_info_now_playing_artist"
)
self._attr_media_album_name = self.get(
"vehicle_state_media_info_now_playing_album"
)
self._attr_media_playlist = self.get(
"vehicle_state_media_info_now_playing_station"
)
self._attr_source = self.get("vehicle_state_media_info_now_playing_source")
class TeslemetryStreamingMediaEntity(
TeslemetryVehicleStreamEntity, TeslemetryMediaEntity, RestoreEntity
):
"""Streaming vehicle media player class."""
def __init__(
self,
data: TeslemetryVehicleData,
scopes: list[Scope],
) -> None:
"""Initialize the media player entity."""
super().__init__(data, "media")
self._attr_supported_features = (
MediaPlayerEntityFeature.NEXT_TRACK
| MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.PREVIOUS_TRACK
| MediaPlayerEntityFeature.VOLUME_SET
)
self.scoped = Scope.VEHICLE_CMDS in scopes
if not self.scoped:
self._attr_supported_features = MediaPlayerEntityFeature(0)
async def async_added_to_hass(self) -> None:
"""Call when entity is added to hass."""
await super().async_added_to_hass()
if (state := await self.async_get_last_state()) is not None:
try:
self._attr_state = MediaPlayerState(state.state)
except ValueError:
self._attr_state = None
self._attr_volume_level = state.attributes.get("volume_level")
self._attr_media_title = state.attributes.get("media_title")
self._attr_media_artist = state.attributes.get("media_artist")
self._attr_media_album_name = state.attributes.get("media_album_name")
self._attr_media_playlist = state.attributes.get("media_playlist")
self._attr_media_duration = state.attributes.get("media_duration")
self._attr_media_position = state.attributes.get("media_position")
self._attr_source = state.attributes.get("source")
self.async_write_ha_state()
self.async_on_remove(
self.vehicle.stream_vehicle.listen_CenterDisplay(
self._async_handle_center_display
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaPlaybackStatus(
self._async_handle_media_playback_status
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaPlaybackSource(
self._async_handle_media_playback_source
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaAudioVolume(
self._async_handle_media_audio_volume
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaNowPlayingDuration(
self._async_handle_media_now_playing_duration
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaNowPlayingElapsed(
self._async_handle_media_now_playing_elapsed
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaNowPlayingArtist(
self._async_handle_media_now_playing_artist
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaNowPlayingAlbum(
self._async_handle_media_now_playing_album
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaNowPlayingTitle(
self._async_handle_media_now_playing_title
)
)
self.async_on_remove(
self.vehicle.stream_vehicle.listen_MediaNowPlayingStation(
self._async_handle_media_now_playing_station
)
)
def _async_handle_center_display(self, value: str | None) -> None:
"""Update entity attributes."""
if value is not None:
self._attr_state = DISPLAY_STATES.get(value)
self.async_write_ha_state()
def _async_handle_media_playback_status(self, value: str | None) -> None:
"""Update entity attributes."""
self._attr_state = MediaPlayerState.OFF if value is None else STATES.get(value)
self.async_write_ha_state()
def _async_handle_media_playback_source(self, value: str | None) -> None:
"""Update entity attributes."""
self._attr_source = value
self.async_write_ha_state()
def _async_handle_media_audio_volume(self, value: float | None) -> None:
"""Update entity attributes."""
self._attr_volume_level = None if value is None else value / VOLUME_FACTOR
self.async_write_ha_state()
def _async_handle_media_now_playing_duration(self, value: int | None) -> None:
"""Update entity attributes."""
self._attr_media_duration = None if value is None else int(value / 1000)
self.async_write_ha_state()
def _async_handle_media_now_playing_elapsed(self, value: int | None) -> None:
"""Update entity attributes."""
self._attr_media_position = None if value is None else int(value / 1000)
self.async_write_ha_state()
def _async_handle_media_now_playing_artist(self, value: str | None) -> None:
"""Update entity attributes."""
self._attr_media_artist = value # Check if this is album artist or not
self.async_write_ha_state()
def _async_handle_media_now_playing_album(self, value: str | None) -> None:
"""Update entity attributes."""
self._attr_media_album_name = value
self.async_write_ha_state()
def _async_handle_media_now_playing_title(self, value: str | None) -> None:
"""Update entity attributes."""
self._attr_media_title = value
self.async_write_ha_state()
def _async_handle_media_now_playing_station(self, value: str | None) -> None:
"""Update entity attributes."""
self._attr_media_channel = (
value # could also be _attr_media_playlist when Spotify
)
self.async_write_ha_state()