Add reauthentication flow to airOS (#153076)

Co-authored-by: G Johansson <goran.johansson@shiftit.se>
pull/146772/head^2
Tom 2025-09-27 23:28:14 +02:00 committed by GitHub
parent 96d51965e5
commit 1ebf096a33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 218 additions and 51 deletions

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
@ -14,7 +15,7 @@ from airos.exceptions import (
)
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlow, ConfigFlowResult
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
@ -24,6 +25,11 @@ from homeassistant.const import (
)
from homeassistant.data_entry_flow import section
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import DEFAULT_SSL, DEFAULT_VERIFY_SSL, DOMAIN, SECTION_ADVANCED_SETTINGS
from .coordinator import AirOS8
@ -54,50 +60,107 @@ class AirOSConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
MINOR_VERSION = 2
def __init__(self) -> None:
"""Initialize the config flow."""
super().__init__()
self.airos_device: AirOS8
self.errors: dict[str, str] = {}
async def async_step_user(
self,
user_input: dict[str, Any] | None = None,
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
"""Handle the manual input of host and credentials."""
self.errors = {}
if user_input is not None:
# By default airOS 8 comes with self-signed SSL certificates,
# with no option in the web UI to change or upload a custom certificate.
session = async_get_clientsession(
self.hass,
verify_ssl=user_input[SECTION_ADVANCED_SETTINGS][CONF_VERIFY_SSL],
)
airos_device = AirOS8(
host=user_input[CONF_HOST],
username=user_input[CONF_USERNAME],
password=user_input[CONF_PASSWORD],
session=session,
use_ssl=user_input[SECTION_ADVANCED_SETTINGS][CONF_SSL],
)
try:
await airos_device.login()
airos_data = await airos_device.status()
except (
AirOSConnectionSetupError,
AirOSDeviceConnectionError,
):
errors["base"] = "cannot_connect"
except (AirOSConnectionAuthenticationError, AirOSDataMissingError):
errors["base"] = "invalid_auth"
except AirOSKeyDataMissingError:
errors["base"] = "key_data_missing"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(airos_data.derived.mac)
self._abort_if_unique_id_configured()
validated_info = await self._validate_and_get_device_info(user_input)
if validated_info:
return self.async_create_entry(
title=airos_data.host.hostname, data=user_input
title=validated_info["title"],
data=validated_info["data"],
)
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=self.errors
)
async def _validate_and_get_device_info(
self, config_data: dict[str, Any]
) -> dict[str, Any] | None:
"""Validate user input with the device API."""
# By default airOS 8 comes with self-signed SSL certificates,
# with no option in the web UI to change or upload a custom certificate.
session = async_get_clientsession(
self.hass,
verify_ssl=config_data[SECTION_ADVANCED_SETTINGS][CONF_VERIFY_SSL],
)
airos_device = AirOS8(
host=config_data[CONF_HOST],
username=config_data[CONF_USERNAME],
password=config_data[CONF_PASSWORD],
session=session,
use_ssl=config_data[SECTION_ADVANCED_SETTINGS][CONF_SSL],
)
try:
await airos_device.login()
airos_data = await airos_device.status()
except (
AirOSConnectionSetupError,
AirOSDeviceConnectionError,
):
self.errors["base"] = "cannot_connect"
except (AirOSConnectionAuthenticationError, AirOSDataMissingError):
self.errors["base"] = "invalid_auth"
except AirOSKeyDataMissingError:
self.errors["base"] = "key_data_missing"
except Exception:
_LOGGER.exception("Unexpected exception during credential validation")
self.errors["base"] = "unknown"
else:
await self.async_set_unique_id(airos_data.derived.mac)
if self.source == SOURCE_REAUTH:
self._abort_if_unique_id_mismatch()
else:
self._abort_if_unique_id_configured()
return {"title": airos_data.host.hostname, "data": config_data}
return None
async def async_step_reauth(
self,
user_input: Mapping[str, Any],
) -> ConfigFlowResult:
"""Perform reauthentication upon an API authentication error."""
return await self.async_step_reauth_confirm(user_input)
async def async_step_reauth_confirm(
self,
user_input: Mapping[str, Any],
) -> ConfigFlowResult:
"""Perform reauthentication upon an API authentication error."""
self.errors = {}
if user_input:
validate_data = {**self._get_reauth_entry().data, **user_input}
if await self._validate_and_get_device_info(config_data=validate_data):
return self.async_update_reload_and_abort(
self._get_reauth_entry(),
data_updates=validate_data,
)
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
step_id="reauth_confirm",
data_schema=vol.Schema(
{
vol.Required(CONF_PASSWORD): TextSelector(
TextSelectorConfig(
type=TextSelectorType.PASSWORD,
autocomplete="current-password",
)
),
}
),
errors=self.errors,
)

View File

