# core/homeassistant/components/rmvtransport/sensor.py
#
# Source listing metadata (from the repository viewer): 262 lines, 8.0 KiB, Python

"""Support for departure information for Rhein-Main public transport."""
import asyncio
from datetime import timedelta
import logging
2019-10-12 05:21:53 +00:00
from RMVtransport import RMVtransport
from RMVtransport.rmvtransport import RMVtransportApiConnectionError
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import ATTR_ATTRIBUTION, CONF_NAME, TIME_MINUTES
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)

# Configuration keys accepted by this platform.
CONF_NEXT_DEPARTURE = "next_departure"

# Keys of each entry in the ``next_departure`` list.
CONF_STATION = "station"
CONF_DESTINATIONS = "destinations"
CONF_DIRECTION = "direction"
CONF_LINES = "lines"
CONF_PRODUCTS = "products"
CONF_TIME_OFFSET = "time_offset"
CONF_MAX_JOURNEYS = "max_journeys"
CONF_TIMEOUT = "timeout"

# Name given to a sensor when the configuration does not provide one.
DEFAULT_NAME = "RMV Journey"

# Product names that may be listed under ``products`` in the configuration.
VALID_PRODUCTS = ["U-Bahn", "Tram", "Bus", "S", "RB", "RE", "EC", "IC", "ICE"]

# Frontend icon per product; the ``None`` entry is the icon used while no
# departure is known.
ICONS = {
    "U-Bahn": "mdi:subway",
    "Tram": "mdi:tram",
    "Bus": "mdi:bus",
    "S": "mdi:train",
    "RB": "mdi:train",
    "RE": "mdi:train",
    "EC": "mdi:train",
    "IC": "mdi:train",
    "ICE": "mdi:train",
    "SEV": "mdi:checkbox-blank-circle-outline",
    None: "mdi:clock",
}

ATTRIBUTION = "Data provided by opendata.rmv.de"

# Minimum time between two requests issued by one data object.
SCAN_INTERVAL = timedelta(seconds=60)

# Options accepted for a single entry of the ``next_departure`` list.
_NEXT_DEPARTURE_SCHEMA = {
    vol.Required(CONF_STATION): cv.string,
    vol.Optional(CONF_DESTINATIONS, default=[]): vol.All(
        cv.ensure_list, [cv.string]
    ),
    vol.Optional(CONF_DIRECTION): cv.string,
    vol.Optional(CONF_LINES, default=[]): vol.All(
        cv.ensure_list, [cv.positive_int, cv.string]
    ),
    vol.Optional(CONF_PRODUCTS, default=VALID_PRODUCTS): vol.All(
        cv.ensure_list, [vol.In(VALID_PRODUCTS)]
    ),
    vol.Optional(CONF_TIME_OFFSET, default=0): cv.positive_int,
    vol.Optional(CONF_MAX_JOURNEYS, default=5): cv.positive_int,
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
}

# Platform configuration: a required list of departure definitions plus an
# optional timeout shared by all of them.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_NEXT_DEPARTURE): [_NEXT_DEPARTURE_SCHEMA],
        vol.Optional(CONF_TIMEOUT, default=10): cv.positive_int,
    }
)


