core/homeassistant/components/philips_js/media_player.py

351 lines
10 KiB
Python

"""Media Player component to integrate TVs exposing the Joint Space API."""
from typing import Any, Dict
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.media_player import (
DEVICE_CLASS_TV,
PLATFORM_SCHEMA,
BrowseMedia,
MediaPlayerEntity,
)
from homeassistant.components.media_player.const import (
MEDIA_CLASS_CHANNEL,
MEDIA_CLASS_DIRECTORY,
MEDIA_TYPE_CHANNEL,
MEDIA_TYPE_CHANNELS,
SUPPORT_BROWSE_MEDIA,
SUPPORT_NEXT_TRACK,
SUPPORT_PLAY_MEDIA,
SUPPORT_PREVIOUS_TRACK,
SUPPORT_SELECT_SOURCE,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.philips_js import PhilipsTVDataUpdateCoordinator
from homeassistant.const import (
CONF_API_VERSION,
CONF_HOST,
CONF_NAME,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import LOGGER as _LOGGER
from .const import CONF_SYSTEM, DOMAIN
SUPPORT_PHILIPS_JS = (
SUPPORT_TURN_OFF
| SUPPORT_VOLUME_STEP
| SUPPORT_VOLUME_SET
| SUPPORT_VOLUME_MUTE
| SUPPORT_SELECT_SOURCE
| SUPPORT_NEXT_TRACK
| SUPPORT_PREVIOUS_TRACK
| SUPPORT_PLAY_MEDIA
| SUPPORT_BROWSE_MEDIA
)
CONF_ON_ACTION = "turn_on_action"
DEFAULT_API_VERSION = "1"
PREFIX_SEPARATOR = ": "
PREFIX_SOURCE = "Input"
PREFIX_CHANNEL = "Channel"
PLATFORM_SCHEMA = vol.All(
cv.deprecated(CONF_HOST),
cv.deprecated(CONF_NAME),
cv.deprecated(CONF_API_VERSION),
cv.deprecated(CONF_ON_ACTION),
PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Remove(CONF_NAME): cv.string,
vol.Optional(CONF_API_VERSION, default=DEFAULT_API_VERSION): cv.string,
vol.Remove(CONF_ON_ACTION): cv.SCRIPT_SCHEMA,
}
),
)
def _inverted(data):
return {v: k for k, v in data.items()}
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Philips TV platform."""
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=config,
)
)
async def async_setup_entry(
hass: HomeAssistantType,
config_entry: config_entries.ConfigEntry,
async_add_entities,
):
"""Set up the configuration entry."""
coordinator = hass.data[DOMAIN][config_entry.entry_id]
async_add_entities(
[
PhilipsTVMediaPlayer(
coordinator,
config_entry.data[CONF_SYSTEM],
config_entry.unique_id or config_entry.entry_id,
)
]
)
class PhilipsTVMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
"""Representation of a Philips TV exposing the JointSpace API."""
def __init__(
self,
coordinator: PhilipsTVDataUpdateCoordinator,
system: Dict[str, Any],
unique_id: str,
):
"""Initialize the Philips TV."""
self._tv = coordinator.api
self._coordinator = coordinator
self._sources = {}
self._channels = {}
self._supports = SUPPORT_PHILIPS_JS
self._system = system
self._unique_id = unique_id
super().__init__(coordinator)
self._update_from_coordinator()
def _update_soon(self):
"""Reschedule update task."""
self.hass.add_job(self.coordinator.async_request_refresh)
@property
def name(self):
"""Return the device name."""
return self._system["name"]
@property
def supported_features(self):
"""Flag media player features that are supported."""
supports = self._supports
if self._coordinator.turn_on:
supports |= SUPPORT_TURN_ON
return supports
@property
def state(self):
"""Get the device state. An exception means OFF state."""
if self._tv.on:
return STATE_ON
return STATE_OFF
@property
def source(self):
"""Return the current input source."""
return self._sources.get(self._tv.source_id)
@property
def source_list(self):
"""List of available input sources."""
return list(self._sources.values())
def select_source(self, source):
"""Set the input source."""
data = source.split(PREFIX_SEPARATOR, 1)
if data[0] == PREFIX_SOURCE: # Legacy way to set source
source_id = _inverted(self._sources).get(data[1])
if source_id:
self._tv.setSource(source_id)
elif data[0] == PREFIX_CHANNEL: # Legacy way to set channel
channel_id = _inverted(self._channels).get(data[1])
if channel_id:
self._tv.setChannel(channel_id)
else:
source_id = _inverted(self._sources).get(source)
if source_id:
self._tv.setSource(source_id)
self._update_soon()
@property
def volume_level(self):
"""Volume level of the media player (0..1)."""
return self._tv.volume
@property
def is_volume_muted(self):
"""Boolean if volume is currently muted."""
return self._tv.muted
async def async_turn_on(self):
"""Turn on the device."""
await self._coordinator.turn_on.async_run(self.hass, self._context)
def turn_off(self):
"""Turn off the device."""
self._tv.sendKey("Standby")
self._tv.on = False
self._update_soon()
def volume_up(self):
"""Send volume up command."""
self._tv.sendKey("VolumeUp")
self._update_soon()
def volume_down(self):
"""Send volume down command."""
self._tv.sendKey("VolumeDown")
self._update_soon()
def mute_volume(self, mute):
"""Send mute command."""
self._tv.setVolume(None, mute)
self._update_soon()
def set_volume_level(self, volume):
"""Set volume level, range 0..1."""
self._tv.setVolume(volume, self._tv.muted)
self._update_soon()
def media_previous_track(self):
"""Send rewind command."""
self._tv.sendKey("Previous")
self._update_soon()
def media_next_track(self):
"""Send fast forward command."""
self._tv.sendKey("Next")
self._update_soon()
@property
def media_channel(self):
"""Get current channel if it's a channel."""
if self.media_content_type == MEDIA_TYPE_CHANNEL:
return self._channels.get(self._tv.channel_id)
return None
@property
def media_title(self):
"""Title of current playing media."""
if self.media_content_type == MEDIA_TYPE_CHANNEL:
return self._channels.get(self._tv.channel_id)
return self._sources.get(self._tv.source_id)
@property
def media_content_type(self):
"""Return content type of playing media."""
if self._tv.source_id == "tv" or self._tv.source_id == "11":
return MEDIA_TYPE_CHANNEL
if self._tv.source_id is None and self._tv.channels:
return MEDIA_TYPE_CHANNEL
return None
@property
def media_content_id(self):
"""Content type of current playing media."""
if self.media_content_type == MEDIA_TYPE_CHANNEL:
return self._channels.get(self._tv.channel_id)
return None
@property
def device_state_attributes(self):
"""Return the state attributes."""
return {"channel_list": list(self._channels.values())}
@property
def device_class(self):
"""Return the device class."""
return DEVICE_CLASS_TV
@property
def unique_id(self):
"""Return unique identifier if known."""
return self._unique_id
@property
def device_info(self):
"""Return a device description for device registry."""
return {
"name": self._system["name"],
"identifiers": {
(DOMAIN, self._unique_id),
},
"model": self._system.get("model"),
"manufacturer": "Philips",
"sw_version": self._system.get("softwareversion"),
}
def play_media(self, media_type, media_id, **kwargs):
"""Play a piece of media."""
_LOGGER.debug("Call play media type <%s>, Id <%s>", media_type, media_id)
if media_type == MEDIA_TYPE_CHANNEL:
channel_id = _inverted(self._channels).get(media_id)
if channel_id:
self._tv.setChannel(channel_id)
self._update_soon()
else:
_LOGGER.error("Unable to find channel <%s>", media_id)
else:
_LOGGER.error("Unsupported media type <%s>", media_type)
async def async_browse_media(self, media_content_type=None, media_content_id=None):
"""Implement the websocket media browsing helper."""
if media_content_id not in (None, ""):
raise BrowseError(
f"Media not found: {media_content_type} / {media_content_id}"
)
return BrowseMedia(
title="Channels",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="",
media_content_type=MEDIA_TYPE_CHANNELS,
can_play=False,
can_expand=True,
children=[
BrowseMedia(
title=channel,
media_class=MEDIA_CLASS_CHANNEL,
media_content_id=channel,
media_content_type=MEDIA_TYPE_CHANNEL,
can_play=True,
can_expand=False,
)
for channel in self._channels.values()
],
)
def _update_from_coordinator(self):
self._sources = {
srcid: source.get("name") or f"Source {srcid}"
for srcid, source in (self._tv.sources or {}).items()
}
self._channels = {
chid: channel.get("name") or f"Channel {chid}"
for chid, channel in (self._tv.channels or {}).items()
}
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._update_from_coordinator()
super()._handle_coordinator_update()