core/homeassistant/components/directv/media_player.py

484 lines
14 KiB
Python

"""Support for the DirecTV receivers."""
import logging
from typing import Callable, Dict, List, Optional
from DirectPy import DIRECTV
from requests.exceptions import RequestException
import voluptuous as vol
from homeassistant.components.media_player import PLATFORM_SCHEMA, MediaPlayerDevice
from homeassistant.components.media_player.const import (
MEDIA_TYPE_CHANNEL,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_TVSHOW,
SUPPORT_NEXT_TRACK,
SUPPORT_PAUSE,
SUPPORT_PLAY,
SUPPORT_PLAY_MEDIA,
SUPPORT_PREVIOUS_TRACK,
SUPPORT_STOP,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_DEVICE,
CONF_HOST,
CONF_NAME,
CONF_PORT,
STATE_OFF,
STATE_PAUSED,
STATE_PLAYING,
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import dt as dt_util
from .const import (
ATTR_MEDIA_CURRENTLY_RECORDING,
ATTR_MEDIA_RATING,
ATTR_MEDIA_RECORDED,
ATTR_MEDIA_START_TIME,
DATA_CLIENT,
DATA_LOCATIONS,
DATA_VERSION_INFO,
DEFAULT_DEVICE,
DEFAULT_MANUFACTURER,
DEFAULT_NAME,
DEFAULT_PORT,
DOMAIN,
MODEL_CLIENT,
MODEL_HOST,
)
_LOGGER = logging.getLogger(__name__)
SUPPORT_DTV = (
SUPPORT_PAUSE
| SUPPORT_TURN_ON
| SUPPORT_TURN_OFF
| SUPPORT_PLAY_MEDIA
| SUPPORT_STOP
| SUPPORT_NEXT_TRACK
| SUPPORT_PREVIOUS_TRACK
| SUPPORT_PLAY
)
SUPPORT_DTV_CLIENT = (
SUPPORT_PAUSE
| SUPPORT_PLAY_MEDIA
| SUPPORT_STOP
| SUPPORT_NEXT_TRACK
| SUPPORT_PREVIOUS_TRACK
| SUPPORT_PLAY
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Optional(CONF_DEVICE, default=DEFAULT_DEVICE): cv.string,
}
)
def get_dtv_instance(
host: str, port: int = DEFAULT_PORT, client_addr: str = "0"
) -> DIRECTV:
"""Retrieve a DIRECTV instance for the receiver or client device."""
try:
return DIRECTV(host, port, client_addr)
except RequestException as exception:
_LOGGER.debug(
"Request exception %s trying to retrieve DIRECTV instance for client address %s on device %s",
exception,
client_addr,
host,
)
return None
async def async_setup_entry(
hass: HomeAssistantType,
entry: ConfigEntry,
async_add_entities: Callable[[List, bool], None],
) -> bool:
"""Set up the DirecTV config entry."""
locations = hass.data[DOMAIN][entry.entry_id][DATA_LOCATIONS]
version_info = hass.data[DOMAIN][entry.entry_id][DATA_VERSION_INFO]
entities = []
for loc in locations["locations"]:
if "locationName" not in loc or "clientAddr" not in loc:
continue
if loc["clientAddr"] != "0":
# directpy does IO in constructor.
dtv = await hass.async_add_executor_job(
get_dtv_instance, entry.data[CONF_HOST], DEFAULT_PORT, loc["clientAddr"]
)
else:
dtv = hass.data[DOMAIN][entry.entry_id][DATA_CLIENT]
if not dtv:
continue
entities.append(
DirecTvDevice(
str.title(loc["locationName"]), loc["clientAddr"], dtv, version_info,
)
)
async_add_entities(entities, True)
class DirecTvDevice(MediaPlayerDevice):
"""Representation of a DirecTV receiver on the network."""
def __init__(
self,
name: str,
device: str,
dtv: DIRECTV,
version_info: Optional[Dict] = None,
enabled_default: bool = True,
):
"""Initialize the device."""
self.dtv = dtv
self._name = name
self._unique_id = None
self._is_standby = True
self._current = None
self._last_update = None
self._paused = None
self._last_position = None
self._is_recorded = None
self._is_client = device != "0"
self._assumed_state = None
self._available = False
self._enabled_default = enabled_default
self._first_error_timestamp = None
self._model = None
self._receiver_id = None
self._software_version = None
if self._is_client:
self._model = MODEL_CLIENT
self._unique_id = device
if version_info:
self._receiver_id = "".join(version_info["receiverId"].split())
if not self._is_client:
self._unique_id = self._receiver_id
self._model = MODEL_HOST
self._software_version = version_info["stbSoftwareVersion"]
if self._is_client:
_LOGGER.debug(
"Created DirecTV media player for client %s on device %s",
self._name,
device,
)
else:
_LOGGER.debug("Created DirecTV media player for device %s", self._name)
def update(self):
"""Retrieve latest state."""
_LOGGER.debug("%s: Updating status", self.entity_id)
try:
self._available = True
self._is_standby = self.dtv.get_standby()
if self._is_standby:
self._current = None
self._is_recorded = None
self._paused = None
self._assumed_state = False
self._last_position = None
self._last_update = None
else:
self._current = self.dtv.get_tuned()
if self._current["status"]["code"] == 200:
self._first_error_timestamp = None
self._is_recorded = self._current.get("uniqueId") is not None
self._paused = self._last_position == self._current["offset"]
self._assumed_state = self._is_recorded
self._last_position = self._current["offset"]
self._last_update = (
dt_util.utcnow()
if not self._paused or self._last_update is None
else self._last_update
)
else:
# If an error is received then only set to unavailable if
# this started at least 1 minute ago.
log_message = f"{self.entity_id}: Invalid status {self._current['status']['code']} received"
if self._check_state_available():
_LOGGER.debug(log_message)
else:
_LOGGER.error(log_message)
except RequestException as exception:
_LOGGER.error(
"%s: Request error trying to update current status: %s",
self.entity_id,
exception,
)
self._check_state_available()
except Exception as exception:
_LOGGER.error(
"%s: Exception trying to update current status: %s",
self.entity_id,
exception,
)
self._available = False
if not self._first_error_timestamp:
self._first_error_timestamp = dt_util.utcnow()
raise
def _check_state_available(self):
"""Set to unavailable if issue been occurring over 1 minute."""
if not self._first_error_timestamp:
self._first_error_timestamp = dt_util.utcnow()
else:
tdelta = dt_util.utcnow() - self._first_error_timestamp
if tdelta.total_seconds() >= 60:
self._available = False
return self._available
@property
def device_state_attributes(self):
"""Return device specific state attributes."""
attributes = {}
if not self._is_standby:
attributes[ATTR_MEDIA_CURRENTLY_RECORDING] = self.media_currently_recording
attributes[ATTR_MEDIA_RATING] = self.media_rating
attributes[ATTR_MEDIA_RECORDED] = self.media_recorded
attributes[ATTR_MEDIA_START_TIME] = self.media_start_time
return attributes
@property
def name(self):
"""Return the name of the device."""
return self._name
@property
def unique_id(self):
"""Return a unique ID to use for this media player."""
return self._unique_id
@property
def device_info(self):
"""Return device specific attributes."""
return {
"name": self.name,
"identifiers": {(DOMAIN, self.unique_id)},
"manufacturer": DEFAULT_MANUFACTURER,
"model": self._model,
"sw_version": self._software_version,
"via_device": (DOMAIN, self._receiver_id),
}
@property
def entity_registry_enabled_default(self) -> bool:
"""Return if the entity should be enabled when first added to the entity registry."""
return self._enabled_default
# MediaPlayerDevice properties and methods
@property
def state(self):
"""Return the state of the device."""
if self._is_standby:
return STATE_OFF
# For recorded media we can determine if it is paused or not.
# For live media we're unable to determine and will always return
# playing instead.
if self._paused:
return STATE_PAUSED
return STATE_PLAYING
@property
def available(self):
"""Return if able to retrieve information from DVR or not."""
return self._available
@property
def assumed_state(self):
"""Return if we assume the state or not."""
return self._assumed_state
@property
def media_content_id(self):
"""Return the content ID of current playing media."""
if self._is_standby:
return None
return self._current["programId"]
@property
def media_content_type(self):
"""Return the content type of current playing media."""
if self._is_standby:
return None
if "episodeTitle" in self._current:
return MEDIA_TYPE_TVSHOW
return MEDIA_TYPE_MOVIE
@property
def media_duration(self):
"""Return the duration of current playing media in seconds."""
if self._is_standby:
return None
return self._current["duration"]
@property
def media_position(self):
"""Position of current playing media in seconds."""
if self._is_standby:
return None
return self._last_position
@property
def media_position_updated_at(self):
"""When was the position of the current playing media valid.
Returns value from homeassistant.util.dt.utcnow().
"""
if self._is_standby:
return None
return self._last_update
@property
def media_title(self):
"""Return the title of current playing media."""
if self._is_standby:
return None
return self._current["title"]
@property
def media_series_title(self):
"""Return the title of current episode of TV show."""
if self._is_standby:
return None
return self._current.get("episodeTitle")
@property
def media_channel(self):
"""Return the channel current playing media."""
if self._is_standby:
return None
return f"{self._current['callsign']} ({self._current['major']})"
@property
def source(self):
"""Name of the current input source."""
if self._is_standby:
return None
return self._current["major"]
@property
def supported_features(self):
"""Flag media player features that are supported."""
return SUPPORT_DTV_CLIENT if self._is_client else SUPPORT_DTV
@property
def media_currently_recording(self):
"""If the media is currently being recorded or not."""
if self._is_standby:
return None
return self._current["isRecording"]
@property
def media_rating(self):
"""TV Rating of the current playing media."""
if self._is_standby:
return None
return self._current["rating"]
@property
def media_recorded(self):
"""If the media was recorded or live."""
if self._is_standby:
return None
return self._is_recorded
@property
def media_start_time(self):
"""Start time the program aired."""
if self._is_standby:
return None
return dt_util.as_local(dt_util.utc_from_timestamp(self._current["startTime"]))
def turn_on(self):
"""Turn on the receiver."""
if self._is_client:
raise NotImplementedError()
_LOGGER.debug("Turn on %s", self._name)
self.dtv.key_press("poweron")
def turn_off(self):
"""Turn off the receiver."""
if self._is_client:
raise NotImplementedError()
_LOGGER.debug("Turn off %s", self._name)
self.dtv.key_press("poweroff")
def media_play(self):
"""Send play command."""
_LOGGER.debug("Play on %s", self._name)
self.dtv.key_press("play")
def media_pause(self):
"""Send pause command."""
_LOGGER.debug("Pause on %s", self._name)
self.dtv.key_press("pause")
def media_stop(self):
"""Send stop command."""
_LOGGER.debug("Stop on %s", self._name)
self.dtv.key_press("stop")
def media_previous_track(self):
"""Send rewind command."""
_LOGGER.debug("Rewind on %s", self._name)
self.dtv.key_press("rew")
def media_next_track(self):
"""Send fast forward command."""
_LOGGER.debug("Fast forward on %s", self._name)
self.dtv.key_press("ffwd")
def play_media(self, media_type, media_id, **kwargs):
"""Select input source."""
if media_type != MEDIA_TYPE_CHANNEL:
_LOGGER.error(
"Invalid media type %s. Only %s is supported",
media_type,
MEDIA_TYPE_CHANNEL,
)
return
_LOGGER.debug("Changing channel on %s to %s", self._name, media_id)
self.dtv.tune_channel(media_id)