core/homeassistant/components/aprs/device_tracker.py

188 lines
5.7 KiB
Python

"""Support for APRS device tracking."""
import logging
import threading
import voluptuous as vol
from homeassistant.components.device_tracker import PLATFORM_SCHEMA
from homeassistant.const import (
ATTR_GPS_ACCURACY, ATTR_LATITUDE, ATTR_LONGITUDE,
CONF_HOST, CONF_PASSWORD, CONF_TIMEOUT, CONF_USERNAME,
EVENT_HOMEASSISTANT_STOP)
import homeassistant.helpers.config_validation as cv
from homeassistant.util import slugify
DOMAIN = 'aprs'
_LOGGER = logging.getLogger(__name__)
ATTR_ALTITUDE = 'altitude'
ATTR_COURSE = 'course'
ATTR_COMMENT = 'comment'
ATTR_FROM = 'from'
ATTR_FORMAT = 'format'
ATTR_POS_AMBIGUITY = 'posambiguity'
ATTR_SPEED = 'speed'
CONF_CALLSIGNS = 'callsigns'
DEFAULT_HOST = 'rotate.aprs2.net'
DEFAULT_PASSWORD = '-1'
DEFAULT_TIMEOUT = 30.0
FILTER_PORT = 14580
MSG_FORMATS = ['compressed', 'uncompressed', 'mic-e']
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_CALLSIGNS): cv.ensure_list,
vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_PASSWORD,
default=DEFAULT_PASSWORD): cv.string,
vol.Optional(CONF_HOST,
default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_TIMEOUT,
default=DEFAULT_TIMEOUT): vol.Coerce(float),
})
def make_filter(callsigns: list) -> str:
"""Make a server-side filter from a list of callsigns."""
return ' '.join('b/{0}'.format(cs.upper()) for cs in callsigns)
def gps_accuracy(gps, posambiguity: int) -> int:
"""Calculate the GPS accuracy based on APRS posambiguity."""
import geopy.distance
pos_a_map = {0: 0,
1: 1 / 600,
2: 1 / 60,
3: 1 / 6,
4: 1}
if posambiguity in pos_a_map:
degrees = pos_a_map[posambiguity]
gps2 = (gps[0], gps[1] + degrees)
dist_m = geopy.distance.distance(gps, gps2).m
accuracy = round(dist_m)
else:
message = "APRS position ambiguity must be 0-4, not '{0}'.".format(
posambiguity)
raise ValueError(message)
return accuracy
def setup_scanner(hass, config, see, discovery_info=None):
"""Set up the APRS tracker."""
callsigns = config.get(CONF_CALLSIGNS)
server_filter = make_filter(callsigns)
callsign = config.get(CONF_USERNAME)
password = config.get(CONF_PASSWORD)
host = config.get(CONF_HOST)
timeout = config.get(CONF_TIMEOUT)
aprs_listener = AprsListenerThread(
callsign, password, host, server_filter, see)
def aprs_disconnect(event):
"""Stop the APRS connection."""
aprs_listener.stop()
aprs_listener.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, aprs_disconnect)
if not aprs_listener.start_event.wait(timeout):
_LOGGER.error("Timeout waiting for APRS to connect.")
return
if not aprs_listener.start_success:
_LOGGER.error(aprs_listener.start_message)
return
_LOGGER.debug(aprs_listener.start_message)
return True
class AprsListenerThread(threading.Thread):
"""APRS message listener."""
def __init__(self, callsign: str, password: str, host: str,
server_filter: str, see):
"""Initialize the class."""
super().__init__()
import aprslib
self.callsign = callsign
self.host = host
self.start_event = threading.Event()
self.see = see
self.server_filter = server_filter
self.start_message = ""
self.start_success = False
self.ais = aprslib.IS(
self.callsign, passwd=password, host=self.host, port=FILTER_PORT)
def start_complete(self, success: bool, message: str):
"""Complete startup process."""
self.start_message = message
self.start_success = success
self.start_event.set()
def run(self):
"""Connect to APRS and listen for data."""
self.ais.set_filter(self.server_filter)
from aprslib import ConnectionError as AprsConnectionError
from aprslib import LoginError
try:
_LOGGER.info("Opening connection to %s with callsign %s.",
self.host, self.callsign)
self.ais.connect()
self.start_complete(
True,
"Connected to {0} with callsign {1}.".format(
self.host, self.callsign))
self.ais.consumer(callback=self.rx_msg, immortal=True)
except (AprsConnectionError, LoginError) as err:
self.start_complete(False, str(err))
except OSError:
_LOGGER.info("Closing connection to %s with callsign %s.",
self.host, self.callsign)
def stop(self):
"""Close the connection to the APRS network."""
self.ais.close()
def rx_msg(self, msg: dict):
"""Receive message and process if position."""
_LOGGER.debug("APRS message received: %s", str(msg))
if msg[ATTR_FORMAT] in MSG_FORMATS:
dev_id = slugify(msg[ATTR_FROM])
lat = msg[ATTR_LATITUDE]
lon = msg[ATTR_LONGITUDE]
attrs = {}
if ATTR_POS_AMBIGUITY in msg:
pos_amb = msg[ATTR_POS_AMBIGUITY]
try:
attrs[ATTR_GPS_ACCURACY] = gps_accuracy((lat, lon),
pos_amb)
except ValueError:
_LOGGER.warning(
"APRS message contained invalid posambiguity: %s",
str(pos_amb))
for attr in [ATTR_ALTITUDE,
ATTR_COMMENT,
ATTR_COURSE,
ATTR_SPEED]:
if attr in msg:
attrs[attr] = msg[attr]
self.see(dev_id=dev_id, gps=(lat, lon), attributes=attrs)