core/tests/auth/test_init.py

638 lines
20 KiB
Python

"""Tests for the Home Assistant auth module."""
from datetime import timedelta
from unittest.mock import Mock, patch
import pytest
from homeassistant import auth, data_entry_flow
from homeassistant.auth import (
models as auth_models, auth_store, const as auth_const)
from homeassistant.auth.mfa_modules import SESSION_EXPIRATION
from homeassistant.util import dt as dt_util
from tests.common import (
MockUser, ensure_auth_manager_loaded, flush_store, CLIENT_ID)
@pytest.fixture
def mock_hass(loop):
    """Provide a minimal Mock standing in for hass in the auth tests.

    The ``loop`` fixture is requested alongside it, and
    ``config.skip_pip`` is set to True on the returned mock.
    """
    fake_hass = Mock()
    fake_hass.config.skip_pip = True
    return fake_hass
async def test_auth_manager_from_config_validates_config_and_id(mock_hass):
    """Check that bad or duplicate-id provider configs are left out.

    Four provider configs are supplied: a valid one, one lacking ``users``,
    a valid one with id ``another`` and a further one reusing that id. Only
    the first and third are expected in ``manager.auth_providers``.
    """
    provider_configs = [
        {
            'name': 'Test Name',
            'type': 'insecure_example',
            'users': [],
        },
        {
            'name': 'Invalid config because no users',
            'type': 'insecure_example',
            'id': 'invalid_config',
        },
        {
            'name': 'Test Name 2',
            'type': 'insecure_example',
            'id': 'another',
            'users': [],
        },
        {
            'name': 'Wrong because duplicate ID',
            'type': 'insecure_example',
            'id': 'another',
            'users': [],
        },
    ]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, [])

    loaded = [
        {'name': entry.name, 'id': entry.id, 'type': entry.type}
        for entry in manager.auth_providers
    ]
    expected = [
        {'name': 'Test Name', 'type': 'insecure_example', 'id': None},
        {'name': 'Test Name 2', 'type': 'insecure_example', 'id': 'another'},
    ]
    assert loaded == expected
async def test_auth_manager_from_config_auth_modules(mock_hass):
    """Check which providers and MFA modules end up on the manager.

    Two providers and three MFA module configs are supplied; the third
    module reuses the id ``another`` and is expected to be absent. The
    module configured without an id is expected to report the id
    ``insecure_example``.
    """
    def summarize(items):
        """Reduce providers/modules to comparable name/type/id dicts."""
        return [
            {'name': item.name, 'type': item.type, 'id': item.id}
            for item in items
        ]

    provider_configs = [
        {
            'name': 'Test Name',
            'type': 'insecure_example',
            'users': [],
        },
        {
            'name': 'Test Name 2',
            'type': 'insecure_example',
            'id': 'another',
            'users': [],
        },
    ]
    module_configs = [
        {
            'name': 'Module 1',
            'type': 'insecure_example',
            'data': [],
        },
        {
            'name': 'Module 2',
            'type': 'insecure_example',
            'id': 'another',
            'data': [],
        },
        {
            'name': 'Duplicate ID',
            'type': 'insecure_example',
            'id': 'another',
            'data': [],
        },
    ]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, module_configs)

    expected_providers = [
        {'name': 'Test Name', 'type': 'insecure_example', 'id': None},
        {'name': 'Test Name 2', 'type': 'insecure_example', 'id': 'another'},
    ]
    assert summarize(manager.auth_providers) == expected_providers

    expected_modules = [
        {
            'name': 'Module 1',
            'type': 'insecure_example',
            'id': 'insecure_example',
        },
        {
            'name': 'Module 2',
            'type': 'insecure_example',
            'id': 'another',
        },
    ]
    assert summarize(manager.auth_mfa_modules) == expected_modules
async def test_create_new_user(hass):
    """Log in through the login flow and inspect the resulting user.

    The flow is expected to show a form first and then create an entry
    whose result is a non-owner user carrying the configured name.
    """
    provider_config = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }]
    }
    manager = await auth.auth_manager_from_config(
        hass, [provider_config], [])
    flows = manager.login_flow

    result = await flows.async_init(('insecure_example', None))
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM

    login_input = {
        'username': 'test-user',
        'password': 'test-pass',
    }
    result = await flows.async_configure(result['flow_id'], login_input)
    assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY

    created = result['result']
    assert created is not None
    assert created.is_owner is False
    assert created.name == 'Test Name'
