189 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			189 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
"""Module with location helpers.

detect_location_info and elevation are mocked by default during tests.
"""
 | 
						|
 | 
						|
from __future__ import annotations
 | 
						|
 | 
						|
from functools import lru_cache
 | 
						|
import math
 | 
						|
from typing import Any, NamedTuple
 | 
						|
 | 
						|
import aiohttp
 | 
						|
 | 
						|
from homeassistant.const import __version__ as HA_VERSION
 | 
						|
 | 
						|
# Endpoints of the whoami service (production and development).
WHOAMI_URL = "https://services.home-assistant.io/whoami/v1"
WHOAMI_URL_DEV = "https://services-dev.home-assistant.workers.dev/whoami/v1"

# Constants from https://github.com/maurycyp/vincenty
# Earth ellipsoid according to WGS 84
# Semi-major axis a of the ellipsoid (equatorial radius of the earth in meters)
AXIS_A = 6378137
# Flattening f = (a-b) / a
FLATTENING = 1 / 298.257223563
# Semi-minor axis b of the ellipsoid in meters.
AXIS_B = 6356752.314245

MILES_PER_KILOMETER = 0.621371
# Upper bound on the number of refinement passes of the Vincenty iteration.
MAX_ITERATIONS = 200
# The iteration stops once lambda changes by less than this between passes.
CONVERGENCE_THRESHOLD = 1e-12
 | 
						|
 | 
						|
 | 
						|
class LocationInfo(NamedTuple):
    """Tuple with location information."""

    # Network and administrative details of the detected location.
    ip: str
    country_code: str
    currency: str
    region_code: str
    region_name: str
    city: str
    zip_code: str
    time_zone: str
    # Geographic coordinates of the detected location.
    latitude: float
    longitude: float
    # Whether metric units should be used for this location.
    use_metric: bool
 | 
						|
 | 
						|
 | 
						|
async def async_detect_location_info(
    session: aiohttp.ClientSession,
) -> LocationInfo | None:
    """Detect location information.

    Returns None when the whoami lookup yields no data. Otherwise returns a
    LocationInfo built from that data, with use_metric set to False only for
    the country codes US, MM and LR.
    """
    if (data := await _get_whoami(session)) is None:
        return None

    # Every country except these three is treated as metric.
    data["use_metric"] = data["country_code"] not in ("US", "MM", "LR")

    return LocationInfo(**data)
 | 
						|
 | 
						|
 | 
						|
@lru_cache
 | 
						|
def distance(
 | 
						|
    lat1: float | None, lon1: float | None, lat2: float, lon2: float
 | 
						|
) -> float | None:
 | 
						|
    """Calculate the distance in meters between two points.
 | 
						|
 | 
						|
    Async friendly.
 | 
						|
    """
 | 
						|
    if lat1 is None or lon1 is None:
 | 
						|
        return None
 | 
						|
    result = vincenty((lat1, lon1), (lat2, lon2))
 | 
						|
    if result is None:
 | 
						|
        return None
 | 
						|
    return result * 1000
 | 
						|
 | 
						|
 | 
						|
# Author: https://github.com/maurycyp
 | 
						|
# Source: https://github.com/maurycyp/vincenty
 | 
						|
# License: https://github.com/maurycyp/vincenty/blob/master/LICENSE
 | 
						|
