Ensure that email-based 2FA in SimpliSafe shows the progress UI (#71021)

pull/71015/head^2
Aaron Bach 2022-04-28 15:05:55 -06:00 committed by GitHub
parent 8883f5482b
commit d791a08002
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 95 additions and 55 deletions

View File

@ -49,13 +49,14 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self) -> None:
"""Initialize the config flow."""
self._email_2fa_task: asyncio.Task | None = None
self._password: str | None = None
self._reauth: bool = False
self._simplisafe: API | None = None
self._username: str | None = None
async def _async_authenticate(
self, error_step_id: str, error_schema: vol.Schema
self, originating_step_id: str, originating_step_schema: vol.Schema
) -> FlowResult:
"""Attempt to authenticate to the SimpliSafe API."""
assert self._password
@ -76,8 +77,8 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if errors:
return self.async_show_form(
step_id=error_step_id,
data_schema=error_schema,
step_id=originating_step_id,
data_schema=originating_step_schema,
errors=errors,
description_placeholders={CONF_USERNAME: self._username},
)
@ -86,6 +87,31 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if self._simplisafe.auth_state == AuthStates.PENDING_2FA_SMS:
return await self.async_step_sms_2fa()
return await self.async_step_email_2fa()
@staticmethod
@callback
def async_get_options_flow(
config_entry: ConfigEntry,
) -> SimpliSafeOptionsFlowHandler:
"""Define the config flow to handle options."""
return SimpliSafeOptionsFlowHandler(config_entry)
async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult:
"""Handle configuration by re-auth."""
self._reauth = True
if CONF_USERNAME not in config:
# Old versions of the config flow may not have the username by this point;
# in that case, we reauth them by making them go through the user flow:
return await self.async_step_user()
self._username = config[CONF_USERNAME]
return await self.async_step_reauth_confirm()
async def _async_get_email_2fa(self) -> None:
"""Define a task to wait for email-based 2FA."""
assert self._simplisafe
try:
async with async_timeout.timeout(DEFAULT_EMAIL_2FA_TIMEOUT):
@ -97,17 +123,39 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
await asyncio.sleep(DEFAULT_EMAIL_2FA_SLEEP)
else:
break
except asyncio.TimeoutError:
return self.async_show_form(
step_id="user",
data_schema=STEP_USER_SCHEMA,
errors={"base": "2fa_timed_out"},
finally:
self.hass.async_create_task(
self.hass.config_entries.flow.async_configure(flow_id=self.flow_id)
)
return await self._async_finish_setup()
async def async_step_email_2fa(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle email-based two-factor authentication."""
if not self._email_2fa_task:
self._email_2fa_task = self.hass.async_create_task(
self._async_get_email_2fa()
)
return self.async_show_progress(
step_id="email_2fa", progress_action="email_2fa"
)
async def _async_finish_setup(self) -> FlowResult:
"""Complete setup with an authenticated API object."""
try:
await self._email_2fa_task
except asyncio.TimeoutError:
return self.async_show_progress_done(next_step_id="email_2fa_error")
return self.async_show_progress_done(next_step_id="finish")
async def async_step_email_2fa_error(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle an error during email-based two-factor authentication."""
return self.async_abort(reason="email_2fa_timed_out")
async def async_step_finish(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the final step."""
assert self._simplisafe
assert self._username
@ -142,26 +190,6 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self._abort_if_unique_id_configured()
return self.async_create_entry(title=self._username, data=data)
@staticmethod
@callback
def async_get_options_flow(
config_entry: ConfigEntry,
) -> SimpliSafeOptionsFlowHandler:
"""Define the config flow to handle options."""
return SimpliSafeOptionsFlowHandler(config_entry)
async def async_step_reauth(self, config: dict[str, Any]) -> FlowResult:
"""Handle configuration by re-auth."""
self._reauth = True
if CONF_USERNAME not in config:
# Old versions of the config flow may not have the username by this point;
# in that case, we reauth them by making them go through the user flow:
return await self.async_step_user()
self._username = config[CONF_USERNAME]
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
@ -197,7 +225,7 @@ class SimpliSafeFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
errors={CONF_CODE: "invalid_auth"},
)
return await self._async_finish_setup()
return await self.async_step_finish()
async def async_step_user(
self, user_input: dict[str, Any] | None = None

View File

@ -23,13 +23,16 @@
}
},
"error": {
"2fa_timed_out": "Timed out while waiting for two-factor authentication",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "This SimpliSafe account is already in use.",
"email_2fa_timed_out": "Timed out while waiting for email-based two-factor authentication.",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
},
"progress": {
"email_2fa": "Input the two-factor authentication code\nsent to you via email."
}
},
"options": {

View File

@ -2,13 +2,16 @@
"config": {
"abort": {
"already_configured": "This SimpliSafe account is already in use.",
"email_2fa_timed_out": "Timed out while waiting for email-based two-factor authentication.",
"reauth_successful": "Re-authentication was successful"
},
"error": {
"2fa_timed_out": "Timed out while waiting for two-factor authentication",
"invalid_auth": "Invalid authentication",
"unknown": "Unexpected error"
},
"progress": {
"email_2fa": "Input the two-factor authentication code\nsent to you via email."
},
"step": {
"reauth_confirm": {
"data": {

View File

@ -102,6 +102,8 @@ def reauth_config_fixture():
async def setup_simplisafe_fixture(hass, api, config):
"""Define a fixture to set up SimpliSafe."""
with patch(
"homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_SLEEP", 0
), patch(
"homeassistant.components.simplisafe.config_flow.API.async_from_credentials",
return_value=api,
), patch(

View File

@ -118,6 +118,7 @@ async def test_step_reauth_errors(hass, config, error_string, exc, reauth_config
result["flow_id"], user_input=reauth_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "reauth_confirm"
assert result["errors"] == {"base": error_string}
@ -191,12 +192,13 @@ async def test_step_user_errors(hass, credentials_config, error_string, exc):
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": error_string}
@pytest.mark.parametrize("api_auth_state", [AuthStates.PENDING_2FA_EMAIL])
async def test_step_user_email_2fa(
api, hass, config, credentials_config, setup_simplisafe
api, api_auth_state, hass, config, credentials_config, setup_simplisafe
):
"""Test the user step with email-based 2FA."""
result = await hass.config_entries.flow.async_init(
@ -208,14 +210,15 @@ async def test_step_user_email_2fa(
# Patch API.async_verify_2fa_email to first return pending, then return all done:
api.async_verify_2fa_email.side_effect = [Verify2FAPending, None]
# Patch the amount of time slept between calls so to not slow down this test:
with patch(
"homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_SLEEP", 0
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_SHOW_PROGRESS
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] == data_entry_flow.RESULT_TYPE_SHOW_PROGRESS_DONE
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert len(hass.config_entries.async_entries()) == 1
[config_entry] = hass.config_entries.async_entries(DOMAIN)
@ -223,6 +226,7 @@ async def test_step_user_email_2fa(
assert config_entry.data == config
@patch("homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_TIMEOUT", 0)
@pytest.mark.parametrize("api_auth_state", [AuthStates.PENDING_2FA_EMAIL])
async def test_step_user_email_2fa_timeout(
api, hass, config, credentials_config, setup_simplisafe
@ -237,18 +241,18 @@ async def test_step_user_email_2fa_timeout(
# Patch API.async_verify_2fa_email to return pending:
api.async_verify_2fa_email.side_effect = Verify2FAPending
# Patch the amount of time slept between calls and the timeout duration so to not
# slow down this test:
with patch(
"homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_SLEEP", 0
), patch(
"homeassistant.components.simplisafe.config_flow.DEFAULT_EMAIL_2FA_TIMEOUT", 0
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "2fa_timed_out"}
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=credentials_config
)
assert result["type"] == data_entry_flow.RESULT_TYPE_SHOW_PROGRESS
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] == data_entry_flow.RESULT_TYPE_SHOW_PROGRESS_DONE
assert result["step_id"] == "email_2fa_error"
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "email_2fa_timed_out"
async def test_step_user_sms_2fa(