2016-02-21 19:13:40 +00:00
|
|
|
"""Location helpers for Home Assistant."""
|
2021-03-17 17:34:19 +00:00
|
|
|
from __future__ import annotations
|
2016-02-21 19:13:40 +00:00
|
|
|
|
2020-07-03 18:28:44 +00:00
|
|
|
import logging
|
2021-03-17 17:34:19 +00:00
|
|
|
from typing import Sequence
|
2016-08-07 23:26:35 +00:00
|
|
|
|
2020-07-03 18:28:44 +00:00
|
|
|
import voluptuous as vol
|
|
|
|
|
2016-02-21 19:13:40 +00:00
|
|
|
from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE
|
2021-03-27 11:55:24 +00:00
|
|
|
from homeassistant.core import HomeAssistant, State
|
2016-02-21 19:13:40 +00:00
|
|
|
from homeassistant.util import location as loc_util
|
|
|
|
|
2020-07-03 18:28:44 +00:00
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
2016-02-21 19:13:40 +00:00
|
|
|
|
2016-08-07 23:26:35 +00:00
|
|
|
def has_location(state: State) -> bool:
    """Report whether a state carries a usable location.

    The result is True only when ``state`` is a ``State`` instance and both
    its latitude and longitude attributes are ``float`` values.

    Async friendly.
    """
    # Anything that is not a State (including None) has no location.
    if not isinstance(state, State):
        return False

    attrs = state.attributes
    if not isinstance(attrs.get(ATTR_LATITUDE), float):
        return False
    return isinstance(attrs.get(ATTR_LONGITUDE), float)
|
2016-02-21 19:13:40 +00:00
|
|
|
|
|
|
|
|
2021-03-17 17:34:19 +00:00
|
|
|
def closest(latitude: float, longitude: float, states: Sequence[State]) -> State | None:
    """Pick the state that lies nearest to the given point.

    States for which ``has_location`` is false are ignored. Returns ``None``
    when none of the given states has a location.

    Async friendly.
    """
    candidates = [candidate for candidate in states if has_location(candidate)]

    if not candidates:
        return None

    def _distance_to_point(candidate: State) -> float:
        """Return the candidate's distance to the point, or 0 if falsy."""
        attrs = candidate.attributes
        measured = loc_util.distance(
            attrs.get(ATTR_LATITUDE),
            attrs.get(ATTR_LONGITUDE),
            latitude,
            longitude,
        )
        # A falsy result from the distance helper is ranked as distance 0.
        return measured or 0

    return min(candidates, key=_distance_to_point)
|
2020-07-03 18:28:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
def find_coordinates(
    hass: HomeAssistant, entity_id: str, recursion_history: list | None = None
) -> str | None:
    """Resolve an entity to a 'latitude,longitude' string.

    Resolution is attempted in this order:

    1. The entity's own latitude/longitude attributes.
    2. The attributes of the zone named by the entity's state.
    3. The entity whose id equals the entity's state (resolved recursively).
    4. The entity's state itself, if it validates as a gps coordinate pair.

    ``recursion_history`` collects the entity ids already visited during
    nested resolution so that circular references can be detected; a list
    passed in by the caller is appended to in place.

    Returns ``None`` (after logging an error) when the entity is unknown, a
    circular reference is found, or no location can be determined.
    """
    source_state = hass.states.get(entity_id)
    if source_state is None:
        _LOGGER.error("Unable to find entity %s", entity_id)
        return None

    # 1. Location attributes on the entity itself.
    if has_location(source_state):
        return _get_location_from_attributes(source_state)

    state_value = source_state.state

    # 2. The entity's state names a zone that has a location.
    zone_state = hass.states.get(f"zone.{state_value}")
    if has_location(zone_state):  # type: ignore
        _LOGGER.debug(
            "%s is in %s, getting zone location", entity_id, zone_state.entity_id  # type: ignore
        )
        return _get_location_from_attributes(zone_state)  # type: ignore

    # 3. The entity's state is the id of another entity.
    visited = [] if recursion_history is None else recursion_history
    visited.append(entity_id)
    if state_value in visited:
        _LOGGER.error(
            "Circular reference detected while trying to find coordinates of an entity. The state of %s has already been checked",
            state_value,
        )
        return None

    _LOGGER.debug("Getting nested entity for state: %s", state_value)
    if hass.states.get(state_value) is not None:
        _LOGGER.debug("Resolving nested entity_id: %s", state_value)
        return find_coordinates(hass, state_value, visited)

    # 4. The entity's state is itself a coordinate pair.
    # Import here, not at top-level to avoid circular import
    import homeassistant.helpers.config_validation as cv  # pylint: disable=import-outside-toplevel

    try:
        cv.gps(state_value.split(","))
    except vol.Invalid:
        _LOGGER.error(
            "Entity %s does not contain a location and does not point at an entity that does: %s",
            entity_id,
            state_value,
        )
        return None

    return state_value
|
|
|
|
|
|
|
|
|
|
|
|
def _get_location_from_attributes(entity_state: State) -> str:
    """Build a 'latitude,longitude' string from an entity's attributes."""
    attributes = entity_state.attributes
    latitude = attributes.get(ATTR_LATITUDE)
    longitude = attributes.get(ATTR_LONGITUDE)
    return f"{latitude},{longitude}"
|