core/homeassistant/components/sensor/rmvtransport.py

203 lines
6.9 KiB
Python

"""
Support for real-time departure information for Rhein-Main public transport.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.rmvtransport/
"""
import logging
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (CONF_NAME, ATTR_ATTRIBUTION)
REQUIREMENTS = ['PyRMVtransport==0.1']
_LOGGER = logging.getLogger(__name__)
CONF_NEXT_DEPARTURE = 'next_departure'
CONF_STATION = 'station'
CONF_DESTINATIONS = 'destinations'
CONF_DIRECTIONS = 'directions'
CONF_LINES = 'lines'
CONF_PRODUCTS = 'products'
CONF_TIME_OFFSET = 'time_offset'
CONF_MAX_JOURNEYS = 'max_journeys'
DEFAULT_NAME = 'RMV Journey'
VALID_PRODUCTS = ['U-Bahn', 'Tram', 'Bus', 'S', 'RB', 'RE', 'EC', 'IC', 'ICE']
ICONS = {
'U-Bahn': 'mdi:subway',
'Tram': 'mdi:tram',
'Bus': 'mdi:bus',
'S': 'mdi:train',
'RB': 'mdi:train',
'RE': 'mdi:train',
'EC': 'mdi:train',
'IC': 'mdi:train',
'ICE': 'mdi:train',
'SEV': 'mdi:checkbox-blank-circle-outline',
None: 'mdi:clock'
}
ATTRIBUTION = "Data provided by opendata.rmv.de"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_NEXT_DEPARTURE): [{
vol.Required(CONF_STATION): cv.string,
vol.Optional(CONF_DESTINATIONS, default=[]):
vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_DIRECTIONS, default=[]):
vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_LINES, default=[]):
vol.All(cv.ensure_list, [cv.positive_int, cv.string]),
vol.Optional(CONF_PRODUCTS, default=VALID_PRODUCTS):
vol.All(cv.ensure_list, [vol.In(VALID_PRODUCTS)]),
vol.Optional(CONF_TIME_OFFSET, default=0): cv.positive_int,
vol.Optional(CONF_MAX_JOURNEYS, default=5): cv.positive_int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string}]
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the RMV departure sensor."""
sensors = []
for next_departure in config.get(CONF_NEXT_DEPARTURE):
sensors.append(
RMVDepartureSensor(
next_departure[CONF_STATION],
next_departure.get(CONF_DESTINATIONS),
next_departure.get(CONF_DIRECTIONS),
next_departure.get(CONF_LINES),
next_departure.get(CONF_PRODUCTS),
next_departure.get(CONF_TIME_OFFSET),
next_departure.get(CONF_MAX_JOURNEYS),
next_departure.get(CONF_NAME)))
add_entities(sensors, True)
class RMVDepartureSensor(Entity):
"""Implementation of an RMV departure sensor."""
def __init__(self, station, destinations, directions,
lines, products, time_offset, max_journeys, name):
"""Initialize the sensor."""
self._station = station
self._name = name
self._state = None
self.data = RMVDepartureData(station, destinations, directions, lines,
products, time_offset, max_journeys)
self._icon = ICONS[None]
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def available(self):
"""Return True if entity is available."""
return self._state is not None
@property
def state(self):
"""Return the next departure time."""
return self._state
@property
def state_attributes(self):
"""Return the state attributes."""
try:
return {
'next_departures': [val for val in self.data.departures[1:]],
'direction': self.data.departures[0].get('direction'),
'line': self.data.departures[0].get('line'),
'minutes': self.data.departures[0].get('minutes'),
'departure_time':
self.data.departures[0].get('departure_time'),
'product': self.data.departures[0].get('product'),
}
except IndexError:
return {}
@property
def icon(self):
"""Icon to use in the frontend, if any."""
return self._icon
@property
def unit_of_measurement(self):
"""Return the unit this state is expressed in."""
return "min"
def update(self):
"""Get the latest data and update the state."""
self.data.update()
if not self.data.departures:
self._state = None
self._icon = ICONS[None]
return
if self._name == DEFAULT_NAME:
self._name = self.data.station
self._station = self.data.station
self._state = self.data.departures[0].get('minutes')
self._icon = ICONS[self.data.departures[0].get('product')]
class RMVDepartureData:
"""Pull data from the opendata.rmv.de web page."""
def __init__(self, station_id, destinations, directions,
lines, products, time_offset, max_journeys):
"""Initialize the sensor."""
import RMVtransport
self.station = None
self._station_id = station_id
self._destinations = destinations
self._directions = directions
self._lines = lines
self._products = products
self._time_offset = time_offset
self._max_journeys = max_journeys
self.rmv = RMVtransport.RMVtransport()
self.departures = []
def update(self):
"""Update the connection data."""
try:
_data = self.rmv.get_departures(self._station_id,
products=self._products,
maxJourneys=50)
except ValueError:
self.departures = []
_LOGGER.warning("Returned data not understood")
return
self.station = _data.get('station')
_deps = []
for journey in _data['journeys']:
# find the first departure meeting the criteria
_nextdep = {ATTR_ATTRIBUTION: ATTRIBUTION}
if self._destinations:
dest_found = False
for dest in self._destinations:
if dest in journey['stops']:
dest_found = True
_nextdep['destination'] = dest
if not dest_found:
continue
elif self._lines and journey['number'] not in self._lines:
continue
elif journey['minutes'] < self._time_offset:
continue
for attr in ['direction', 'departure_time', 'product', 'minutes']:
_nextdep[attr] = journey.get(attr, '')
_nextdep['line'] = journey.get('number', '')
_deps.append(_nextdep)
if len(_deps) > self._max_journeys:
break
self.departures = _deps