Add iBeacon UUID allowlist (#104790)

pull/108331/head
Kostas Chatzikokolakis 2024-01-19 01:06:11 +02:00 committed by GitHub
parent 7c6fe31505
commit 6e8d491dae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 308 additions and 9 deletions

View File

@ -2,12 +2,17 @@
from __future__ import annotations
from typing import Any
from uuid import UUID
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import bluetooth
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
import homeassistant.helpers.config_validation as cv
from .const import DOMAIN
from .const import CONF_ALLOW_NAMELESS_UUIDS, DOMAIN
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
@ -29,3 +34,61 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_create_entry(title="iBeacon Tracker", data={})
return self.async_show_form(step_id="user")
@staticmethod
@callback
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> OptionsFlow:
"""Get the options flow for this handler."""
return OptionsFlow(config_entry)
class OptionsFlow(config_entries.OptionsFlow):
"""Handle options."""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize options flow."""
self.config_entry = config_entry
async def async_step_init(self, user_input: dict | None = None) -> FlowResult:
"""Manage the options."""
errors = {}
current_uuids = self.config_entry.options.get(CONF_ALLOW_NAMELESS_UUIDS, [])
new_uuid = None
if user_input is not None:
if new_uuid := user_input.get("new_uuid", "").lower():
try:
# accept non-standard formats that can be fixed by UUID
new_uuid = str(UUID(new_uuid))
except ValueError:
errors["new_uuid"] = "invalid_uuid_format"
if not errors:
# don't modify current_uuids in memory, cause HA will think that the new
# data is equal to the old, and will refuse to write them to disk.
updated_uuids = user_input.get("allow_nameless_uuids", [])
if new_uuid and new_uuid not in updated_uuids:
updated_uuids.append(new_uuid)
data = {CONF_ALLOW_NAMELESS_UUIDS: list(updated_uuids)}
return self.async_create_entry(title="", data=data)
schema = {
vol.Optional(
"new_uuid",
description={"suggested_value": new_uuid},
): str,
}
if current_uuids:
schema |= {
vol.Optional(
"allow_nameless_uuids",
default=current_uuids,
): cv.multi_select(sorted(current_uuids))
}
return self.async_show_form(
step_id="init", errors=errors, data_schema=vol.Schema(schema)
)

View File

@ -46,3 +46,4 @@ MIN_SEEN_TRANSIENT_NEW = (
CONF_IGNORE_ADDRESSES = "ignore_addresses"
CONF_IGNORE_UUIDS = "ignore_uuids"
CONF_ALLOW_NAMELESS_UUIDS = "allow_nameless_uuids"

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from datetime import datetime
import logging
import time
from ibeacon_ble import (
@ -21,6 +22,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from .const import (
CONF_ALLOW_NAMELESS_UUIDS,
CONF_IGNORE_ADDRESSES,
CONF_IGNORE_UUIDS,
DOMAIN,
@ -34,6 +36,8 @@ from .const import (
UPDATE_INTERVAL,
)
_LOGGER = logging.getLogger(__name__)
MONOTONIC_TIME = time.monotonic
@ -141,6 +145,16 @@ class IBeaconCoordinator:
# iBeacons with random MAC addresses, fixed UUID, random major/minor
self._major_minor_by_uuid: dict[str, set[tuple[int, int]]] = {}
# iBeacons from devices with no name
self._allow_nameless_uuids = set(
entry.options.get(CONF_ALLOW_NAMELESS_UUIDS, [])
)
self._ignored_nameless_by_uuid: dict[str, set[str]] = {}
self._entry.async_on_unload(
self._entry.add_update_listener(self.async_config_entry_updated)
)
@callback
def async_device_id_seen(self, device_id: str) -> bool:
"""Return True if the device_id has been seen since boot."""
@ -248,6 +262,8 @@ class IBeaconCoordinator:
if uuid_str in self._ignore_uuids:
return
_LOGGER.debug("update beacon %s", uuid_str)
major = ibeacon_advertisement.major
minor = ibeacon_advertisement.minor
major_minor_by_uuid = self._major_minor_by_uuid.setdefault(uuid_str, set())
@ -296,12 +312,24 @@ class IBeaconCoordinator:
address = service_info.address
unique_id = f"{group_id}_{address}"
new = unique_id not in self._last_ibeacon_advertisement_by_unique_id
# Reject creating new trackers if the name is not set
if new and (
service_info.device.name is None
or service_info.device.name.replace("-", ":") == service_info.device.address
uuid = str(ibeacon_advertisement.uuid)
# Reject creating new trackers if the name is not set (unless the uuid is allowlisted).
if (
new
and uuid not in self._allow_nameless_uuids
and (
service_info.device.name is None
or service_info.device.name.replace("-", ":")
== service_info.device.address
)
):
# Store the ignored addresses, cause the uuid might be allowlisted later
self._ignored_nameless_by_uuid.setdefault(uuid, set()).add(address)
_LOGGER.debug("ignoring new beacon %s due to empty device name", unique_id)
return
previously_tracked = address in self._unique_ids_by_address
self._last_ibeacon_advertisement_by_unique_id[unique_id] = ibeacon_advertisement
self._async_track_ibeacon_with_unique_address(address, group_id, unique_id)
@ -428,6 +456,33 @@ class IBeaconCoordinator:
ibeacon_advertisement,
)
async def async_config_entry_updated(
self, hass: HomeAssistant, config_entry: ConfigEntry
) -> None:
"""Restore ignored nameless beacons when the allowlist is updated."""
self._allow_nameless_uuids = set(
self._entry.options.get(CONF_ALLOW_NAMELESS_UUIDS, [])
)
for uuid in self._allow_nameless_uuids:
for address in self._ignored_nameless_by_uuid.pop(uuid, set()):
_LOGGER.debug(
"restoring nameless iBeacon %s from address %s", uuid, address
)
if not (
service_info := bluetooth.async_last_service_info(
self.hass, address, connectable=False
)
):
continue # no longer available
# the beacon was ignored, we need to re-process it from scratch
self._async_update_ibeacon(
service_info, bluetooth.BluetoothChange.ADVERTISEMENT
)
@callback
def _async_update(self, _now: datetime) -> None:
"""Update the Coordinator."""

View File

@ -13,11 +13,15 @@
"options": {
"step": {
"init": {
"description": "iBeacons with an RSSI value lower than the Minimum RSSI will be ignored. If the integration is seeing neighboring iBeacons, increasing this value may help.",
"description": "iBeacons with an empty device name are ignored by default, unless their UUID is explicitly allowed in the list below.",
"data": {
"min_rssi": "Minimum RSSI"
"new_uuid": "Enter a new allowed UUID",
"allow_nameless_uuids": "Currently allowed UUIDs. Uncheck to remove"
}
}
},
"error": {
"invalid_uuid_format": "UUIDs should contain 32 hex characters grouped as XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
},
"entity": {

View File

@ -2,7 +2,7 @@
from unittest.mock import patch
from homeassistant import config_entries
from homeassistant.components.ibeacon.const import DOMAIN
from homeassistant.components.ibeacon.const import CONF_ALLOW_NAMELESS_UUIDS, DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
@ -49,3 +49,70 @@ async def test_setup_user_already_setup(
)
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "single_instance_allowed"
async def test_options_flow(hass: HomeAssistant, enable_bluetooth: None) -> None:
"""Test config flow options."""
config_entry = MockConfigEntry(domain=DOMAIN)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
# test save invalid uuid
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"new_uuid": "invalid",
},
)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
assert result["errors"] == {"new_uuid": "invalid_uuid_format"}
# test save new uuid
uuid = "daa4b6bb-b77a-4662-aeb8-b3ed56454091"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
"new_uuid": uuid,
},
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["data"] == {CONF_ALLOW_NAMELESS_UUIDS: [uuid]}
# test save duplicate uuid
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_ALLOW_NAMELESS_UUIDS: [uuid],
"new_uuid": uuid,
},
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["data"] == {CONF_ALLOW_NAMELESS_UUIDS: [uuid]}
# delete
result = await hass.config_entries.options.async_init(config_entry.entry_id)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_ALLOW_NAMELESS_UUIDS: [],
},
)
assert result["type"] == FlowResultType.CREATE_ENTRY
assert result["data"] == {CONF_ALLOW_NAMELESS_UUIDS: []}

View File

@ -4,7 +4,15 @@ import time
import pytest
from homeassistant.components.ibeacon.const import ATTR_SOURCE, DOMAIN, UPDATE_INTERVAL
from homeassistant.components.bluetooth import (
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
)
from homeassistant.components.ibeacon.const import (
ATTR_SOURCE,
CONF_ALLOW_NAMELESS_UUIDS,
DOMAIN,
UPDATE_INTERVAL,
)
from homeassistant.const import STATE_HOME
from homeassistant.core import HomeAssistant
from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo
@ -26,6 +34,7 @@ from tests.components.bluetooth import (
inject_advertisement_with_time_and_source_connectable,
inject_bluetooth_service_info,
patch_all_discovered_devices,
patch_bluetooth_time,
)
@ -146,6 +155,106 @@ async def test_ignore_default_name(hass: HomeAssistant) -> None:
assert len(hass.states.async_entity_ids()) == before_entity_count
async def test_default_name_allowlisted(hass: HomeAssistant) -> None:
"""Test we do NOT ignore beacons with default device name but allowlisted UUID."""
entry = MockConfigEntry(
domain=DOMAIN,
options={CONF_ALLOW_NAMELESS_UUIDS: ["426c7565-4368-6172-6d42-6561636f6e73"]},
)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
before_entity_count = len(hass.states.async_entity_ids())
inject_bluetooth_service_info(
hass,
replace(
BLUECHARM_BEACON_SERVICE_INFO_DBUS,
name=BLUECHARM_BEACON_SERVICE_INFO_DBUS.address,
),
)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids()) > before_entity_count
async def test_default_name_allowlisted_restore(hass: HomeAssistant) -> None:
"""Test that ignored nameless iBeacons are restored when allowlist entry is added."""
entry = MockConfigEntry(
domain=DOMAIN,
)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
before_entity_count = len(hass.states.async_entity_ids())
inject_bluetooth_service_info(
hass,
replace(
BLUECHARM_BEACON_SERVICE_INFO_DBUS,
name=BLUECHARM_BEACON_SERVICE_INFO_DBUS.address,
),
)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids()) == before_entity_count
result = await hass.config_entries.options.async_init(entry.entry_id)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={"new_uuid": "426c7565-4368-6172-6d42-6561636f6e73"},
)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids()) > before_entity_count
async def test_default_name_allowlisted_restore_late(hass: HomeAssistant) -> None:
"""Test that allowlisting an ignored but no longer advertised nameless iBeacon has no effect."""
start_monotonic = time.monotonic()
entry = MockConfigEntry(
domain=DOMAIN,
)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
before_entity_count = len(hass.states.async_entity_ids())
inject_bluetooth_service_info(
hass,
replace(
BLUECHARM_BEACON_SERVICE_INFO_DBUS,
name=BLUECHARM_BEACON_SERVICE_INFO_DBUS.address,
),
)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids()) == before_entity_count
# Fastforward time until the device is no longer advertised
monotonic_now = start_monotonic + FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
with patch_bluetooth_time(
monotonic_now,
), patch_all_discovered_devices([]):
async_fire_time_changed(
hass,
dt_util.utcnow()
+ timedelta(seconds=FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1),
)
await hass.async_block_till_done()
result = await hass.config_entries.options.async_init(entry.entry_id)
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={"new_uuid": "426c7565-4368-6172-6d42-6561636f6e73"},
)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids()) == before_entity_count
async def test_rotating_major_minor_and_mac_with_name(hass: HomeAssistant) -> None:
"""Test the different uuid, major, minor from many addresses removes all associated entities."""
entry = MockConfigEntry(