Add template functions to get area_id and area_name (#54248)

* Add template function to get area_id

* fix int bug

* Prefer area name lookup

* remove unnecessary checks

* fix import

* Add area_name function

* change behavior to fail in ambiguous scenarios

* Revert lotto winning exception checking

* review comments

* try except else
pull/55227/head
Raman Gupta 2021-08-25 15:16:51 -04:00 committed by GitHub
parent d60f5e1721
commit 2f7a7b0309
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 245 additions and 3 deletions

View File

@ -44,6 +44,7 @@ from homeassistant.core import (
) )
from homeassistant.exceptions import TemplateError from homeassistant.exceptions import TemplateError
from homeassistant.helpers import ( from homeassistant.helpers import (
area_registry,
device_registry, device_registry,
entity_registry, entity_registry,
location as loc_helper, location as loc_helper,
@ -949,6 +950,71 @@ def is_device_attr(
return bool(device_attr(hass, device_or_entity_id, attr_name) == attr_value) return bool(device_attr(hass, device_or_entity_id, attr_name) == attr_value)
def area_id(hass: HomeAssistant, lookup_value: str) -> str | None:
"""Get the area ID from an area name, device id, or entity id."""
area_reg = area_registry.async_get(hass)
if area := area_reg.async_get_area_by_name(str(lookup_value)):
return area.id
ent_reg = entity_registry.async_get(hass)
# Import here, not at top-level to avoid circular import
from homeassistant.helpers import ( # pylint: disable=import-outside-toplevel
config_validation as cv,
)
try:
cv.entity_id(lookup_value)
except vol.Invalid:
pass
else:
if entity := ent_reg.async_get(lookup_value):
return entity.area_id
# Check if this could be a device ID (hex string)
dev_reg = device_registry.async_get(hass)
if device := dev_reg.async_get(lookup_value):
return device.area_id
return None
def _get_area_name(area_reg: area_registry.AreaRegistry, valid_area_id: str) -> str:
"""Get area name from valid area ID."""
area = area_reg.async_get_area(valid_area_id)
assert area
return area.name
def area_name(hass: HomeAssistant, lookup_value: str) -> str | None:
"""Get the area name from an area id, device id, or entity id."""
area_reg = area_registry.async_get(hass)
area = area_reg.async_get_area(lookup_value)
if area:
return area.name
ent_reg = entity_registry.async_get(hass)
# Import here, not at top-level to avoid circular import
from homeassistant.helpers import ( # pylint: disable=import-outside-toplevel
config_validation as cv,
)
try:
cv.entity_id(lookup_value)
except vol.Invalid:
pass
else:
if entity := ent_reg.async_get(lookup_value):
if entity.area_id:
return _get_area_name(area_reg, entity.area_id)
return None
dev_reg = device_registry.async_get(hass)
if (device := dev_reg.async_get(lookup_value)) and device.area_id:
return _get_area_name(area_reg, device.area_id)
return None
def closest(hass, *args): def closest(hass, *args):
"""Find closest entity. """Find closest entity.
@ -1532,6 +1598,12 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
self.globals["device_id"] = hassfunction(device_id) self.globals["device_id"] = hassfunction(device_id)
self.filters["device_id"] = pass_context(self.globals["device_id"]) self.filters["device_id"] = pass_context(self.globals["device_id"])
self.globals["area_id"] = hassfunction(area_id)
self.filters["area_id"] = pass_context(self.globals["area_id"])
self.globals["area_name"] = hassfunction(area_name)
self.filters["area_name"] = pass_context(self.globals["area_name"])
if limited: if limited:
# Only device_entities is available to limited templates, mark other # Only device_entities is available to limited templates, mark other
# functions and filters as unsupported. # functions and filters as unsupported.
@ -1556,8 +1628,10 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
"device_attr", "device_attr",
"is_device_attr", "is_device_attr",
"device_id", "device_id",
"area_id",
"area_name",
] ]
hass_filters = ["closest", "expand", "device_id"] hass_filters = ["closest", "expand", "device_id", "area_id", "area_name"]
for glob in hass_globals: for glob in hass_globals:
self.globals[glob] = unsupported(glob) self.globals[glob] = unsupported(glob)
for filt in hass_filters: for filt in hass_filters:

View File

@ -23,7 +23,12 @@ from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import UnitSystem from homeassistant.util.unit_system import UnitSystem
from tests.common import MockConfigEntry, mock_device_registry, mock_registry from tests.common import (
MockConfigEntry,
mock_area_registry,
mock_device_registry,
mock_registry,
)
def _set_up_units(hass): def _set_up_units(hass):
@ -1513,7 +1518,7 @@ async def test_expand(hass):
async def test_device_entities(hass): async def test_device_entities(hass):
"""Test expand function.""" """Test device_entities function."""
config_entry = MockConfigEntry(domain="light") config_entry = MockConfigEntry(domain="light")
device_registry = mock_device_registry(hass) device_registry = mock_device_registry(hass)
entity_registry = mock_registry(hass) entity_registry = mock_registry(hass)
@ -1730,6 +1735,169 @@ async def test_device_attr(hass):
assert info.rate_limit is None assert info.rate_limit is None
async def test_area_id(hass):
"""Test area_id function."""
config_entry = MockConfigEntry(domain="light")
device_registry = mock_device_registry(hass)
entity_registry = mock_registry(hass)
area_registry = mock_area_registry(hass)
# Test non existing entity id
info = render_to_info(hass, "{{ area_id('sensor.fake') }}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test non existing device id (hex value)
info = render_to_info(hass, "{{ area_id('123abc') }}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test non existing area name
info = render_to_info(hass, "{{ area_id('fake area name') }}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test wrong value type
info = render_to_info(hass, "{{ area_id(56) }}")
assert_result_info(info, None)
assert info.rate_limit is None
area_entry_entity_id = area_registry.async_get_or_create("sensor.fake")
# Test device with single entity, which has no area
device_entry = device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
)
entity_entry = entity_registry.async_get_or_create(
"light",
"hue",
"5678",
config_entry=config_entry,
device_id=device_entry.id,
)
info = render_to_info(hass, f"{{{{ area_id('{device_entry.id}') }}}}")
assert_result_info(info, None)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_id('{entity_entry.entity_id}') }}}}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test device ID, entity ID and area name as input with area name that looks like
# a device ID. Try a filter too
area_entry_hex = area_registry.async_get_or_create("123abc")
device_entry = device_registry.async_update_device(
device_entry.id, area_id=area_entry_hex.id
)
entity_entry = entity_registry.async_update_entity(
entity_entry.entity_id, area_id=area_entry_hex.id
)
info = render_to_info(hass, f"{{{{ '{device_entry.id}' | area_id }}}}")
assert_result_info(info, area_entry_hex.id)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_id('{entity_entry.entity_id}') }}}}")
assert_result_info(info, area_entry_hex.id)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_id('{area_entry_hex.name}') }}}}")
assert_result_info(info, area_entry_hex.id)
assert info.rate_limit is None
# Test device ID, entity ID and area name as input with area name that looks like an
# entity ID
area_entry_entity_id = area_registry.async_get_or_create("sensor.fake")
device_entry = device_registry.async_update_device(
device_entry.id, area_id=area_entry_entity_id.id
)
entity_entry = entity_registry.async_update_entity(
entity_entry.entity_id, area_id=area_entry_entity_id.id
)
info = render_to_info(hass, f"{{{{ area_id('{device_entry.id}') }}}}")
assert_result_info(info, area_entry_entity_id.id)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_id('{entity_entry.entity_id}') }}}}")
assert_result_info(info, area_entry_entity_id.id)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_id('{area_entry_entity_id.name}') }}}}")
assert_result_info(info, area_entry_entity_id.id)
assert info.rate_limit is None
async def test_area_name(hass):
"""Test area_name function."""
config_entry = MockConfigEntry(domain="light")
device_registry = mock_device_registry(hass)
entity_registry = mock_registry(hass)
area_registry = mock_area_registry(hass)
# Test non existing entity id
info = render_to_info(hass, "{{ area_name('sensor.fake') }}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test non existing device id (hex value)
info = render_to_info(hass, "{{ area_name('123abc') }}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test non existing area id
info = render_to_info(hass, "{{ area_name('1234567890') }}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test wrong value type
info = render_to_info(hass, "{{ area_name(56) }}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test device with single entity, which has no area
device_entry = device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
)
entity_entry = entity_registry.async_get_or_create(
"light",
"hue",
"5678",
config_entry=config_entry,
device_id=device_entry.id,
)
info = render_to_info(hass, f"{{{{ area_name('{device_entry.id}') }}}}")
assert_result_info(info, None)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_name('{entity_entry.entity_id}') }}}}")
assert_result_info(info, None)
assert info.rate_limit is None
# Test device ID, entity ID and area id as input. Try a filter too
area_entry = area_registry.async_get_or_create("123abc")
device_entry = device_registry.async_update_device(
device_entry.id, area_id=area_entry.id
)
entity_entry = entity_registry.async_update_entity(
entity_entry.entity_id, area_id=area_entry.id
)
info = render_to_info(hass, f"{{{{ '{device_entry.id}' | area_name }}}}")
assert_result_info(info, area_entry.name)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_name('{entity_entry.entity_id}') }}}}")
assert_result_info(info, area_entry.name)
assert info.rate_limit is None
info = render_to_info(hass, f"{{{{ area_name('{area_entry.id}') }}}}")
assert_result_info(info, area_entry.name)
assert info.rate_limit is None
def test_closest_function_to_coord(hass): def test_closest_function_to_coord(hass):
"""Test closest function to coord.""" """Test closest function to coord."""
hass.states.async_set( hass.states.async_set(