@ -14,7 +14,7 @@ from airos.exceptions import (
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, SCAN_INTERVAL
@ -47,9 +47,9 @@ class AirOSDataUpdateCoordinator(DataUpdateCoordinator[AirOS8Data]):
try:
await self.airos_device.login()
return await self.airos_device.status()
except (AirOSConnectionAuthenticationError,) as err:
except AirOSConnectionAuthenticationError as err:
_LOGGER.exception("Error authenticating with airOS device")
raise ConfigEntryError(
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN, translation_key="invalid_auth"
) from err
except (

View File

@ -2,6 +2,14 @@
"config": {
"flow_title": "Ubiquiti airOS device",
"step": {
"reauth_confirm": {
"data": {
"password": "[%key:common::config_flow::data::password%]"
},
"data_description": {
"password": "[%key:component::airos::config::step::user::data_description::password%]"
}
},
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
@ -34,7 +42,9 @@
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"unique_id_mismatch": "Re-authentication should be used for the same device not a new one"
}
},
"entity": {

View File

@ -24,6 +24,9 @@ from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
NEW_PASSWORD = "new_password"
REAUTH_STEP = "reauth_confirm"
MOCK_CONFIG = {
CONF_HOST: "1.1.1.1",
CONF_USERNAME: "ubnt",
@ -33,6 +36,11 @@ MOCK_CONFIG = {
CONF_VERIFY_SSL: False,
},
}
MOCK_CONFIG_REAUTH = {
CONF_HOST: "1.1.1.1",
CONF_USERNAME: "ubnt",
CONF_PASSWORD: "wrong-password",
}
async def test_form_creates_entry(
@ -89,7 +97,6 @@ async def test_form_duplicate_entry(
@pytest.mark.parametrize(
("exception", "error"),
[
(AirOSConnectionAuthenticationError, "invalid_auth"),
(AirOSDeviceConnectionError, "cannot_connect"),
(AirOSKeyDataMissingError, "key_data_missing"),
(Exception, "unknown"),
@ -128,3 +135,95 @@ async def test_form_exception_handling(
assert result["title"] == "NanoStation 5AC ap name"
assert result["data"] == MOCK_CONFIG
assert len(mock_setup_entry.mock_calls) == 1
@pytest.mark.parametrize(
("reauth_exception", "expected_error"),
[
(None, None),
(AirOSConnectionAuthenticationError, "invalid_auth"),
(AirOSDeviceConnectionError, "cannot_connect"),
(AirOSKeyDataMissingError, "key_data_missing"),
(Exception, "unknown"),
],
ids=[
"reauth_succes",
"invalid_auth",
"cannot_connect",
"key_data_missing",
"unknown",
],
)
async def test_reauth_flow_scenarios(
hass: HomeAssistant,
mock_airos_client: AsyncMock,
mock_config_entry: MockConfigEntry,
reauth_exception: Exception,
expected_error: str,
) -> None:
"""Test reauthentication from start (failure) to finish (success)."""
mock_config_entry.add_to_hass(hass)
mock_airos_client.login.side_effect = AirOSConnectionAuthenticationError
await hass.config_entries.async_setup(mock_config_entry.entry_id)
flows = hass.config_entries.flow.async_progress()
assert len(flows) == 1
flow = flows[0]
assert flow["step_id"] == REAUTH_STEP
mock_airos_client.login.side_effect = reauth_exception
result = await hass.config_entries.flow.async_configure(
flow["flow_id"],
user_input={CONF_PASSWORD: NEW_PASSWORD},
)
if expected_error:
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == REAUTH_STEP
assert result["errors"] == {"base": expected_error}
# Retry
mock_airos_client.login.side_effect = None
result = await hass.config_entries.flow.async_configure(
flow["flow_id"],
user_input={CONF_PASSWORD: NEW_PASSWORD},
)
# Always test resolution
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_successful"
updated_entry = hass.config_entries.async_get_entry(mock_config_entry.entry_id)
assert updated_entry.data[CONF_PASSWORD] == NEW_PASSWORD
async def test_reauth_unique_id_mismatch(
hass: HomeAssistant,
mock_airos_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test reauthentication failure when the unique ID changes."""
mock_config_entry.add_to_hass(hass)
mock_airos_client.login.side_effect = AirOSConnectionAuthenticationError
await hass.config_entries.async_setup(mock_config_entry.entry_id)
flows = hass.config_entries.flow.async_progress()
flow = flows[0]
mock_airos_client.login.side_effect = None
mock_airos_client.status.return_value.derived.mac = "FF:23:45:67:89:AB"
result = await hass.config_entries.flow.async_configure(
flow["flow_id"],
user_input={CONF_PASSWORD: NEW_PASSWORD},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "unique_id_mismatch"
updated_entry = hass.config_entries.async_get_entry(mock_config_entry.entry_id)
assert updated_entry.data[CONF_PASSWORD] != NEW_PASSWORD

View File

@ -3,11 +3,7 @@
from datetime import timedelta
from unittest.mock import AsyncMock
from airos.exceptions import (
AirOSConnectionAuthenticationError,
AirOSDataMissingError,
AirOSDeviceConnectionError,
)
from airos.exceptions import AirOSDataMissingError, AirOSDeviceConnectionError
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
@ -39,7 +35,6 @@ async def test_all_entities(
@pytest.mark.parametrize(
("exception"),
[
AirOSConnectionAuthenticationError,
TimeoutError,
AirOSDeviceConnectionError,
AirOSDataMissingError,