core/homeassistant/components/directv/media_player.py

409 lines
13 KiB
Python

"""Support for the DirecTV receivers."""
import logging
import requests
import voluptuous as vol
from homeassistant.components.media_player import (
MediaPlayerDevice, PLATFORM_SCHEMA)
from homeassistant.components.media_player.const import (
MEDIA_TYPE_CHANNEL, MEDIA_TYPE_MOVIE, MEDIA_TYPE_TVSHOW,
SUPPORT_NEXT_TRACK, SUPPORT_PAUSE, SUPPORT_PLAY, SUPPORT_PLAY_MEDIA,
SUPPORT_PREVIOUS_TRACK, SUPPORT_STOP, SUPPORT_TURN_OFF, SUPPORT_TURN_ON)
from homeassistant.const import (
CONF_DEVICE, CONF_HOST, CONF_NAME, CONF_PORT, STATE_OFF, STATE_PAUSED,
STATE_PLAYING)
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__)
ATTR_MEDIA_CURRENTLY_RECORDING = 'media_currently_recording'
ATTR_MEDIA_RATING = 'media_rating'
ATTR_MEDIA_RECORDED = 'media_recorded'
ATTR_MEDIA_START_TIME = 'media_start_time'
DEFAULT_DEVICE = '0'
DEFAULT_NAME = "DirecTV Receiver"
DEFAULT_PORT = 8080
SUPPORT_DTV = SUPPORT_PAUSE | SUPPORT_TURN_ON | SUPPORT_TURN_OFF | \
SUPPORT_PLAY_MEDIA | SUPPORT_STOP | SUPPORT_NEXT_TRACK | \
SUPPORT_PREVIOUS_TRACK | SUPPORT_PLAY
SUPPORT_DTV_CLIENT = SUPPORT_PAUSE | \
SUPPORT_PLAY_MEDIA | SUPPORT_STOP | SUPPORT_NEXT_TRACK | \
SUPPORT_PREVIOUS_TRACK | SUPPORT_PLAY
DATA_DIRECTV = 'data_directv'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Optional(CONF_DEVICE, default=DEFAULT_DEVICE): cv.string,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the DirecTV platform."""
known_devices = hass.data.get(DATA_DIRECTV, set())
hosts = []
if CONF_HOST in config:
_LOGGER.debug("Adding configured device %s with client address %s ",
config.get(CONF_NAME), config.get(CONF_DEVICE))
hosts.append([
config.get(CONF_NAME), config.get(CONF_HOST),
config.get(CONF_PORT), config.get(CONF_DEVICE)
])
elif discovery_info:
host = discovery_info.get('host')
name = 'DirecTV_{}'.format(discovery_info.get('serial', ''))
# Attempt to discover additional RVU units
_LOGGER.debug("Doing discovery of DirecTV devices on %s", host)
from DirectPy import DIRECTV
dtv = DIRECTV(host, DEFAULT_PORT)
try:
resp = dtv.get_locations()
except requests.exceptions.RequestException as ex:
# Bail out and just go forward with uPnP data
# Make sure that this device is not already configured
# Comparing based on host (IP) and clientAddr.
_LOGGER.debug("Request exception %s trying to get locations", ex)
resp = {
'locations': [{
'locationName': name,
'clientAddr': DEFAULT_DEVICE
}]
}
_LOGGER.debug("Known devices: %s", known_devices)
for loc in resp.get("locations") or []:
if "locationName" not in loc or "clientAddr" not in loc:
continue
# Make sure that this device is not already configured
# Comparing based on host (IP) and clientAddr.
if (host, loc["clientAddr"]) in known_devices:
_LOGGER.debug("Discovered device %s on host %s with "
"client address %s is already "
"configured",
str.title(loc["locationName"]),
host, loc["clientAddr"])
else:
_LOGGER.debug("Adding discovered device %s with"
" client address %s",
str.title(loc["locationName"]),
loc["clientAddr"])
hosts.append([str.title(loc["locationName"]), host,
DEFAULT_PORT, loc["clientAddr"]])
dtvs = []
for host in hosts:
dtvs.append(DirecTvDevice(*host))
hass.data.setdefault(DATA_DIRECTV, set()).add((host[1], host[3]))
add_entities(dtvs)
class DirecTvDevice(MediaPlayerDevice):
"""Representation of a DirecTV receiver on the network."""
def __init__(self, name, host, port, device):
"""Initialize the device."""
from DirectPy import DIRECTV
self.dtv = DIRECTV(host, port, device)
self._name = name
self._is_standby = True
self._current = None
self._last_update = None
self._paused = None
self._last_position = None
self._is_recorded = None
self._is_client = device != '0'
self._assumed_state = None
self._available = False
self._first_error_timestamp = None
if self._is_client:
_LOGGER.debug("Created DirecTV client %s for device %s",
self._name, device)
else:
_LOGGER.debug("Created DirecTV device for %s", self._name)
def update(self):
"""Retrieve latest state."""
_LOGGER.debug("%s: Updating status", self.entity_id)
try:
self._available = True
self._is_standby = self.dtv.get_standby()
if self._is_standby:
self._current = None
self._is_recorded = None
self._paused = None
self._assumed_state = False
self._last_position = None
self._last_update = None
else:
self._current = self.dtv.get_tuned()
if self._current['status']['code'] == 200:
self._first_error_timestamp = None
self._is_recorded = self._current.get('uniqueId')\
is not None
self._paused = self._last_position == \
self._current['offset']
self._assumed_state = self._is_recorded
self._last_position = self._current['offset']
self._last_update = dt_util.utcnow() if not self._paused \
or self._last_update is None else self._last_update
else:
# If an error is received then only set to unavailable if
# this started at least 1 minute ago.
log_message = "{}: Invalid status {} received".format(
self.entity_id,
self._current['status']['code']
)
if self._check_state_available():
_LOGGER.debug(log_message)
else:
_LOGGER.error(log_message)
except requests.RequestException as ex:
_LOGGER.error("%s: Request error trying to update current status: "
"%s", self.entity_id, ex)
self._check_state_available()
except Exception as ex:
_LOGGER.error("%s: Exception trying to update current status: %s",
self.entity_id, ex)
self._available = False
if not self._first_error_timestamp:
self._first_error_timestamp = dt_util.utcnow()
raise
def _check_state_available(self):
"""Set to unavailable if issue been occurring over 1 minute."""
if not self._first_error_timestamp:
self._first_error_timestamp = dt_util.utcnow()
else:
tdelta = dt_util.utcnow() - self._first_error_timestamp
if tdelta.total_seconds() >= 60:
self._available = False
return self._available
@property
def device_state_attributes(self):
"""Return device specific state attributes."""
attributes = {}
if not self._is_standby:
attributes[ATTR_MEDIA_CURRENTLY_RECORDING] =\
self.media_currently_recording
attributes[ATTR_MEDIA_RATING] = self.media_rating
attributes[ATTR_MEDIA_RECORDED] = self.media_recorded
attributes[ATTR_MEDIA_START_TIME] = self.media_start_time
return attributes
@property
def name(self):
"""Return the name of the device."""
return self._name
# MediaPlayerDevice properties and methods
@property
def state(self):
"""Return the state of the device."""
if self._is_standby:
return STATE_OFF
# For recorded media we can determine if it is paused or not.
# For live media we're unable to determine and will always return
# playing instead.
if self._paused:
return STATE_PAUSED
return STATE_PLAYING
@property
def available(self):
"""Return if able to retrieve information from DVR or not."""
return self._available
@property
def assumed_state(self):
"""Return if we assume the state or not."""
return self._assumed_state
@property
def media_content_id(self):
"""Return the content ID of current playing media."""
if self._is_standby:
return None
return self._current['programId']
@property
def media_content_type(self):
"""Return the content type of current playing media."""
if self._is_standby:
return None
if 'episodeTitle' in self._current:
return MEDIA_TYPE_TVSHOW
return MEDIA_TYPE_MOVIE
@property
def media_duration(self):
"""Return the duration of current playing media in seconds."""
if self._is_standby:
return None
return self._current['duration']
@property
def media_position(self):
"""Position of current playing media in seconds."""
if self._is_standby:
return None
return self._last_position
@property
def media_position_updated_at(self):
"""When was the position of the current playing media valid.
Returns value from homeassistant.util.dt.utcnow().
"""
if self._is_standby:
return None
return self._last_update
@property
def media_title(self):
"""Return the title of current playing media."""
if self._is_standby:
return None
return self._current['title']
@property
def media_series_title(self):
"""Return the title of current episode of TV show."""
if self._is_standby:
return None
return self._current.get('episodeTitle')
@property
def media_channel(self):
"""Return the channel current playing media."""
if self._is_standby:
return None
return "{} ({})".format(
self._current['callsign'], self._current['major'])
@property
def source(self):
"""Name of the current input source."""
if self._is_standby:
return None
return self._current['major']
@property
def supported_features(self):
"""Flag media player features that are supported."""
return SUPPORT_DTV_CLIENT if self._is_client else SUPPORT_DTV
@property
def media_currently_recording(self):
"""If the media is currently being recorded or not."""
if self._is_standby:
return None
return self._current['isRecording']
@property
def media_rating(self):
"""TV Rating of the current playing media."""
if self._is_standby:
return None
return self._current['rating']
@property
def media_recorded(self):
"""If the media was recorded or live."""
if self._is_standby:
return None
return self._is_recorded
@property
def media_start_time(self):
"""Start time the program aired."""
if self._is_standby:
return None
return dt_util.as_local(
dt_util.utc_from_timestamp(self._current['startTime']))
def turn_on(self):
"""Turn on the receiver."""
if self._is_client:
raise NotImplementedError()
_LOGGER.debug("Turn on %s", self._name)
self.dtv.key_press('poweron')
def turn_off(self):
"""Turn off the receiver."""
if self._is_client:
raise NotImplementedError()
_LOGGER.debug("Turn off %s", self._name)
self.dtv.key_press('poweroff')
def media_play(self):
"""Send play command."""
_LOGGER.debug("Play on %s", self._name)
self.dtv.key_press('play')
def media_pause(self):
"""Send pause command."""
_LOGGER.debug("Pause on %s", self._name)
self.dtv.key_press('pause')
def media_stop(self):
"""Send stop command."""
_LOGGER.debug("Stop on %s", self._name)
self.dtv.key_press('stop')
def media_previous_track(self):
"""Send rewind command."""
_LOGGER.debug("Rewind on %s", self._name)
self.dtv.key_press('rew')
def media_next_track(self):
"""Send fast forward command."""
_LOGGER.debug("Fast forward on %s", self._name)
self.dtv.key_press('ffwd')
def play_media(self, media_type, media_id, **kwargs):
"""Select input source."""
if media_type != MEDIA_TYPE_CHANNEL:
_LOGGER.error("Invalid media type %s. Only %s is supported",
media_type, MEDIA_TYPE_CHANNEL)
return
_LOGGER.debug("Changing channel on %s to %s", self._name, media_id)
self.dtv.tune_channel(media_id)