core/tests/auth/mfa_modules/test_notify.py

398 lines
14 KiB
Python

"""Test the HMAC-based One Time Password (MFA) auth module."""
from unittest.mock import patch
from homeassistant import data_entry_flow
from homeassistant.auth import models as auth_models, auth_manager_from_config
from homeassistant.auth.mfa_modules import auth_mfa_module_from_config
from homeassistant.components.notify import NOTIFY_SERVICE_SCHEMA
from tests.common import MockUser, async_mock_service
MOCK_CODE = '123456'
MOCK_CODE_2 = '654321'
async def test_validating_mfa(hass):
"""Test validating mfa code."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'notify_service': 'dummy'
})
with patch('pyotp.HOTP.verify', return_value=True):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE})
async def test_validating_mfa_invalid_code(hass):
"""Test validating an invalid mfa code."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'notify_service': 'dummy'
})
with patch('pyotp.HOTP.verify', return_value=False):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE}) is False
async def test_validating_mfa_invalid_user(hass):
"""Test validating an mfa code with invalid user."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'notify_service': 'dummy'
})
assert await notify_auth_module.async_validate(
'invalid-user', {'code': MOCK_CODE}) is False
async def test_validating_mfa_counter(hass):
"""Test counter will move only after generate code."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {
'counter': 0,
'notify_service': 'dummy',
})
async_mock_service(hass, 'notify', 'dummy')
assert notify_auth_module._user_settings
notify_setting = list(notify_auth_module._user_settings.values())[0]
init_count = notify_setting.counter
assert init_count is not None
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
await notify_auth_module.async_initialize_login_mfa_step('test-user')
notify_setting = list(notify_auth_module._user_settings.values())[0]
after_generate_count = notify_setting.counter
assert after_generate_count != init_count
with patch('pyotp.HOTP.verify', return_value=True):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE})
notify_setting = list(notify_auth_module._user_settings.values())[0]
assert after_generate_count == notify_setting.counter
with patch('pyotp.HOTP.verify', return_value=False):
assert await notify_auth_module.async_validate(
'test-user', {'code': MOCK_CODE}) is False
notify_setting = list(notify_auth_module._user_settings.values())[0]
assert after_generate_count == notify_setting.counter
async def test_setup_depose_user(hass):
"""Test set up and despose user."""
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify'
})
await notify_auth_module.async_setup_user('test-user', {})
assert len(notify_auth_module._user_settings) == 1
await notify_auth_module.async_setup_user('test-user', {})
assert len(notify_auth_module._user_settings) == 1
await notify_auth_module.async_depose_user('test-user')
assert len(notify_auth_module._user_settings) == 0
await notify_auth_module.async_setup_user(
'test-user2', {'secret': 'secret-code'})
assert len(notify_auth_module._user_settings) == 1
async def test_login_flow_validates_mfa(hass):
"""Test login flow with mfa enabled."""
hass.auth = await auth_manager_from_config(hass, [{
'type': 'insecure_example',
'users': [{'username': 'test-user', 'password': 'test-pass'}],
}], [{
'type': 'notify',
}])
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(hass.auth)
await hass.auth.async_link_user(user, auth_models.Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
notify_calls = async_mock_service(hass, 'notify', 'test-notify',
NOTIFY_SERVICE_SCHEMA)
await hass.auth.async_enable_user_mfa(user, 'notify', {
'notify_service': 'test-notify',
})
provider = hass.auth.auth_providers[0]
result = await hass.auth.login_flow.async_init(
(provider.type, provider.id))
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.auth.login_flow.async_configure(result['flow_id'], {
'username': 'incorrect-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['errors']['base'] == 'invalid_auth'
result = await hass.auth.login_flow.async_configure(result['flow_id'], {
'username': 'test-user',
'password': 'incorrect-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['errors']['base'] == 'invalid_auth'
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
result = await hass.auth.login_flow.async_configure(
result['flow_id'],
{
'username': 'test-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['data_schema'].schema.get('code') == str
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 1
notify_call = notify_calls[0]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test-notify'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE in message.async_render()
with patch('pyotp.HOTP.verify', return_value=False):
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': 'invalid-code'})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['errors']['base'] == 'invalid_code'
# wait service call finished
await hass.async_block_till_done()
# would not send new code, allow user retry
assert len(notify_calls) == 1
# retry twice
with patch('pyotp.HOTP.verify', return_value=False), \
patch('pyotp.HOTP.at', return_value=MOCK_CODE_2):
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': 'invalid-code'})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['errors']['base'] == 'invalid_code'
# after the 3rd failure, flow abort
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': 'invalid-code'})
assert result['type'] == data_entry_flow.RESULT_TYPE_ABORT
assert result['reason'] == 'too_many_retry'
# wait service call finished
await hass.async_block_till_done()
# restart login
result = await hass.auth.login_flow.async_init(
(provider.type, provider.id))
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
result = await hass.auth.login_flow.async_configure(
result['flow_id'],
{
'username': 'test-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['data_schema'].schema.get('code') == str
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 2
notify_call = notify_calls[1]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test-notify'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE in message.async_render()
with patch('pyotp.HOTP.verify', return_value=True):
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'code': MOCK_CODE})
assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result['data'].id == 'mock-user'
async def test_setup_user_notify_service(hass):
"""Test allow select notify service during mfa setup."""
notify_calls = async_mock_service(
hass, 'notify', 'test1', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'test2', NOTIFY_SERVICE_SCHEMA)
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['test1', 'test2']
flow = await notify_auth_module.async_setup_flow('test-user')
step = await flow.async_step_init()
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'init'
schema = step['data_schema']
schema({'notify_service': 'test2'})
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
step = await flow.async_step_init({'notify_service': 'test1'})
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'setup'
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 1
notify_call = notify_calls[0]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test1'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE in message.async_render()
with patch('pyotp.HOTP.at', return_value=MOCK_CODE_2):
step = await flow.async_step_setup({'code': 'invalid'})
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'setup'
assert step['errors']['base'] == 'invalid_code'
# wait service call finished
await hass.async_block_till_done()
assert len(notify_calls) == 2
notify_call = notify_calls[1]
assert notify_call.domain == 'notify'
assert notify_call.service == 'test1'
message = notify_call.data['message']
message.hass = hass
assert MOCK_CODE_2 in message.async_render()
with patch('pyotp.HOTP.verify', return_value=True):
step = await flow.async_step_setup({'code': MOCK_CODE_2})
assert step['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
async def test_include_exclude_config(hass):
"""Test allow include exclude config."""
async_mock_service(hass, 'notify', 'include1', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'include2', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'exclude1', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'notify', 'exclude2', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'other', 'include3', NOTIFY_SERVICE_SCHEMA)
async_mock_service(hass, 'other', 'exclude3', NOTIFY_SERVICE_SCHEMA)
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'exclude': ['exclude1', 'exclude2', 'exclude3'],
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['include1', 'include2']
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'include': ['include1', 'include2', 'include3'],
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['include1', 'include2']
# exclude has high priority than include
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'include': ['include1', 'include2', 'include3'],
'exclude': ['exclude1', 'exclude2', 'include2'],
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == ['include1']
async def test_setup_user_no_notify_service(hass):
"""Test setup flow abort if there is no avilable notify service."""
async_mock_service(hass, 'notify', 'test1', NOTIFY_SERVICE_SCHEMA)
notify_auth_module = await auth_mfa_module_from_config(hass, {
'type': 'notify',
'exclude': 'test1',
})
services = notify_auth_module.aync_get_available_notify_services()
assert services == []
flow = await notify_auth_module.async_setup_flow('test-user')
step = await flow.async_step_init()
assert step['type'] == data_entry_flow.RESULT_TYPE_ABORT
assert step['reason'] == 'no_available_service'
async def test_not_raise_exception_when_service_not_exist(hass):
"""Test login flow will not raise exception when notify service error."""
hass.auth = await auth_manager_from_config(hass, [{
'type': 'insecure_example',
'users': [{'username': 'test-user', 'password': 'test-pass'}],
}], [{
'type': 'notify',
}])
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(hass.auth)
await hass.auth.async_link_user(user, auth_models.Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
await hass.auth.async_enable_user_mfa(user, 'notify', {
'notify_service': 'invalid-notify',
})
provider = hass.auth.auth_providers[0]
result = await hass.auth.login_flow.async_init(
(provider.type, provider.id))
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
with patch('pyotp.HOTP.at', return_value=MOCK_CODE):
result = await hass.auth.login_flow.async_configure(
result['flow_id'],
{
'username': 'test-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_ABORT
assert result['reason'] == 'unknown_error'
# wait service call finished
await hass.async_block_till_done()