192 lines
6.1 KiB
Python
192 lines
6.1 KiB
Python
"""Support for OASA Telematics from telematics.oasa.gr."""
|
|
import logging
|
|
from datetime import timedelta
|
|
from operator import itemgetter
|
|
|
|
import voluptuous as vol
|
|
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.components.sensor import PLATFORM_SCHEMA
|
|
from homeassistant.const import (
|
|
CONF_NAME, ATTR_ATTRIBUTION, DEVICE_CLASS_TIMESTAMP)
|
|
from homeassistant.helpers.entity import Entity
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
ATTR_STOP_ID = 'stop_id'
|
|
ATTR_STOP_NAME = 'stop_name'
|
|
ATTR_ROUTE_ID = 'route_id'
|
|
ATTR_ROUTE_NAME = 'route_name'
|
|
ATTR_NEXT_ARRIVAL = 'next_arrival'
|
|
ATTR_SECOND_NEXT_ARRIVAL = 'second_next_arrival'
|
|
ATTR_NEXT_DEPARTURE = 'next_departure'
|
|
|
|
ATTRIBUTION = "Data retrieved from telematics.oasa.gr"
|
|
|
|
CONF_STOP_ID = 'stop_id'
|
|
CONF_ROUTE_ID = 'route_id'
|
|
|
|
DEFAULT_NAME = 'OASA Telematics'
|
|
ICON = 'mdi:bus'
|
|
|
|
SCAN_INTERVAL = timedelta(seconds=60)
|
|
|
|
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
|
|
vol.Required(CONF_STOP_ID): cv.string,
|
|
vol.Required(CONF_ROUTE_ID): cv.string,
|
|
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string
|
|
})
|
|
|
|
|
|
def setup_platform(hass, config, add_entities, discovery_info=None):
|
|
"""Set up the OASA Telematics sensor."""
|
|
name = config[CONF_NAME]
|
|
stop_id = config[CONF_STOP_ID]
|
|
route_id = config.get(CONF_ROUTE_ID)
|
|
|
|
data = OASATelematicsData(stop_id, route_id)
|
|
|
|
add_entities([OASATelematicsSensor(
|
|
data, stop_id, route_id, name)], True)
|
|
|
|
|
|
class OASATelematicsSensor(Entity):
|
|
"""Implementation of the OASA Telematics sensor."""
|
|
|
|
def __init__(self, data, stop_id, route_id, name):
|
|
"""Initialize the sensor."""
|
|
self.data = data
|
|
self._name = name
|
|
self._stop_id = stop_id
|
|
self._route_id = route_id
|
|
self._name_data = self._times = self._state = None
|
|
|
|
@property
|
|
def name(self):
|
|
"""Return the name of the sensor."""
|
|
return self._name
|
|
|
|
@property
|
|
def device_class(self):
|
|
"""Return the class of this sensor."""
|
|
return DEVICE_CLASS_TIMESTAMP
|
|
|
|
@property
|
|
def state(self):
|
|
"""Return the state of the sensor."""
|
|
return self._state
|
|
|
|
@property
|
|
def device_state_attributes(self):
|
|
"""Return the state attributes."""
|
|
params = {}
|
|
if self._times is not None:
|
|
next_arrival_data = self._times[0]
|
|
if ATTR_NEXT_ARRIVAL in next_arrival_data:
|
|
next_arrival = next_arrival_data[ATTR_NEXT_ARRIVAL]
|
|
params.update({
|
|
ATTR_NEXT_ARRIVAL: next_arrival.isoformat()
|
|
})
|
|
if len(self._times) > 1:
|
|
second_next_arrival_time = self._times[1][ATTR_NEXT_ARRIVAL]
|
|
if second_next_arrival_time is not None:
|
|
second_arrival = second_next_arrival_time
|
|
params.update({
|
|
ATTR_SECOND_NEXT_ARRIVAL: second_arrival.isoformat()
|
|
})
|
|
params.update({
|
|
ATTR_ROUTE_ID: self._times[0][ATTR_ROUTE_ID],
|
|
ATTR_STOP_ID: self._stop_id,
|
|
ATTR_ATTRIBUTION: ATTRIBUTION,
|
|
})
|
|
params.update({
|
|
ATTR_ROUTE_NAME: self._name_data[ATTR_ROUTE_NAME],
|
|
ATTR_STOP_NAME: self._name_data[ATTR_STOP_NAME]
|
|
})
|
|
return {k: v for k, v in params.items() if v}
|
|
|
|
@property
|
|
def icon(self):
|
|
"""Icon to use in the frontend, if any."""
|
|
return ICON
|
|
|
|
def update(self):
|
|
"""Get the latest data from OASA API and update the states."""
|
|
self.data.update()
|
|
self._times = self.data.info
|
|
self._name_data = self.data.name_data
|
|
next_arrival_data = self._times[0]
|
|
if ATTR_NEXT_ARRIVAL in next_arrival_data:
|
|
self._state = next_arrival_data[ATTR_NEXT_ARRIVAL].isoformat()
|
|
|
|
|
|
class OASATelematicsData():
|
|
"""The class for handling data retrieval."""
|
|
|
|
def __init__(self, stop_id, route_id):
|
|
"""Initialize the data object."""
|
|
import oasatelematics
|
|
self.stop_id = stop_id
|
|
self.route_id = route_id
|
|
self.info = self.empty_result()
|
|
self.oasa_api = oasatelematics
|
|
self.name_data = {ATTR_ROUTE_NAME: self.get_route_name(),
|
|
ATTR_STOP_NAME: self.get_stop_name()}
|
|
|
|
def empty_result(self):
|
|
"""Object returned when no arrivals are found."""
|
|
return [{ATTR_ROUTE_ID: self.route_id}]
|
|
|
|
def get_route_name(self):
|
|
"""Get the route name from the API."""
|
|
try:
|
|
route = self.oasa_api.getRouteName(self.route_id)
|
|
if route:
|
|
return route[0].get('route_departure_eng')
|
|
except TypeError:
|
|
_LOGGER.error("Cannot get route name from OASA API")
|
|
return None
|
|
|
|
def get_stop_name(self):
|
|
"""Get the stop name from the API."""
|
|
try:
|
|
name_data = self.oasa_api.getStopNameAndXY(self.stop_id)
|
|
if name_data:
|
|
return name_data[0].get('stop_descr_matrix_eng')
|
|
except TypeError:
|
|
_LOGGER.error("Cannot get stop name from OASA API")
|
|
return None
|
|
|
|
def update(self):
|
|
"""Get the latest arrival data from telematics.oasa.gr API."""
|
|
self.info = []
|
|
|
|
results = self.oasa_api.getStopArrivals(self.stop_id)
|
|
|
|
if not results:
|
|
self.info = self.empty_result()
|
|
return
|
|
|
|
# Parse results
|
|
results = [r for r in results if r.get('route_code') in self.route_id]
|
|
current_time = dt_util.utcnow()
|
|
|
|
for result in results:
|
|
btime2 = result.get('btime2')
|
|
if btime2 is not None:
|
|
arrival_min = int(btime2)
|
|
timestamp = current_time + timedelta(minutes=arrival_min)
|
|
arrival_data = {ATTR_NEXT_ARRIVAL: timestamp,
|
|
ATTR_ROUTE_ID: self.route_id}
|
|
self.info.append(arrival_data)
|
|
|
|
if not self.info:
|
|
_LOGGER.debug("No arrivals with given parameters")
|
|
self.info = self.empty_result()
|
|
return
|
|
|
|
# Sort the data by time
|
|
sort = sorted(self.info, key=itemgetter(ATTR_NEXT_ARRIVAL))
|
|
self.info = sort
|