Add extra validation in private_ble_device config flow (#101254)
parent
100b6fd06f
commit
e23e71279f
|
@ -19,6 +19,30 @@ _LOGGER = logging.getLogger(__name__)
|
|||
CONF_IRK = "irk"
|
||||
|
||||
|
||||
def _parse_irk(irk: str) -> bytes | None:
|
||||
if irk.startswith("irk:"):
|
||||
irk = irk[4:]
|
||||
|
||||
if irk.endswith("="):
|
||||
try:
|
||||
irk_bytes = bytes(reversed(base64.b64decode(irk)))
|
||||
except binascii.Error:
|
||||
# IRK is not valid base64
|
||||
return None
|
||||
else:
|
||||
try:
|
||||
irk_bytes = binascii.unhexlify(irk)
|
||||
except binascii.Error:
|
||||
# IRK is not correctly hex encoded
|
||||
return None
|
||||
|
||||
if len(irk_bytes) != 16:
|
||||
# IRK must be 16 bytes when decoded
|
||||
return None
|
||||
|
||||
return irk_bytes
|
||||
|
||||
|
||||
class BLEDeviceTrackerConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for BLE Device Tracker."""
|
||||
|
||||
|
@ -35,15 +59,8 @@ class BLEDeviceTrackerConfigFlow(ConfigFlow, domain=DOMAIN):
|
|||
|
||||
if user_input is not None:
|
||||
irk = user_input[CONF_IRK]
|
||||
if irk.startswith("irk:"):
|
||||
irk = irk[4:]
|
||||
|
||||
if irk.endswith("="):
|
||||
irk_bytes = bytes(reversed(base64.b64decode(irk)))
|
||||
else:
|
||||
irk_bytes = binascii.unhexlify(irk)
|
||||
|
||||
if len(irk_bytes) != 16:
|
||||
if not (irk_bytes := _parse_irk(irk)):
|
||||
errors[CONF_IRK] = "irk_not_valid"
|
||||
elif not (service_info := async_last_service_info(self.hass, irk_bytes)):
|
||||
errors[CONF_IRK] = "irk_not_found"
|
||||
|
|
|
@ -42,6 +42,32 @@ async def test_invalid_irk(hass: HomeAssistant, enable_bluetooth: None) -> None:
|
|||
assert_form_error(result, "irk", "irk_not_valid")
|
||||
|
||||
|
||||
async def test_invalid_irk_base64(hass: HomeAssistant, enable_bluetooth: None) -> None:
|
||||
"""Test invalid irk."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
const.DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={"irk": "Ucredacted4T8n!!ZZZ=="}
|
||||
)
|
||||
assert_form_error(result, "irk", "irk_not_valid")
|
||||
|
||||
|
||||
async def test_invalid_irk_hex(hass: HomeAssistant, enable_bluetooth: None) -> None:
|
||||
"""Test invalid irk."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
const.DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
assert result["type"] == "form"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], user_input={"irk": "irk:abcdefghi"}
|
||||
)
|
||||
assert_form_error(result, "irk", "irk_not_valid")
|
||||
|
||||
|
||||
async def test_irk_not_found(hass: HomeAssistant, enable_bluetooth: None) -> None:
|
||||
"""Test irk not found."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
|
|
Loading…
Reference in New Issue