Add Notify MFA module ()

* Add Notify MFA

* Fix unit test

* Address review comment, change storage implementation

* Add retry limit to mfa module

* Fix loading

* Fix invalaid login log processing

* Typing

* Change default message template

* Change one-time password to 8 digit

* Refactoring to not save secret

* Bug fixing

* Change async_initialize method name to aysnc_initialize_login_mfa_step

* Address some simple fix code review comment
pull/16825/head
Jason Hu 2018-09-24 02:06:50 -07:00 committed by Paulus Schoutsen
parent ad47ece5c6
commit 7a77951bb4
16 changed files with 799 additions and 24 deletions

View File

@ -2,3 +2,4 @@
from datetime import timedelta
ACCESS_TOKEN_EXPIRATION = timedelta(minutes=30)
MFA_SESSION_EXPIRATION = timedelta(minutes=5)

View File

@ -1,5 +1,4 @@
"""Plugable auth modules for Home Assistant."""
from datetime import timedelta
import importlib
import logging
import types
@ -23,8 +22,6 @@ MULTI_FACTOR_AUTH_MODULE_SCHEMA = vol.Schema({
vol.Optional(CONF_ID): str,
}, extra=vol.ALLOW_EXTRA)
SESSION_EXPIRATION = timedelta(minutes=5)
DATA_REQS = 'mfa_auth_module_reqs_processed'
_LOGGER = logging.getLogger(__name__)
@ -34,6 +31,7 @@ class MultiFactorAuthModule:
"""Multi-factor Auth Module of validation function."""
DEFAULT_TITLE = 'Unnamed auth module'
MAX_RETRY_TIME = 3
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize an auth module."""
@ -84,7 +82,7 @@ class MultiFactorAuthModule:
"""Return whether user is setup."""
raise NotImplementedError
async def async_validation(
async def async_validate(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
raise NotImplementedError

View File

@ -77,7 +77,7 @@ class InsecureExampleModule(MultiFactorAuthModule):
return True
return False
async def async_validation(
async def async_validate(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
for data in self._data:

View File

@ -0,0 +1,325 @@
"""HMAC-based One-time Password auth module.
Sending HOTP through notify service
"""
import logging
from collections import OrderedDict
from typing import Any, Dict, Optional, Tuple, List # noqa: F401
import attr
import voluptuous as vol
from homeassistant.const import CONF_EXCLUDE, CONF_INCLUDE
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \
MULTI_FACTOR_AUTH_MODULE_SCHEMA, SetupFlow
REQUIREMENTS = ['pyotp==2.2.6']
CONF_MESSAGE = 'message'
CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({
vol.Optional(CONF_INCLUDE): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_EXCLUDE): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_MESSAGE,
default='{} is your Home Assistant login code'): str
}, extra=vol.PREVENT_EXTRA)
STORAGE_VERSION = 1
STORAGE_KEY = 'auth_module.notify'
STORAGE_USERS = 'users'
STORAGE_USER_ID = 'user_id'
INPUT_FIELD_CODE = 'code'
_LOGGER = logging.getLogger(__name__)
def _generate_secret() -> str:
"""Generate a secret."""
import pyotp
return str(pyotp.random_base32())
def _generate_random() -> int:
"""Generate a 8 digit number."""
import pyotp
return int(pyotp.random_base32(length=8, chars=list('1234567890')))
def _generate_otp(secret: str, count: int) -> str:
"""Generate one time password."""
import pyotp
return str(pyotp.HOTP(secret).at(count))
def _verify_otp(secret: str, otp: str, count: int) -> bool:
"""Verify one time password."""
import pyotp
return bool(pyotp.HOTP(secret).verify(otp, count))
@attr.s(slots=True)
class NotifySetting:
"""Store notify setting for one user."""
secret = attr.ib(type=str, factory=_generate_secret) # not persistent
counter = attr.ib(type=int, factory=_generate_random) # not persistent
notify_service = attr.ib(type=Optional[str], default=None)
target = attr.ib(type=Optional[str], default=None)
_UsersDict = Dict[str, NotifySetting]
@MULTI_FACTOR_AUTH_MODULES.register('notify')
class NotifyAuthModule(MultiFactorAuthModule):
"""Auth module send hmac-based one time password by notify service."""
DEFAULT_TITLE = 'Notify One-Time Password'
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
super().__init__(hass, config)
self._user_settings = None # type: Optional[_UsersDict]
self._user_store = hass.helpers.storage.Store(
STORAGE_VERSION, STORAGE_KEY)
self._include = config.get(CONF_INCLUDE, [])
self._exclude = config.get(CONF_EXCLUDE, [])
self._message_template = config[CONF_MESSAGE]
@property
def input_schema(self) -> vol.Schema:
"""Validate login flow input data."""
return vol.Schema({INPUT_FIELD_CODE: str})
async def _async_load(self) -> None:
"""Load stored data."""
data = await self._user_store.async_load()
if data is None:
data = {STORAGE_USERS: {}}
self._user_settings = {
user_id: NotifySetting(**setting)
for user_id, setting in data.get(STORAGE_USERS, {}).items()
}
async def _async_save(self) -> None:
"""Save data."""
if self._user_settings is None:
return
await self._user_store.async_save({STORAGE_USERS: {
user_id: attr.asdict(
notify_setting, filter=attr.filters.exclude(
attr.fields(NotifySetting).secret,
attr.fields(NotifySetting).counter,
))
for user_id, notify_setting
in self._user_settings.items()
}})
@callback
def aync_get_available_notify_services(self) -> List[str]:
"""Return list of notify services."""
unordered_services = set()
for service in self.hass.services.async_services().get('notify', {}):
if service not in self._exclude:
unordered_services.add(service)
if self._include:
unordered_services &= set(self._include)
return sorted(unordered_services)
async def async_setup_flow(self, user_id: str) -> SetupFlow:
"""Return a data entry flow handler for setup module.
Mfa module should extend SetupFlow
"""
return NotifySetupFlow(
self, self.input_schema, user_id,
self.aync_get_available_notify_services())
async def async_setup_user(self, user_id: str, setup_data: Any) -> Any:
"""Set up auth module for user."""
if self._user_settings is None:
await self._async_load()
assert self._user_settings is not None
self._user_settings[user_id] = NotifySetting(
notify_service=setup_data.get('notify_service'),
target=setup_data.get('target'),
)
await self._async_save()
async def async_depose_user(self, user_id: str) -> None:
"""Depose auth module for user."""
if self._user_settings is None:
await self._async_load()
assert self._user_settings is not None
if self._user_settings.pop(user_id, None):
await self._async_save()
async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
if self._user_settings is None:
await self._async_load()
assert self._user_settings is not None
return user_id in self._user_settings
async def async_validate(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
if self._user_settings is None:
await self._async_load()
assert self._user_settings is not None
notify_setting = self._user_settings.get(user_id, None)
if notify_setting is None:
return False
# user_input has been validate in caller
return await self.hass.async_add_executor_job(
_verify_otp, notify_setting.secret,
user_input.get(INPUT_FIELD_CODE, ''),
notify_setting.counter)
async def async_initialize_login_mfa_step(self, user_id: str) -> None:
"""Generate code and notify user."""
if self._user_settings is None:
await self._async_load()
assert self._user_settings is not None
notify_setting = self._user_settings.get(user_id, None)
if notify_setting is None:
raise ValueError('Cannot find user_id')
def generate_secret_and_one_time_password() -> str:
"""Generate and send one time password."""
assert notify_setting
# secret and counter are not persistent
notify_setting.secret = _generate_secret()
notify_setting.counter = _generate_random()
return _generate_otp(
notify_setting.secret, notify_setting.counter)
code = await self.hass.async_add_executor_job(
generate_secret_and_one_time_password)
await self.async_notify_user(user_id, code)
async def async_notify_user(self, user_id: str, code: str) -> None:
"""Send code by user's notify service."""
if self._user_settings is None:
await self._async_load()
assert self._user_settings is not None
notify_setting = self._user_settings.get(user_id, None)
if notify_setting is None:
_LOGGER.error('Cannot find user %s', user_id)
return
await self.async_notify( # type: ignore
code, notify_setting.notify_service, notify_setting.target)
async def async_notify(self, code: str, notify_service: str,
target: Optional[str] = None) -> None:
"""Send code by notify service."""
data = {'message': self._message_template.format(code)}
if target:
data['target'] = [target]
await self.hass.services.async_call('notify', notify_service, data)
class NotifySetupFlow(SetupFlow):
"""Handler for the setup flow."""
def __init__(self, auth_module: NotifyAuthModule,
setup_schema: vol.Schema,
user_id: str,
available_notify_services: List[str]) -> None:
"""Initialize the setup flow."""
super().__init__(auth_module, setup_schema, user_id)
# to fix typing complaint
self._auth_module = auth_module # type: NotifyAuthModule
self._available_notify_services = available_notify_services
self._secret = None # type: Optional[str]
self._count = None # type: Optional[int]
self._notify_service = None # type: Optional[str]
self._target = None # type: Optional[str]
async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Let user select available notify services."""
errors = {} # type: Dict[str, str]
hass = self._auth_module.hass
if user_input:
self._notify_service = user_input['notify_service']
self._target = user_input.get('target')
self._secret = await hass.async_add_executor_job(_generate_secret)
self._count = await hass.async_add_executor_job(_generate_random)
return await self.async_step_setup()
if not self._available_notify_services:
return self.async_abort(reason='no_available_service')
schema = OrderedDict() # type: Dict[str, Any]
schema['notify_service'] = vol.In(self._available_notify_services)
schema['target'] = vol.Optional(str)
return self.async_show_form(
step_id='init',
data_schema=vol.Schema(schema),
errors=errors
)
async def async_step_setup(
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Verify user can recevie one-time password."""
errors = {} # type: Dict[str, str]
hass = self._auth_module.hass
if user_input:
verified = await hass.async_add_executor_job(
_verify_otp, self._secret, user_input['code'], self._count)
if verified:
await self._auth_module.async_setup_user(
self._user_id, {
'notify_service': self._notify_service,
'target': self._target,
})
return self.async_create_entry(
title=self._auth_module.name,
data={}
)
errors['base'] = 'invalid_code'
# generate code every time, no retry logic
assert self._secret and self._count
code = await hass.async_add_executor_job(
_generate_otp, self._secret, self._count)
assert self._notify_service
await self._auth_module.async_notify(
code, self._notify_service, self._target)
return self.async_show_form(
step_id='setup',
data_schema=self._setup_schema,
description_placeholders={'notify_service': self._notify_service},
errors=errors,
)