async def test_login_as_existing_user(mock_hass):
    """Log in with credentials that already belong to a stored user.

    Two users are registered up front; the login is expected to resolve
    to the one whose credentials carry the username that was entered.
    """
    provider_config = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }]
    }
    manager = await auth.auth_manager_from_config(
        mock_hass, [provider_config], [])
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # A decoy user with different credentials; must not be returned.
    decoy = MockUser(
        id='mock-user2',
        is_owner=False,
        is_active=False,
        name='Not user',
    ).add_to_auth_manager(manager)
    decoy_credentials = auth_models.Credentials(
        id='mock-id2',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'other-user'},
        is_new=False,
    )
    decoy.credentials.append(decoy_credentials)

    # The user whose credentials match the login below.
    existing = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    existing_credentials = auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    )
    existing.credentials.append(existing_credentials)

    flows = manager.login_flow
    result = await flows.async_init(('insecure_example', None))
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM

    login_input = {
        'username': 'test-user',
        'password': 'test-pass',
    }
    result = await flows.async_configure(result['flow_id'], login_input)
    assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY

    logged_in = result['result']
    assert logged_in is not None
    assert logged_in.id == 'mock-user'
    assert logged_in.is_owner is False
    assert logged_in.is_active is False
    assert logged_in.name == 'Paulus'
