200 lines
6.0 KiB
Python
200 lines
6.0 KiB
Python
"""Sensor for the Open Sky Network."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
|
|
from python_opensky import BoundingBox, OpenSky, StateVector
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity
|
|
from homeassistant.const import (
|
|
ATTR_LATITUDE,
|
|
ATTR_LONGITUDE,
|
|
CONF_LATITUDE,
|
|
CONF_LONGITUDE,
|
|
CONF_NAME,
|
|
CONF_RADIUS,
|
|
)
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
import homeassistant.helpers.config_validation as cv
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
|
|
|
CONF_ALTITUDE = "altitude"
|
|
|
|
ATTR_ICAO24 = "icao24"
|
|
ATTR_CALLSIGN = "callsign"
|
|
ATTR_ALTITUDE = "altitude"
|
|
ATTR_ON_GROUND = "on_ground"
|
|
ATTR_SENSOR = "sensor"
|
|
ATTR_STATES = "states"
|
|
|
|
DOMAIN = "opensky"
|
|
|
|
DEFAULT_ALTITUDE = 0
|
|
|
|
EVENT_OPENSKY_ENTRY = f"{DOMAIN}_entry"
|
|
EVENT_OPENSKY_EXIT = f"{DOMAIN}_exit"
|
|
# OpenSky free user has 400 credits, with 4 credits per API call. 100/24 = ~4 requests per hour
|
|
SCAN_INTERVAL = timedelta(minutes=15)
|
|
|
|
OPENSKY_API_URL = "https://opensky-network.org/api/states/all"
|
|
OPENSKY_API_FIELDS = [
|
|
ATTR_ICAO24,
|
|
ATTR_CALLSIGN,
|
|
"origin_country",
|
|
"time_position",
|
|
"time_velocity",
|
|
ATTR_LONGITUDE,
|
|
ATTR_LATITUDE,
|
|
ATTR_ALTITUDE,
|
|
ATTR_ON_GROUND,
|
|
"velocity",
|
|
"heading",
|
|
"vertical_rate",
|
|
"sensors",
|
|
]
|
|
|
|
|
|
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
|
{
|
|
vol.Required(CONF_RADIUS): vol.Coerce(float),
|
|
vol.Optional(CONF_NAME): cv.string,
|
|
vol.Inclusive(CONF_LATITUDE, "coordinates"): cv.latitude,
|
|
vol.Inclusive(CONF_LONGITUDE, "coordinates"): cv.longitude,
|
|
vol.Optional(CONF_ALTITUDE, default=DEFAULT_ALTITUDE): vol.Coerce(float),
|
|
}
|
|
)
|
|
|
|
|
|
def setup_platform(
|
|
hass: HomeAssistant,
|
|
config: ConfigType,
|
|
add_entities: AddEntitiesCallback,
|
|
discovery_info: DiscoveryInfoType | None = None,
|
|
) -> None:
|
|
"""Set up the Open Sky platform."""
|
|
latitude = config.get(CONF_LATITUDE, hass.config.latitude)
|
|
longitude = config.get(CONF_LONGITUDE, hass.config.longitude)
|
|
radius = config.get(CONF_RADIUS, 0)
|
|
bounding_box = OpenSky.get_bounding_box(latitude, longitude, radius)
|
|
session = async_get_clientsession(hass)
|
|
opensky = OpenSky(session=session)
|
|
add_entities(
|
|
[
|
|
OpenSkySensor(
|
|
hass,
|
|
config.get(CONF_NAME, DOMAIN),
|
|
opensky,
|
|
bounding_box,
|
|
config[CONF_ALTITUDE],
|
|
)
|
|
],
|
|
True,
|
|
)
|
|
|
|
|
|
class OpenSkySensor(SensorEntity):
|
|
"""Open Sky Network Sensor."""
|
|
|
|
_attr_attribution = (
|
|
"Information provided by the OpenSky Network (https://opensky-network.org)"
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
name: str,
|
|
opensky: OpenSky,
|
|
bounding_box: BoundingBox,
|
|
altitude: float,
|
|
) -> None:
|
|
"""Initialize the sensor."""
|
|
self._altitude = altitude
|
|
self._state = 0
|
|
self._hass = hass
|
|
self._name = name
|
|
self._previously_tracked: set[str] = set()
|
|
self._opensky = opensky
|
|
self._bounding_box = bounding_box
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
"""Return the name of the sensor."""
|
|
return self._name
|
|
|
|
@property
|
|
def native_value(self) -> int:
|
|
"""Return the state of the sensor."""
|
|
return self._state
|
|
|
|
def _handle_boundary(
|
|
self, flights: set[str], event: str, metadata: dict[str, StateVector]
|
|
) -> None:
|
|
"""Handle flights crossing region boundary."""
|
|
for flight in flights:
|
|
if flight in metadata:
|
|
altitude = metadata[flight].barometric_altitude
|
|
longitude = metadata[flight].longitude
|
|
latitude = metadata[flight].latitude
|
|
icao24 = metadata[flight].icao24
|
|
else:
|
|
# Assume Flight has landed if missing.
|
|
altitude = 0
|
|
longitude = None
|
|
latitude = None
|
|
icao24 = None
|
|
|
|
data = {
|
|
ATTR_CALLSIGN: flight,
|
|
ATTR_ALTITUDE: altitude,
|
|
ATTR_SENSOR: self._name,
|
|
ATTR_LONGITUDE: longitude,
|
|
ATTR_LATITUDE: latitude,
|
|
ATTR_ICAO24: icao24,
|
|
}
|
|
self._hass.bus.fire(event, data)
|
|
|
|
async def async_update(self) -> None:
|
|
"""Update device state."""
|
|
currently_tracked = set()
|
|
flight_metadata: dict[str, StateVector] = {}
|
|
response = await self._opensky.get_states(bounding_box=self._bounding_box)
|
|
for flight in response.states:
|
|
if not flight.callsign:
|
|
continue
|
|
callsign = flight.callsign.strip()
|
|
if callsign != "":
|
|
flight_metadata[callsign] = flight
|
|
else:
|
|
continue
|
|
if (
|
|
flight.longitude is None
|
|
or flight.latitude is None
|
|
or flight.on_ground
|
|
or flight.barometric_altitude is None
|
|
):
|
|
continue
|
|
altitude = flight.barometric_altitude
|
|
if altitude > self._altitude and self._altitude != 0:
|
|
continue
|
|
currently_tracked.add(callsign)
|
|
if self._previously_tracked is not None:
|
|
entries = currently_tracked - self._previously_tracked
|
|
exits = self._previously_tracked - currently_tracked
|
|
self._handle_boundary(entries, EVENT_OPENSKY_ENTRY, flight_metadata)
|
|
self._handle_boundary(exits, EVENT_OPENSKY_EXIT, flight_metadata)
|
|
self._state = len(currently_tracked)
|
|
self._previously_tracked = currently_tracked
|
|
|
|
@property
|
|
def native_unit_of_measurement(self) -> str:
|
|
"""Return the unit of measurement."""
|
|
return "flights"
|
|
|
|
@property
|
|
def icon(self) -> str:
|
|
"""Return the icon."""
|
|
return "mdi:airplane"
|