View File

@ -60,6 +60,7 @@ class TotpAuthModule(MultiFactorAuthModule):
"""Auth module validate time-based one time password."""
DEFAULT_TITLE = 'Time-based One Time Password'
MAX_RETRY_TIME = 5
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
@ -130,7 +131,7 @@ class TotpAuthModule(MultiFactorAuthModule):
return user_id in self._users # type: ignore
async def async_validation(
async def async_validate(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
if self._users is None:

View File

@ -15,8 +15,8 @@ from homeassistant.util import dt as dt_util
from homeassistant.util.decorator import Registry
from ..auth_store import AuthStore
from ..const import MFA_SESSION_EXPIRATION
from ..models import Credentials, User, UserMeta # noqa: F401
from ..mfa_modules import SESSION_EXPIRATION
_LOGGER = logging.getLogger(__name__)
DATA_REQS = 'auth_prov_reqs_processed'
@ -171,6 +171,7 @@ class LoginFlow(data_entry_flow.FlowHandler):
self._auth_manager = auth_provider.hass.auth # type: ignore
self.available_mfa_modules = {} # type: Dict[str, str]
self.created_at = dt_util.utcnow()
self.invalid_mfa_times = 0
self.user = None # type: Optional[User]
async def async_step_init(
@ -212,6 +213,8 @@ class LoginFlow(data_entry_flow.FlowHandler):
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Handle the step of mfa validation."""
assert self.user
errors = {}
auth_module = self._auth_manager.get_auth_mfa_module(
@ -221,25 +224,34 @@ class LoginFlow(data_entry_flow.FlowHandler):
# will show invalid_auth_module error
return await self.async_step_select_mfa_module(user_input={})
if user_input is None and hasattr(auth_module,
'async_initialize_login_mfa_step'):
await auth_module.async_initialize_login_mfa_step(self.user.id)
if user_input is not None:
expires = self.created_at + SESSION_EXPIRATION
expires = self.created_at + MFA_SESSION_EXPIRATION
if dt_util.utcnow() > expires:
return self.async_abort(
reason='login_expired'
)
result = await auth_module.async_validation(
self.user.id, user_input) # type: ignore
result = await auth_module.async_validate(
self.user.id, user_input)
if not result:
errors['base'] = 'invalid_code'
self.invalid_mfa_times += 1
if self.invalid_mfa_times >= auth_module.MAX_RETRY_TIME > 0:
return self.async_abort(
reason='too_many_retry'
)
if not errors:
return await self.async_finish(self.user)
description_placeholders = {
'mfa_module_name': auth_module.name,
'mfa_module_id': auth_module.id
} # type: Dict[str, str]
'mfa_module_id': auth_module.id,
} # type: Dict[str, Optional[str]]
return self.async_show_form(
step_id='mfa',

View File

@ -1,5 +1,24 @@
{
"mfa_setup": {
"notify": {
"abort": {
"no_available_service": "No available notify services."
},
"error": {
"invalid_code": "Invalid code, please try again."
},
"step": {
"init": {
"description": "Please select one of notify service:",
"title": "Set up one-time password delivered by notify component"
},
"setup": {
"description": "A one-time password have sent by **notify.{notify_service}**. Please input it in below:",
"title": "Verify setup"
}
},
"title": "Notify One-Time Password"
},
"totp": {
"error": {
"invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate."

View File

@ -226,8 +226,9 @@ class LoginFlowResourceView(HomeAssistantView):
if result['type'] != data_entry_flow.RESULT_TYPE_CREATE_ENTRY:
# @log_invalid_auth does not work here since it returns HTTP 200
# need manually log failed login attempts
if result['errors'] is not None and \
result['errors'].get('base') == 'invalid_auth':
if (result.get('errors') is not None and
result['errors'].get('base') in ['invalid_auth',
'invalid_code']):
await process_wrong_login(request)
return self.json(_prepare_result_json(result))

View File

@ -11,6 +11,25 @@
"error": {
"invalid_code": "Invalid code, please try again. If you get this error consistently, please make sure the clock of your Home Assistant system is accurate."
}
},
"notify": {
"title": "Notify One-Time Password",
"step": {
"init": {
"title": "Set up one-time password delivered by notify component",
"description": "Please select one of notify service:"
},
"setup": {
"title": "Verify setup",
"description": "A one-time password have sent by **notify.{notify_service}**. Please input it in below:"
}
},
"abort": {
"no_available_service": "No available notify services."
},
"error": {
"invalid_code": "Invalid code, please try again."
}
}
}
}

View File

@ -475,7 +475,7 @@ async def async_process_ha_core_config(
auth_conf.append({'type': 'trusted_networks'})
mfa_conf = config.get(CONF_AUTH_MFA_MODULES, [
{'type': 'totp', 'id': 'totp', 'name': 'Authenticator app'}
{'type': 'totp', 'id': 'totp', 'name': 'Authenticator app'},
])
setattr(hass, 'auth', await auth.auth_manager_from_config(

View File

@ -1020,6 +1020,7 @@ pyota==2.0.5
# homeassistant.components.climate.opentherm_gw
pyotgw==0.1b0
# homeassistant.auth.mfa_modules.notify
# homeassistant.auth.mfa_modules.totp
# homeassistant.components.sensor.otp
pyotp==2.2.6

View File

@ -160,6 +160,7 @@ pynx584==0.4
# homeassistant.components.openuv
pyopenuv==1.0.4
# homeassistant.auth.mfa_modules.notify
# homeassistant.auth.mfa_modules.totp
# homeassistant.components.sensor.otp
pyotp==2.2.6

View File

@ -12,15 +12,15 @@ async def test_validate(hass):
'data': [{'user_id': 'test-user', 'pin': '123456'}]
})
result = await auth_module.async_validation(
result = await auth_module.async_validate(
'test-user', {'pin': '123456'})
assert result is True
result = await auth_module.async_validation(
result = await auth_module.async_validate(
'test-user', {'pin': 'invalid'})
assert result is False
result = await auth_module.async_validation(
result = await auth_module.async_validate(
'invalid-user', {'pin': '123456'})
assert result is False
@ -36,7 +36,7 @@ async def test_setup_user(hass):
'test-user', {'pin': '123456'})
assert len(auth_module._data) == 1
result = await auth_module.async_validation(
result = await auth_module.async_validate(
'test-user', {'pin': '123456'})
assert result is True

View File

@ -0,0 +1,397 @@
"""Test the HMAC-based One Time Password (MFA) auth module."""
from unittest.mock import patch
from homeassistant import data_entry_flow
from homeassistant.auth import models as auth_models, auth_manager_from_config
from homeassistant.auth.mfa_modules import auth_mfa_module_from_config
from homeassistant.components.notify import NOTIFY_SERVICE_SCHEMA
from tests.common import MockUser, async_mock_service
MOCK_CODE = '123456'
MOCK_CODE_2 = '654321'
async def test_validating_mfa(hass):
"""Test validating mfa code."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'notify_service': 'dummy'
})
with patch('pyotp.HOTP.verify', return_value=True):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE})
async def test_validating_mfa_invalid_code(hass):
"""Test validating an invalid mfa code."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'notify_service': 'dummy'
})
with patch('pyotp.HOTP.verify', return_value=False):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE}) is False
async def test_validating_mfa_invalid_user(hass):
"""Test validating an mfa code with invalid user."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'notify_service': 'dummy'
})
assert await notify_auth_module.async_validate(
'invalid-user', {'code': MOCK_CODE}) is False
async def test_validating_mfa_counter(hass):
"""Test counter will move only after generate code."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'counter': 0,
'notify_service': 'dummy',
})
assert notify_auth_module._user_settings
notify_setting = list(notify_auth_module._user_settings.values())[0]
init_count = notify_setting.counter
assert init_count is not None
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
await notify_auth_module.async_initialize_login_mfa_step('test-user')
notify_setting = list(notify_auth_module._user_settings.values())[0]
after_generate_count = notify_setting.counter
assert after_generate_count != init_count
with patch('pyotp.HOTP.verify', return_value=True):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE})
notify_setting = list(notify_auth_module._user_settings.values())[0]
assert after_generate_count == notify_setting.counter
with patch('pyotp.HOTP.verify', return_value=False):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE}) is False
notify_setting = list(notify_auth_module._user_settings.values())[0]
assert after_generate_count == notify_setting.counter
async def test_setup_depose_user(hass):
"""Test set up and despose user."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {})
assert len(notify_auth_module._user_settings) == 1
await notify_auth_module.async_setup_user('test-user', {})
assert len(notify_auth_module._user_settings) == 1
await notify_auth_module.async_depose_user('test-user')
assert len(notify_auth_module._user_settings) == 0
await notify_auth_module.async_setup_user(
'test-user2', {'secret': 'secret-code'})
assert len(notify_auth_module._user_settings) == 1
async def test_login_flow_validates_mfa(hass):
"""Test login flow with mfa enabled."""
hass.auth = await auth_manager_from_config(hass, [{
'type': 'insecure_example',
'users': [{'username': 'test-user', 'password': 'test-pass'}],
}], [{
'type': 'notify',
}])
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(hass.auth)
await hass.auth.async_link_user(user, auth_models.Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
notify_calls = async_mock_service(hass, 'notify', 'test-notify',
NOTIFY_SERVICE_SCHEMA)
await hass.auth.async_enable_user_mfa(user, 'notify', {
'notify_service': 'test-notify',
})
provider = hass.auth.auth_providers[0]
result = await hass.auth.login_flow.async_init(
(provider.type, provider.id))
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.auth.login_flow.async_configure(result['flow_id'], {
'username': 'incorrect-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['errors']['base'] == 'invalid_auth'
result = await hass.auth.login_flow.async_configure(result['flow_id'], {
'username': 'test-user',
'password': 'incorrect-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['errors']['base'] == 'invalid_auth'
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
result = await hass.auth.login_flow.async_configure(
result['flow_id'],
{
'username': 'test-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['data_schema'].schema.get('code') == str
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 1
notify_call = notify_calls[0]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test-notify'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE in message.async_render()
with patch('pyotp.HOTP.verify', return_value=False):
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': 'invalid-code'})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['errors']['base'] == 'invalid_code'
# wait service call finished
await hass.async_block_till_done()
# would not send new code, allow user retry
assert len(notify_calls) == 1
# retry twice
with patch('pyotp.HOTP.verify', return_value=False), \
patch('pyotp.HOTP.at', return_value=MOCK_CODE_2):
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': 'invalid-code'})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['errors']['base'] == 'invalid_code'
# after the 3rd failure, flow abort
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': 'invalid-code'})
assert result['type'] == data_entry_flow.RESULT_TYPE_ABORT
assert result['reason'] == 'too_many_retry'
# wait service call finished
await hass.async_block_till_done()
# restart login
result = await hass.auth.login_flow.async_init(
(provider.type, provider.id))
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
result = await hass.auth.login_flow.async_configure(
result['flow_id'],
{
'username': 'test-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['data_schema'].schema.get('code') == str
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 2
notify_call = notify_calls[1]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test-notify'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE in message.async_render()
with patch('pyotp.HOTP.verify', return_value=True):
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': MOCK_CODE})
assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result['data'].id == 'mock-user'
async def test_setup_user_notify_service(hass):
"""Test allow select notify service during mfa setup."""
notify_calls = async_mock_service(
hass, 'notify', 'test1', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'test2', NOTIFY_SERVICE_SCHEMA)
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['test1', 'test2']
flow = await notify_auth_module.async_setup_flow('test-user')
step = await flow.async_step_init()
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'init'
schema = step['data_schema']
schema({'notify_service': 'test2'})
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
step = await flow.async_step_init({'notify_service': 'test1'})
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'setup'
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 1
notify_call = notify_calls[0]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test1'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE in message.async_render()
with patch('pyotp.HOTP.at', return_value=MOCK_CODE_2):
step = await flow.async_step_setup({'code': 'invalid'})
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'setup'
assert step['errors']['base'] == 'invalid_code'
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 2
notify_call = notify_calls[1]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test1'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE_2 in message.async_render()
with patch('pyotp.HOTP.verify', return_value=True):
step = await flow.async_step_setup({'code': MOCK_CODE_2})
assert step['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
async def test_include_exclude_config(hass):
"""Test allow include exclude config."""
async_mock_service(hass, 'notify', 'include1', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'include2', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'exclude1', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'exclude2', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'other', 'include3', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'other', 'exclude3', NOTIFY_SERVICE_SCHEMA)
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'exclude': ['exclude1', 'exclude2', 'exclude3'],
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['include1', 'include2']
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'include': ['include1', 'include2', 'include3'],
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['include1', 'include2']
# exclude has high priority than include
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'include': ['include1', 'include2', 'include3'],
'exclude': ['exclude1', 'exclude2', 'include2'],
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['include1']
async def test_setup_user_no_notify_service(hass):
"""Test setup flow abort if there is no avilable notify service."""
async_mock_service(hass, 'notify', 'test1', NOTIFY_SERVICE_SCHEMA)
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'exclude': 'test1',
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == []
flow = await notify_auth_module.async_setup_flow('test-user')
step = await flow.async_step_init()
assert step['type'] == data_entry_flow.RESULT_TYPE_ABORT
assert step['reason'] == 'no_available_service'
async def test_not_raise_exception_when_service_not_exist(hass):
"""Test login flow will not raise exception when notify service error."""
hass.auth = await auth_manager_from_config(hass, [{
'type': 'insecure_example',
'users': [{'username': 'test-user', 'password': 'test-pass'}],
}], [{
'type': 'notify',
}])
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(hass.auth)
await hass.auth.async_link_user(user, auth_models.Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
await hass.auth.async_enable_user_mfa(user, 'notify', {
'notify_service': 'invalid-notify',
})
provider = hass.auth.auth_providers[0]
result = await hass.auth.login_flow.async_init(
(provider.type, provider.id))
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
result = await hass.auth.login_flow.async_configure(
result['flow_id'],
{
'username': 'test-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['data_schema'].schema.get('code') == str
# wait service call finished
await hass.async_block_till_done()

View File

@ -17,7 +17,7 @@ async def test_validating_mfa(hass):
await totp_auth_module.async_setup_user('test-user', {})
with patch('pyotp.TOTP.verify', return_value=True):
assert await totp_auth_module.async_validation(
assert await totp_auth_module.async_validate(
'test-user', {'code': MOCK_CODE})
@ -29,7 +29,7 @@ async def test_validating_mfa_invalid_code(hass):
await totp_auth_module.async_setup_user('test-user', {})
with patch('pyotp.TOTP.verify', return_value=False):
assert await totp_auth_module.async_validation(
assert await totp_auth_module.async_validate(
'test-user', {'code': MOCK_CODE}) is False
@ -40,7 +40,7 @@ async def test_validating_mfa_invalid_user(hass):
})
await totp_auth_module.async_setup_user('test-user', {})
assert await totp_auth_module.async_validation(
assert await totp_auth_module.async_validate(
'invalid-user', {'code': MOCK_CODE}) is False

View File

@ -9,7 +9,7 @@ import voluptuous as vol
from homeassistant import auth, data_entry_flow
from homeassistant.auth import (
models as auth_models, auth_store, const as auth_const)
from homeassistant.auth.mfa_modules import SESSION_EXPIRATION
from homeassistant.auth.const import MFA_SESSION_EXPIRATION
from homeassistant.util import dt as dt_util
from tests.common import (
MockUser, ensure_auth_manager_loaded, flush_store, CLIENT_ID)
@ -720,7 +720,7 @@ async def test_auth_module_expired_session(mock_hass):
assert step['step_id'] == 'mfa'
with patch('homeassistant.util.dt.utcnow',
return_value=dt_util.utcnow() + SESSION_EXPIRATION):
return_value=dt_util.utcnow() + MFA_SESSION_EXPIRATION):
step = await manager.login_flow.async_configure(step['flow_id'], {
'pin': 'test-pin',
})