core/homeassistant/components/opensky/sensor.py

187 lines
5.5 KiB
Python
Raw Normal View History

"""Sensor for the Open Sky Network."""
from datetime import timedelta
import logging
import requests
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
2019-07-31 19:25:30 +00:00
ATTR_ATTRIBUTION,
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_NAME,
CONF_RADIUS,
2019-07-31 19:25:30 +00:00
LENGTH_KILOMETERS,
LENGTH_METERS,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import distance as util_distance, location as util_location
_LOGGER = logging.getLogger(__name__)
2019-07-31 19:25:30 +00:00
CONF_ALTITUDE = "altitude"
2019-07-31 19:25:30 +00:00
ATTR_CALLSIGN = "callsign"
ATTR_ALTITUDE = "altitude"
ATTR_ON_GROUND = "on_ground"
ATTR_SENSOR = "sensor"
ATTR_STATES = "states"
2019-07-31 19:25:30 +00:00
DOMAIN = "opensky"
DEFAULT_ALTITUDE = 0
EVENT_OPENSKY_ENTRY = f"{DOMAIN}_entry"
EVENT_OPENSKY_EXIT = f"{DOMAIN}_exit"
SCAN_INTERVAL = timedelta(seconds=12) # opensky public limit is 10 seconds
2019-07-31 19:25:30 +00:00
OPENSKY_ATTRIBUTION = (
"Information provided by the OpenSky Network " "(https://opensky-network.org)"
)
OPENSKY_API_URL = "https://opensky-network.org/api/states/all"
OPENSKY_API_FIELDS = [
2019-07-31 19:25:30 +00:00
"icao24",
ATTR_CALLSIGN,
"origin_country",
"time_position",
"time_velocity",
ATTR_LONGITUDE,
ATTR_LATITUDE,
ATTR_ALTITUDE,
ATTR_ON_GROUND,
"velocity",
"heading",
"vertical_rate",
"sensors",
]
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_RADIUS): vol.Coerce(float),
vol.Optional(CONF_NAME): cv.string,
vol.Inclusive(CONF_LATITUDE, "coordinates"): cv.latitude,
vol.Inclusive(CONF_LONGITUDE, "coordinates"): cv.longitude,
vol.Optional(CONF_ALTITUDE, default=DEFAULT_ALTITUDE): vol.Coerce(float),
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Open Sky platform."""
latitude = config.get(CONF_LATITUDE, hass.config.latitude)
longitude = config.get(CONF_LONGITUDE, hass.config.longitude)
2019-07-31 19:25:30 +00:00
add_entities(
[
OpenSkySensor(
hass,
config.get(CONF_NAME, DOMAIN),
latitude,
longitude,
config.get(CONF_RADIUS),
config.get(CONF_ALTITUDE),
)
],
True,
)
class OpenSkySensor(Entity):
"""Open Sky Network Sensor."""
def __init__(self, hass, name, latitude, longitude, radius, altitude):
"""Initialize the sensor."""
self._session = requests.Session()
self._latitude = latitude
self._longitude = longitude
2019-07-31 19:25:30 +00:00
self._radius = util_distance.convert(radius, LENGTH_KILOMETERS, LENGTH_METERS)
self._altitude = altitude
self._state = 0
self._hass = hass
self._name = name
self._previously_tracked = None
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def state(self):
"""Return the state of the sensor."""
return self._state
def _handle_boundary(self, flights, event, metadata):
"""Handle flights crossing region boundary."""
for flight in flights:
if flight in metadata:
altitude = metadata[flight].get(ATTR_ALTITUDE)
else:
# Assume Flight has landed if missing.
altitude = 0
data = {
ATTR_CALLSIGN: flight,
ATTR_ALTITUDE: altitude,
ATTR_SENSOR: self._name,
}
self._hass.bus.fire(event, data)
def update(self):
"""Update device state."""
currently_tracked = set()
flight_metadata = {}
states = self._session.get(OPENSKY_API_URL).json().get(ATTR_STATES)
for state in states:
flight = dict(zip(OPENSKY_API_FIELDS, state))
callsign = flight[ATTR_CALLSIGN].strip()
2019-07-31 19:25:30 +00:00
if callsign != "":
flight_metadata[callsign] = flight
else:
continue
missing_location = (
2019-07-31 19:25:30 +00:00
flight.get(ATTR_LONGITUDE) is None or flight.get(ATTR_LATITUDE) is None
)
if missing_location:
continue
if flight.get(ATTR_ON_GROUND):
continue
distance = util_location.distance(
2019-07-31 19:25:30 +00:00
self._latitude,
self._longitude,
flight.get(ATTR_LATITUDE),
flight.get(ATTR_LONGITUDE),
)
if distance is None or distance > self._radius:
continue
altitude = flight.get(ATTR_ALTITUDE)
if altitude > self._altitude and self._altitude != 0:
continue
currently_tracked.add(callsign)
if self._previously_tracked is not None:
entries = currently_tracked - self._previously_tracked
exits = self._previously_tracked - currently_tracked
2019-07-31 19:25:30 +00:00
self._handle_boundary(entries, EVENT_OPENSKY_ENTRY, flight_metadata)
self._handle_boundary(exits, EVENT_OPENSKY_EXIT, flight_metadata)
self._state = len(currently_tracked)
self._previously_tracked = currently_tracked
@property
def device_state_attributes(self):
"""Return the state attributes."""
2019-07-31 19:25:30 +00:00
return {ATTR_ATTRIBUTION: OPENSKY_ATTRIBUTION}
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
2019-07-31 19:25:30 +00:00
return "flights"
@property
def icon(self):
"""Return the icon."""
2019-07-31 19:25:30 +00:00
return "mdi:airplane"