def vincenty(
 | 
						|
    point1: tuple[float, float], point2: tuple[float, float], miles: bool = False
 | 
						|
) -> float | None:
 | 
						|
    """Vincenty formula (inverse method) to calculate the distance.
 | 
						|
 | 
						|
    Result in kilometers or miles between two points on the surface of a
 | 
						|
    spheroid.
 | 
						|
 | 
						|
    Async friendly.
 | 
						|
    """
 | 
						|
    # short-circuit coincident points
 | 
						|
    if point1[0] == point2[0] and point1[1] == point2[1]:
 | 
						|
        return 0.0
 | 
						|
 | 
						|
    U1 = math.atan((1 - FLATTENING) * math.tan(math.radians(point1[0])))
 | 
						|
    U2 = math.atan((1 - FLATTENING) * math.tan(math.radians(point2[0])))
 | 
						|
    L = math.radians(point2[1] - point1[1])
 | 
						|
    Lambda = L
 | 
						|
 | 
						|
    sinU1 = math.sin(U1)
 | 
						|
    cosU1 = math.cos(U1)
 | 
						|
    sinU2 = math.sin(U2)
 | 
						|
    cosU2 = math.cos(U2)
 | 
						|
 | 
						|
    for _ in range(MAX_ITERATIONS):
 | 
						|
        sinLambda = math.sin(Lambda)
 | 
						|
        cosLambda = math.cos(Lambda)
 | 
						|
        sinSigma = math.sqrt(
 | 
						|
            (cosU2 * sinLambda) ** 2 + (cosU1 * sinU2 - sinU1 * cosU2 * cosLambda) ** 2
 | 
						|
        )
 | 
						|
        if sinSigma == 0.0:
 | 
						|
            return 0.0  # coincident points
 | 
						|
        cosSigma = sinU1 * sinU2 + cosU1 * cosU2 * cosLambda
 | 
						|
        sigma = math.atan2(sinSigma, cosSigma)
 | 
						|
        sinAlpha = cosU1 * cosU2 * sinLambda / sinSigma
 | 
						|
        cosSqAlpha = 1 - sinAlpha**2
 | 
						|
        try:
 | 
						|
            cos2SigmaM = cosSigma - 2 * sinU1 * sinU2 / cosSqAlpha
 | 
						|
        except ZeroDivisionError:
 | 
						|
            cos2SigmaM = 0
 | 
						|
        C = FLATTENING / 16 * cosSqAlpha * (4 + FLATTENING * (4 - 3 * cosSqAlpha))
 | 
						|
        LambdaPrev = Lambda
 | 
						|
        Lambda = L + (1 - C) * FLATTENING * sinAlpha * (
 | 
						|
            sigma
 | 
						|
            + C * sinSigma * (cos2SigmaM + C * cosSigma * (-1 + 2 * cos2SigmaM**2))
 | 
						|
        )
 | 
						|
        if abs(Lambda - LambdaPrev) < CONVERGENCE_THRESHOLD:
 | 
						|
            break  # successful convergence
 | 
						|
    else:
 | 
						|
        return None  # failure to converge
 | 
						|
 | 
						|
    uSq = cosSqAlpha * (AXIS_A**2 - AXIS_B**2) / (AXIS_B**2)
 | 
						|
    A = 1 + uSq / 16384 * (4096 + uSq * (-768 + uSq * (320 - 175 * uSq)))
 | 
						|
    B = uSq / 1024 * (256 + uSq * (-128 + uSq * (74 - 47 * uSq)))
 | 
						|
    # fmt: off
 | 
						|
    deltaSigma = (
 | 
						|
        B
 | 
						|
        * sinSigma
 | 
						|
        * (
 | 
						|
            cos2SigmaM
 | 
						|
            + B
 | 
						|
            / 4
 | 
						|
            * (
 | 
						|
                cosSigma * (-1 + 2 * cos2SigmaM**2)
 | 
						|
                - B
 | 
						|
                / 6
 | 
						|
                * cos2SigmaM
 | 
						|
                * (-3 + 4 * sinSigma ** 2)
 | 
						|
                * (-3 + 4 * cos2SigmaM ** 2)
 | 
						|
            )
 | 
						|
        )
 | 
						|
    )
 | 
						|
    # fmt: on
 | 
						|
    s = AXIS_B * A * (sigma - deltaSigma)
 | 
						|
 | 
						|
    s /= 1000  # Conversion of meters to kilometers
 | 
						|
    if miles:
 | 
						|
        s *= MILES_PER_KILOMETER  # kilometers to miles
 | 
						|
 | 
						|
    return round(s, 6)
 | 
						|
 | 
						|
 | 
						|
async def _get_whoami(session: aiohttp.ClientSession) -> dict[str, Any] | None:
    """Query the Home Assistant whoami service for location data.

    The development endpoint is used when the running version ends with
    "0.dev0", the production endpoint otherwise.

    Returns a dict whose keys match the LocationInfo fields (without
    use_metric), or None when the request fails or times out, the body is not
    valid JSON, or the payload carries no usable latitude/longitude.
    """
    url = WHOAMI_URL_DEV if HA_VERSION.endswith("0.dev0") else WHOAMI_URL
    try:
        resp = await session.get(url, timeout=aiohttp.ClientTimeout(total=30))
    except (aiohttp.ClientError, TimeoutError):
        return None

    try:
        # The total timeout also covers reading the body, so a timeout can
        # surface here as well as on the initial request.
        raw_info = await resp.json()
    except (aiohttp.ClientError, TimeoutError, ValueError):
        return None

    try:
        # Coordinates are mandatory. A missing or null key, a non-numeric
        # value, or a payload that is not a JSON object all count as no data.
        latitude = float(raw_info["latitude"])
        longitude = float(raw_info["longitude"])
    except (KeyError, TypeError, ValueError):
        return None

    return {
        "ip": raw_info.get("ip"),
        "country_code": raw_info.get("country"),
        "currency": raw_info.get("currency"),
        "region_code": raw_info.get("region_code"),
        "region_name": raw_info.get("region"),
        "city": raw_info.get("city"),
        "zip_code": raw_info.get("postal_code"),
        "time_zone": raw_info.get("timezone"),
        "latitude": latitude,
        "longitude": longitude,
    }
 |