"""Test the Time-based One Time Password (MFA) auth module.""" import asyncio from unittest.mock import patch from homeassistant import data_entry_flow from homeassistant.auth import models as auth_models, auth_manager_from_config from homeassistant.auth.mfa_modules import auth_mfa_module_from_config from tests.common import MockUser MOCK_CODE = '123456' async def test_validating_mfa(hass): """Test validating mfa code.""" totp_auth_module = await auth_mfa_module_from_config(hass, { 'type': 'totp' }) await totp_auth_module.async_setup_user('test-user', {}) with patch('pyotp.TOTP.verify', return_value=True): assert await totp_auth_module.async_validate( 'test-user', {'code': MOCK_CODE}) async def test_validating_mfa_invalid_code(hass): """Test validating an invalid mfa code.""" totp_auth_module = await auth_mfa_module_from_config(hass, { 'type': 'totp' }) await totp_auth_module.async_setup_user('test-user', {}) with patch('pyotp.TOTP.verify', return_value=False): assert await totp_auth_module.async_validate( 'test-user', {'code': MOCK_CODE}) is False async def test_validating_mfa_invalid_user(hass): """Test validating an mfa code with invalid user.""" totp_auth_module = await auth_mfa_module_from_config(hass, { 'type': 'totp' }) await totp_auth_module.async_setup_user('test-user', {}) assert await totp_auth_module.async_validate( 'invalid-user', {'code': MOCK_CODE}) is False async def test_setup_depose_user(hass): """Test despose user.""" totp_auth_module = await auth_mfa_module_from_config(hass, { 'type': 'totp' }) result = await totp_auth_module.async_setup_user('test-user', {}) assert len(totp_auth_module._users) == 1 result2 = await totp_auth_module.async_setup_user('test-user', {}) assert len(totp_auth_module._users) == 1 assert result != result2 await totp_auth_module.async_depose_user('test-user') assert len(totp_auth_module._users) == 0 result = await totp_auth_module.async_setup_user( 'test-user2', {'secret': 'secret-code'}) assert result == 'secret-code' assert len(totp_auth_module._users) == 1 async def test_login_flow_validates_mfa(hass): """Test login flow with mfa enabled.""" hass.auth = await auth_manager_from_config(hass, [{ 'type': 'insecure_example', 'users': [{'username': 'test-user', 'password': 'test-pass'}], }], [{ 'type': 'totp', }]) user = MockUser( id='mock-user', is_owner=False, is_active=False, name='Paulus', ).add_to_auth_manager(hass.auth) await hass.auth.async_link_user(user, auth_models.Credentials( id='mock-id', auth_provider_type='insecure_example', auth_provider_id=None, data={'username': 'test-user'}, is_new=False, )) await hass.auth.async_enable_user_mfa(user, 'totp', {}) provider = hass.auth.auth_providers[0] result = await hass.auth.login_flow.async_init( (provider.type, provider.id)) assert result['type'] == data_entry_flow.RESULT_TYPE_FORM result = await hass.auth.login_flow.async_configure(result['flow_id'], { 'username': 'incorrect-user', 'password': 'test-pass', }) assert result['type'] == data_entry_flow.RESULT_TYPE_FORM assert result['errors']['base'] == 'invalid_auth' result = await hass.auth.login_flow.async_configure(result['flow_id'], { 'username': 'test-user', 'password': 'incorrect-pass', }) assert result['type'] == data_entry_flow.RESULT_TYPE_FORM assert result['errors']['base'] == 'invalid_auth' result = await hass.auth.login_flow.async_configure(result['flow_id'], { 'username': 'test-user', 'password': 'test-pass', }) assert result['type'] == data_entry_flow.RESULT_TYPE_FORM assert result['step_id'] == 'mfa' assert result['data_schema'].schema.get('code') == str with patch('pyotp.TOTP.verify', return_value=False): result = await hass.auth.login_flow.async_configure( result['flow_id'], {'code': 'invalid-code'}) assert result['type'] == data_entry_flow.RESULT_TYPE_FORM assert result['step_id'] == 'mfa' assert result['errors']['base'] == 'invalid_code' with patch('pyotp.TOTP.verify', return_value=True): result = await hass.auth.login_flow.async_configure( result['flow_id'], {'code': MOCK_CODE}) assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY assert result['data'].id == 'mock-user' async def test_race_condition_in_data_loading(hass): """Test race condition in the data loading.""" counter = 0 async def mock_load(_): """Mock of homeassistant.helpers.storage.Store.async_load.""" nonlocal counter counter += 1 await asyncio.sleep(0) totp_auth_module = await auth_mfa_module_from_config(hass, { 'type': 'totp' }) with patch('homeassistant.helpers.storage.Store.async_load', new=mock_load): task1 = totp_auth_module.async_validate('user', {'code': 'value'}) task2 = totp_auth_module.async_validate('user', {'code': 'value'}) results = await asyncio.gather(task1, task2, return_exceptions=True) assert counter == 1 assert results[0] is False assert results[1] is False