async def test_linking_user_to_two_auth_providers(hass, hass_storage):
    """Link a credential from a second provider onto an existing user.

    After linking, the user is expected to hold two credentials.
    """
    default_provider = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
        }]
    }
    second_provider = {
        'type': 'insecure_example',
        'id': 'another-provider',
        'users': [{
            'username': 'another-user',
            'password': 'another-password',
        }]
    }
    manager = await auth.auth_manager_from_config(
        hass, [default_provider, second_provider], [])
    flows = manager.login_flow

    # Log in against the provider configured without an id.
    result = await flows.async_init(('insecure_example', None))
    result = await flows.async_configure(result['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    user = result['result']
    assert user is not None

    # Run a second flow against the other provider; its result is handed
    # to async_link_user as the credential to attach.
    result = await flows.async_init(
        ('insecure_example', 'another-provider'),
        context={'credential_only': True})
    result = await flows.async_configure(result['flow_id'], {
        'username': 'another-user',
        'password': 'another-password',
    })
    extra_credential = result['result']

    await manager.async_link_user(user, extra_credential)
    assert len(user.credentials) == 2
async def test_saving_loading(hass, hass_storage):
    """Test that saved auth data is restored by a fresh store.

    A user is logged in, activated and given a refresh token so that one
    of each stored type exists. After flushing the manager's store, a new
    AuthStore is expected to return that same single user.
    """
    provider_config = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
        }]
    }
    manager = await auth.auth_manager_from_config(
        hass, [provider_config], [])
    flows = manager.login_flow

    result = await flows.async_init(('insecure_example', None))
    result = await flows.async_configure(result['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    user = result['result']

    await manager.async_activate_user(user)
    await manager.async_create_refresh_token(user, CLIENT_ID)
    await flush_store(manager._store._store)

    reloaded_store = auth_store.AuthStore(hass)
    restored_users = await reloaded_store.async_get_users()
    assert len(restored_users) == 1
    assert restored_users[0] == user
async def test_cannot_retrieve_expired_access_token(hass):
    """Test that we cannot retrieve expired access tokens.

    A freshly created access token must validate back to its refresh
    token. A second access token is then created while ``utcnow`` is
    patched to a moment more than ACCESS_TOKEN_EXPIRATION in the past,
    and validating it is expected to yield None.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    # Compare the ids by value: whether both sides happen to be the very
    # same object is an implementation detail the test should not rely on.
    assert refresh_token.user.id == user.id
    assert refresh_token.client_id == CLIENT_ID

    access_token = manager.async_create_access_token(refresh_token)
    assert (
        await manager.async_validate_access_token(access_token)
        is refresh_token
    )

    # Evaluated before the patch is applied, so it is based on real time.
    issued_at = (dt_util.utcnow() - auth_const.ACCESS_TOKEN_EXPIRATION
                 - timedelta(seconds=11))
    with patch('homeassistant.util.dt.utcnow', return_value=issued_at):
        access_token = manager.async_create_access_token(refresh_token)

    # Validation happens with the patch removed again.
    assert (
        await manager.async_validate_access_token(access_token)
        is None
    )
async def test_generating_system_user(hass):
    """Create a system user and a refresh token without a client id."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    system_user = await manager.async_create_system_user('Hass.io')
    refresh_token = await manager.async_create_refresh_token(system_user)

    assert system_user.system_generated
    assert refresh_token is not None
    assert refresh_token.client_id is None
async def test_refresh_token_requires_client_for_user(hass):
    """Test that a regular user's refresh token needs a client id.

    Creating the token without a client id is expected to raise
    ValueError; with CLIENT_ID it is expected to succeed.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)
    assert regular_user.system_generated is False

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(regular_user)

    refresh_token = await manager.async_create_refresh_token(
        regular_user, CLIENT_ID)
    assert refresh_token is not None
    assert refresh_token.client_id == CLIENT_ID
async def test_refresh_token_not_requires_client_for_system_user(hass):
    """Test that a system user's refresh token takes no client id.

    Passing CLIENT_ID for a system user is expected to raise ValueError;
    omitting it is expected to produce a token whose client_id is None.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    system_user = await manager.async_create_system_user('Hass.io')
    assert system_user.system_generated is True

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(system_user, CLIENT_ID)

    refresh_token = await manager.async_create_refresh_token(system_user)
    assert refresh_token is not None
    assert refresh_token.client_id is None
async def test_cannot_deactive_owner(mock_hass):
    """Test that deactivating the owner raises ValueError."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    owner_user = MockUser(is_owner=True).add_to_auth_manager(manager)

    with pytest.raises(ValueError):
        await manager.async_deactivate_user(owner_user)
async def test_remove_refresh_token(mock_hass):
    """Test that a removed refresh token can no longer be used.

    After removal, neither looking the token up by id nor validating an
    access token created from it is expected to return anything.
    """
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    access_token = manager.async_create_access_token(refresh_token)

    await manager.async_remove_refresh_token(refresh_token)

    looked_up = await manager.async_get_refresh_token(refresh_token.id)
    assert looked_up is None

    validated = await manager.async_validate_access_token(access_token)
    assert validated is None
async def test_login_with_auth_module(mock_hass):
    """Log in as an existing user who has one MFA module configured.

    After the provider step the flow is expected to ask for MFA input,
    reject a wrong pin with ``invalid_auth`` and finish with the existing
    user once the right pin is supplied.
    """
    provider_config = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }],
    }
    module_config = {
        'type': 'insecure_example',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin'
        }]
    }
    manager = await auth.auth_manager_from_config(
        mock_hass, [provider_config], [module_config])
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Register the user the provider credentials map onto.
    existing = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    existing_credentials = auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    )
    existing.credentials.append(existing_credentials)

    flows = manager.login_flow
    result = await flows.async_init(('insecure_example', None))
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM

    result = await flows.async_configure(result['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    # Provider step passed: the MFA form comes next.
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert result['step_id'] == 'mfa'

    result = await flows.async_configure(result['flow_id'], {
        'pin': 'invalid-pin',
    })
    # A wrong pin keeps us on the MFA form with an error.
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert result['step_id'] == 'mfa'
    assert result['errors'] == {'base': 'invalid_auth'}

    result = await flows.async_configure(result['flow_id'], {
        'pin': 'test-pin',
    })
    # The right pin completes the flow and yields the user.
    assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY

    logged_in = result['result']
    assert logged_in is not None
    assert logged_in.id == 'mock-user'
    assert logged_in.is_owner is False
    assert logged_in.is_active is False
    assert logged_in.name == 'Paulus'
async def test_login_with_multi_auth_module(mock_hass):
    """Log in as an existing user who has two MFA modules configured.

    After the provider step the flow is expected to offer a module
    selection, then the MFA form for the chosen module, and to finish
    with the existing user once that module's pin is supplied.
    """
    provider_config = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }],
    }
    first_module = {
        'type': 'insecure_example',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin'
        }]
    }
    second_module = {
        'type': 'insecure_example',
        'id': 'module2',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin2'
        }]
    }
    manager = await auth.auth_manager_from_config(
        mock_hass, [provider_config], [first_module, second_module])
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Register the user the provider credentials map onto.
    existing = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    existing_credentials = auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    )
    existing.credentials.append(existing_credentials)

    flows = manager.login_flow
    result = await flows.async_init(('insecure_example', None))
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM

    result = await flows.async_configure(result['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    # Provider step passed: the module selection form comes next.
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert result['step_id'] == 'select_mfa_module'

    result = await flows.async_configure(result['flow_id'], {
        'multi_factor_auth_module': 'module2',
    })
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert result['step_id'] == 'mfa'

    result = await flows.async_configure(result['flow_id'], {
        'pin': 'test-pin2',
    })
    # The chosen module's pin completes the flow and yields the user.
    assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY

    logged_in = result['result']
    assert logged_in is not None
    assert logged_in.id == 'mock-user'
    assert logged_in.is_owner is False
    assert logged_in.is_active is False
    assert logged_in.name == 'Paulus'
async def test_auth_module_expired_session(mock_hass):
    """Test that MFA input is rejected once the login session expired.

    The pin is submitted while ``utcnow`` is patched forward by
    SESSION_EXPIRATION; the flow is expected to stay on the MFA form with
    ``login_expired``, and to answer a further attempt the same way.
    """
    provider_config = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }],
    }
    module_config = {
        'type': 'insecure_example',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin'
        }]
    }
    manager = await auth.auth_manager_from_config(
        mock_hass, [provider_config], [module_config])
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Register the user the provider credentials map onto.
    existing = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    existing_credentials = auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    )
    existing.credentials.append(existing_credentials)

    flows = manager.login_flow
    result = await flows.async_init(('insecure_example', None))
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM

    result = await flows.async_configure(result['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert result['step_id'] == 'mfa'

    # Evaluated before the patch is applied, so it is based on real time.
    after_expiry = dt_util.utcnow() + SESSION_EXPIRATION
    with patch('homeassistant.util.dt.utcnow', return_value=after_expiry):
        result = await flows.async_configure(result['flow_id'], {
            'pin': 'test-pin',
        })
        # A correct pin is refused because the session timed out.
        assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
        assert result['step_id'] == 'mfa'
        assert result['errors']['base'] == 'login_expired'

    # A second attempt is expected to fail in the same way.
    result = await flows.async_configure(result['flow_id'], {
        'pin': 'test-pin',
    })
    assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert result['step_id'] == 'mfa'
    assert result['errors']['base'] == 'login_expired'
async def test_enable_mfa_for_user(hass, hass_storage):
    """Test enabling, re-enabling and disabling an MFA module for a user.

    Also checks that enabling MFA for a system user raises ValueError and
    leaves the module data untouched.
    """
    provider_config = {
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
        }]
    }
    module_config = {
        'type': 'insecure_example',
        'data': [],
    }
    manager = await auth.auth_manager_from_config(
        hass, [provider_config], [module_config])
    flows = manager.login_flow

    result = await flows.async_init(('insecure_example', None))
    result = await flows.async_configure(result['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    user = result['result']
    assert user is not None

    # A new user has no MFA module enabled.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 0

    mfa_module = manager.get_auth_mfa_module('insecure_example')
    # The MFA module holds no data yet.
    assert not mfa_module._data

    # Enable MFA for the user.
    await manager.async_enable_user_mfa(
        user, 'insecure_example', {'pin': 'test-pin'})
    assert len(mfa_module._data) == 1
    assert mfa_module._data[0] == {'user_id': user.id, 'pin': 'test-pin'}

    # The module is now listed as enabled.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 1
    assert 'insecure_example' in enabled

    # Enabling again overrides the stored data.
    await manager.async_enable_user_mfa(
        user, 'insecure_example', {'pin': 'test-pin-new'})
    assert len(mfa_module._data) == 1
    assert mfa_module._data[0] == {
        'user_id': user.id,
        'pin': 'test-pin-new',
    }
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 1
    assert 'insecure_example' in enabled

    # A system user cannot enable MFA.
    system_user = await manager.async_create_system_user('system-user')
    with pytest.raises(ValueError):
        await manager.async_enable_user_mfa(
            system_user, 'insecure_example', {'pin': 'test-pin'})
    assert len(mfa_module._data) == 1
    enabled = await manager.async_get_enabled_mfa(system_user)
    assert len(enabled) == 0

    # Disable MFA for the user.
    await manager.async_disable_user_mfa(user, 'insecure_example')
    assert not mfa_module._data

    # Nothing is listed as enabled any more.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 0

    # Disabling a module that is not enabled fails silently.
    await manager.async_disable_user_mfa(user, 'insecure_example')