async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Set up the RMV departure sensors.

    One RMVDepartureSensor is created per entry of the ``next_departure``
    configuration list. Every sensor is updated once before it is added so
    that the first state is already populated.

    Raises:
        PlatformNotReady: if, after the initial update, at least one sensor
            holds no departures.
    """
    timeout = config.get(CONF_TIMEOUT)
    session = async_get_clientsession(hass)

    sensors = [
        RMVDepartureSensor(
            session,
            next_departure[CONF_STATION],
            next_departure.get(CONF_DESTINATIONS),
            next_departure.get(CONF_DIRECTION),
            next_departure.get(CONF_LINES),
            next_departure.get(CONF_PRODUCTS),
            next_departure.get(CONF_TIME_OFFSET),
            next_departure.get(CONF_MAX_JOURNEYS),
            next_departure.get(CONF_NAME),
            timeout,
        )
        for next_departure in config.get(CONF_NEXT_DEPARTURE)
    ]

    # asyncio.wait() must be given tasks/futures: passing bare coroutines is
    # deprecated since Python 3.8 and rejected from 3.11 on.
    tasks = [asyncio.ensure_future(sensor.async_update()) for sensor in sensors]
    if tasks:
        # asyncio.wait() raises ValueError on an empty collection.
        await asyncio.wait(tasks)

    if not all(sensor.data.departures for sensor in sensors):
        raise PlatformNotReady

    async_add_entities(sensors)


class RMVDepartureSensor(Entity):
    """Implementation of an RMV departure sensor.

    The state is the ``minutes`` value of the first departure held by the
    backing RMVDepartureData object; the remaining departures and the
    details of the first one are exposed as state attributes.
    """

    def __init__(
        self,
        session,
        station,
        destinations,
        direction,
        lines,
        products,
        time_offset,
        max_journeys,
        name,
        timeout,
    ):
        """Initialize the sensor and the data object it reads from."""
        self._station = station
        self._name = name
        self._state = None
        self.data = RMVDepartureData(
            session,
            station,
            destinations,
            direction,
            lines,
            products,
            time_offset,
            max_journeys,
            timeout,
        )
        self._icon = ICONS[None]

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def available(self):
        """Return True if entity is available."""
        return self._state is not None

    @property
    def state(self):
        """Return the next departure time."""
        return self._state

    @property
    def state_attributes(self):
        """Return the state attributes.

        An empty dict is returned while no departure is known.
        """
        try:
            return {
                "next_departures": self.data.departures[1:],
                "direction": self.data.departures[0].get("direction"),
                "line": self.data.departures[0].get("line"),
                "minutes": self.data.departures[0].get("minutes"),
                "departure_time": self.data.departures[0].get("departure_time"),
                "product": self.data.departures[0].get("product"),
            }
        except IndexError:
            # No departures available.
            return {}

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return self._icon

    @property
    def unit_of_measurement(self):
        """Return the unit this state is expressed in."""
        return TIME_MINUTES

    async def async_update(self):
        """Get the latest data and update the state.

        Without departures the state is reset to ``None`` (which makes the
        entity unavailable) and the default icon is restored.
        """
        await self.data.async_update()

        if not self.data.departures:
            self._state = None
            self._icon = ICONS[None]
            return

        # A sensor still carrying the default name takes over the station
        # name reported by the data object.
        if self._name == DEFAULT_NAME:
            self._name = self.data.station

        self._station = self.data.station

        next_departure = self.data.departures[0]
        self._state = next_departure.get("minutes")
        # Fall back to the default icon instead of raising KeyError when the
        # product has no entry in ICONS.
        self._icon = ICONS.get(next_departure.get("product"), ICONS[None])


class RMVDepartureData:
    """Pull data from the opendata.rmv.de web page.

    After ``async_update`` the filtered departures are available in
    ``departures`` (a list of dicts) and the station name reported by the
    API in ``station``.
    """

    def __init__(
        self,
        session,
        station_id,
        destinations,
        direction,
        lines,
        products,
        time_offset,
        max_journeys,
        timeout,
    ):
        """Initialize the data object."""
        self.station = None
        self._station_id = station_id
        self._destinations = destinations
        self._direction = direction
        self._lines = lines
        self._products = products
        self._time_offset = time_offset
        self._max_journeys = max_journeys
        self.rmv = RMVtransport(session, timeout)
        self.departures = []

    @Throttle(SCAN_INTERVAL)
    async def async_update(self):
        """Update the connection data.

        On a connection error ``departures`` is emptied and a warning is
        logged; no exception is propagated for that case.
        """
        try:
            # The destination, line and time-offset filters are applied
            # locally below, so a fixed batch of journeys is requested.
            _data = await self.rmv.get_departures(
                self._station_id,
                products=self._products,
                direction_id=self._direction,
                max_journeys=50,
            )
        except RMVtransportApiConnectionError:
            self.departures = []
            _LOGGER.warning("Could not retrieve data from rmv.de")
            return

        self.station = _data.get("station")

        _deps = []
        for journey in _data["journeys"]:
            # Collect the departures meeting all configured criteria.
            _nextdep = {ATTR_ATTRIBUTION: ATTRIBUTION}

            if self._destinations:
                dest_found = False
                for dest in self._destinations:
                    if dest in journey["stops"]:
                        dest_found = True
                        _nextdep["destination"] = dest
                if not dest_found:
                    continue

            # The filters are independent of each other: the line and
            # time-offset checks must also run when destinations are
            # configured (an if/elif chain would skip them in that case).
            if self._lines and journey["number"] not in self._lines:
                continue

            if journey["minutes"] < self._time_offset:
                continue

            for attr in ["direction", "departure_time", "product", "minutes"]:
                _nextdep[attr] = journey.get(attr, "")
            _nextdep["line"] = journey.get("number", "")

            _deps.append(_nextdep)

            # NOTE(review): ">" keeps up to max_journeys + 1 entries —
            # presumably the next departure plus max_journeys further ones;
            # confirm this is intended before changing it.
            if len(_deps) > self._max_journeys:
                break

        self.departures = _deps