898 lines
29 KiB
Python
898 lines
29 KiB
Python
"""Tests for the Home Assistant auth module."""
|
|
from datetime import timedelta
|
|
from unittest.mock import Mock, patch
|
|
|
|
import jwt
|
|
import pytest
|
|
import voluptuous as vol
|
|
|
|
from homeassistant import auth, data_entry_flow
|
|
from homeassistant.auth import (
|
|
models as auth_models, auth_store, const as auth_const)
|
|
from homeassistant.auth.const import MFA_SESSION_EXPIRATION
|
|
from homeassistant.core import callback
|
|
from homeassistant.util import dt as dt_util
|
|
from tests.common import (
|
|
MockUser, ensure_auth_manager_loaded, flush_store, CLIENT_ID)
|
|
|
|
|
|
@pytest.fixture
def mock_hass(loop):
    """Provide a Mock hass carrying only the config the auth code touches."""
    fake_hass = Mock()
    # Only attribute set explicitly; everything else is auto-mocked.
    fake_hass.config.skip_pip = True
    return fake_hass
|
|
|
|
|
|
async def test_auth_manager_from_config_validates_config(mock_hass):
    """Check provider config validation and the resulting provider list."""
    # Second provider lacks the 'users' key, so building must fail.
    invalid_provider_configs = [{
        'name': 'Test Name',
        'type': 'insecure_example',
        'users': [],
    }, {
        'name': 'Invalid config because no users',
        'type': 'insecure_example',
        'id': 'invalid_config',
    }]
    with pytest.raises(vol.Invalid):
        await auth.auth_manager_from_config(
            mock_hass, invalid_provider_configs, [])

    valid_provider_configs = [{
        'name': 'Test Name',
        'type': 'insecure_example',
        'users': [],
    }, {
        'name': 'Test Name 2',
        'type': 'insecure_example',
        'id': 'another',
        'users': [],
    }]
    manager = await auth.auth_manager_from_config(
        mock_hass, valid_provider_configs, [])

    found = []
    for provider in manager.auth_providers:
        found.append({
            'name': provider.name,
            'id': provider.id,
            'type': provider.type,
        })

    expected = [{
        'name': 'Test Name',
        'type': 'insecure_example',
        'id': None,
    }, {
        'name': 'Test Name 2',
        'type': 'insecure_example',
        'id': 'another',
    }]
    assert found == expected
|
|
|
|
|
|
async def test_auth_manager_from_config_auth_modules(mock_hass):
    """Check MFA module config validation and the resulting module list."""
    # Second module lacks the 'data' key, so building must fail.
    invalid_module_configs = [{
        'name': 'Module 1',
        'type': 'insecure_example',
        'data': [],
    }, {
        'name': 'Invalid config because no data',
        'type': 'insecure_example',
        'id': 'another',
    }]
    with pytest.raises(vol.Invalid):
        await auth.auth_manager_from_config(mock_hass, [{
            'name': 'Test Name',
            'type': 'insecure_example',
            'users': [],
        }, {
            'name': 'Test Name 2',
            'type': 'insecure_example',
            'id': 'another',
            'users': [],
        }], invalid_module_configs)

    valid_module_configs = [{
        'name': 'Module 1',
        'type': 'insecure_example',
        'data': [],
    }, {
        'name': 'Module 2',
        'type': 'insecure_example',
        'id': 'another',
        'data': [],
    }]
    manager = await auth.auth_manager_from_config(mock_hass, [{
        'name': 'Test Name',
        'type': 'insecure_example',
        'users': [],
    }, {
        'name': 'Test Name 2',
        'type': 'insecure_example',
        'id': 'another',
        'users': [],
    }], valid_module_configs)

    found_providers = []
    for provider in manager.auth_providers:
        found_providers.append({
            'name': provider.name,
            'type': provider.type,
            'id': provider.id,
        })
    expected_providers = [{
        'name': 'Test Name',
        'type': 'insecure_example',
        'id': None,
    }, {
        'name': 'Test Name 2',
        'type': 'insecure_example',
        'id': 'another',
    }]
    assert found_providers == expected_providers

    found_modules = []
    for module in manager.auth_mfa_modules:
        found_modules.append({
            'name': module.name,
            'type': module.type,
            'id': module.id,
        })
    expected_modules = [{
        'name': 'Module 1',
        'type': 'insecure_example',
        'id': 'insecure_example',
    }, {
        'name': 'Module 2',
        'type': 'insecure_example',
        'id': 'another',
    }]
    assert found_modules == expected_modules
|
|
|
|
|
|
async def test_create_new_user(hass):
    """Log in with unknown credentials and expect a new user and an event."""
    received = []

    @callback
    def _record(event):
        received.append(event)

    hass.bus.async_listen('user_added', _record)

    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }]
    }]
    manager = await auth.auth_manager_from_config(hass, provider_configs, [])

    form = await manager.login_flow.async_init(('insecure_example', None))
    assert form['type'] == data_entry_flow.RESULT_TYPE_FORM

    login_input = {
        'username': 'test-user',
        'password': 'test-pass',
    }
    outcome = await manager.login_flow.async_configure(
        form['flow_id'], login_input)
    assert outcome['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY

    new_user = outcome['result']
    assert new_user is not None
    assert new_user.is_owner is False
    assert new_user.name == 'Test Name'

    # Let the bus deliver the event before inspecting what was recorded.
    await hass.async_block_till_done()
    assert len(received) == 1
    assert received[0].data['user_id'] == new_user.id
|
|
|
|
|
|
async def test_login_as_existing_user(mock_hass):
    """Logging in with stored credentials returns the user that owns them."""
    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }]
    }]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, [])
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Decoy user: its credentials use a different username than the login.
    decoy = MockUser(
        id='mock-user2',
        is_owner=False,
        is_active=False,
        name='Not user',
    ).add_to_auth_manager(manager)
    decoy.credentials.append(auth_models.Credentials(
        id='mock-id2',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'other-user'},
        is_new=False,
    ))

    # Target user: credentials match the username used in the login flow.
    target = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    target.credentials.append(auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    ))

    form = await manager.login_flow.async_init(('insecure_example', None))
    assert form['type'] == data_entry_flow.RESULT_TYPE_FORM

    login_input = {
        'username': 'test-user',
        'password': 'test-pass',
    }
    outcome = await manager.login_flow.async_configure(
        form['flow_id'], login_input)
    assert outcome['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY

    logged_in = outcome['result']
    assert logged_in is not None
    assert logged_in.id == 'mock-user'
    assert logged_in.is_owner is False
    assert logged_in.is_active is False
    assert logged_in.name == 'Paulus'
|
|
|
|
|
|
async def test_linking_user_to_two_auth_providers(hass, hass_storage):
    """Link a credential from a second provider to an existing user."""
    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
        }]
    }, {
        'type': 'insecure_example',
        'id': 'another-provider',
        'users': [{
            'username': 'another-user',
            'password': 'another-password',
        }]
    }]
    manager = await auth.auth_manager_from_config(hass, provider_configs, [])

    # Regular login through the default provider yields the user.
    first_flow = await manager.login_flow.async_init(
        ('insecure_example', None))
    first_outcome = await manager.login_flow.async_configure(
        first_flow['flow_id'], {
            'username': 'test-user',
            'password': 'test-pass',
        })
    user = first_outcome['result']
    assert user is not None

    # Flow started with the 'credential_only' context against the second
    # provider; its result is handed to async_link_user below.
    second_flow = await manager.login_flow.async_init(
        ('insecure_example', 'another-provider'),
        context={'credential_only': True})
    second_outcome = await manager.login_flow.async_configure(
        second_flow['flow_id'], {
            'username': 'another-user',
            'password': 'another-password',
        })
    extra_credential = second_outcome['result']

    await manager.async_link_user(user, extra_credential)
    assert len(user.credentials) == 2
|
|
|
|
|
|
async def test_saving_loading(hass, hass_storage):
    """Test storing and saving data.

    Creates one of each type that we store to test we restore correctly:
    a user obtained through the login flow, one refresh token that has been
    used to mint an access token and one refresh token that has not.
    """
    manager = await auth.auth_manager_from_config(hass, [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
        }]
    }], [])

    step = await manager.login_flow.async_init(('insecure_example', None))
    step = await manager.login_flow.async_configure(step['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    user = step['result']
    await manager.async_activate_user(user)
    # the first refresh token will be used to create access token
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    manager.async_create_access_token(refresh_token, '192.168.0.1')
    # the second refresh token will not be used
    await manager.async_create_refresh_token(user, 'dummy-client')

    # Flush the manager's underlying store before reading it back.
    await flush_store(manager._store._store)

    # A fresh AuthStore has to load everything from storage.
    store2 = auth_store.AuthStore(hass)
    users = await store2.async_get_users()
    assert len(users) == 1
    assert users[0].permissions == user.permissions
    assert users[0] == user
    assert len(users[0].refresh_tokens) == 2
    for r_token in users[0].refresh_tokens.values():
        if r_token.client_id == CLIENT_ID:
            # verify the first refresh token
            assert r_token.last_used_at is not None
            assert r_token.last_used_ip == '192.168.0.1'
        elif r_token.client_id == 'dummy-client':
            # verify the second refresh token
            assert r_token.last_used_at is None
            assert r_token.last_used_ip is None
        else:
            # pytest.fail is not stripped under ``python -O`` the way
            # ``assert False`` would be.
            pytest.fail('Unknown client_id: %s' % r_token.client_id)
|
|
|
|
|
|
async def test_cannot_retrieve_expired_access_token(hass):
    """Test that we cannot retrieve expired access tokens.

    A freshly issued access token must validate back to its refresh token,
    while one issued more than ACCESS_TOKEN_EXPIRATION ago must validate
    to None.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    # Compare ids by value; whether both sides are the very same object is
    # an implementation detail the test should not depend on.
    assert refresh_token.user.id == user.id
    assert refresh_token.client_id == CLIENT_ID

    access_token = manager.async_create_access_token(refresh_token)
    assert (
        await manager.async_validate_access_token(access_token)
        is refresh_token
    )

    # Issue a token while "now" is patched to ACCESS_TOKEN_EXPIRATION plus
    # 11 seconds in the past, then validate it against the real clock.
    issued_at = (dt_util.utcnow() - auth_const.ACCESS_TOKEN_EXPIRATION
                 - timedelta(seconds=11))
    with patch('homeassistant.util.dt.utcnow', return_value=issued_at):
        access_token = manager.async_create_access_token(refresh_token)

    assert (
        await manager.async_validate_access_token(access_token)
        is None
    )
|
|
|
|
|
|
async def test_generating_system_user(hass):
    """Create a system user and confirm its token and 'user_added' event."""
    received = []

    @callback
    def _record(event):
        received.append(event)

    hass.bus.async_listen('user_added', _record)

    manager = await auth.auth_manager_from_config(hass, [], [])
    system_user = await manager.async_create_system_user('Hass.io')
    # No client id is supplied for the system user's refresh token.
    refresh_token = await manager.async_create_refresh_token(system_user)
    assert system_user.system_generated
    assert refresh_token is not None
    assert refresh_token.client_id is None

    await hass.async_block_till_done()
    assert len(received) == 1
    assert received[0].data['user_id'] == system_user.id
|
|
|
|
|
|
async def test_refresh_token_requires_client_for_user(hass):
    """A regular user's refresh token cannot be created without client_id."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)
    assert regular_user.system_generated is False

    # Omitting the client id is rejected.
    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(regular_user)

    refresh_token = await manager.async_create_refresh_token(
        regular_user, CLIENT_ID)
    assert refresh_token is not None
    assert refresh_token.client_id == CLIENT_ID
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_NORMAL
    # With no explicit value, the expiration equals the module constant.
    assert (refresh_token.access_token_expiration ==
            auth_const.ACCESS_TOKEN_EXPIRATION)
|
|
|
|
|
|
async def test_refresh_token_not_requires_client_for_system_user(hass):
    """A system user's refresh token must be created without client_id."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    system_user = await manager.async_create_system_user('Hass.io')
    assert system_user.system_generated is True

    # Supplying a client id for a system user is rejected.
    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(system_user, CLIENT_ID)

    refresh_token = await manager.async_create_refresh_token(system_user)
    assert refresh_token is not None
    assert refresh_token.client_id is None
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_SYSTEM
|
|
|
|
|
|
async def test_refresh_token_with_specific_access_token_expiration(hass):
    """An explicit access token expiration is stored on the refresh token."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)

    requested_expiration = timedelta(days=100)
    refresh_token = await manager.async_create_refresh_token(
        regular_user, CLIENT_ID,
        access_token_expiration=requested_expiration)
    assert refresh_token is not None
    assert refresh_token.client_id == CLIENT_ID
    assert refresh_token.access_token_expiration == timedelta(days=100)
|
|
|
|
|
|
async def test_refresh_token_type(hass):
    """A regular user may request a normal token but not a system token."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)

    # System token type for a non-system user is rejected.
    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            regular_user, CLIENT_ID,
            token_type=auth_models.TOKEN_TYPE_SYSTEM)

    refresh_token = await manager.async_create_refresh_token(
        regular_user, CLIENT_ID,
        token_type=auth_models.TOKEN_TYPE_NORMAL)
    assert refresh_token is not None
    assert refresh_token.client_id == CLIENT_ID
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_NORMAL
|
|
|
|
|
|
async def test_refresh_token_type_long_lived_access_token(hass):
    """Create a long-lived-access-token refresh token with client metadata."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)
    long_lived = auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN

    # Requesting the long-lived type without a client name is rejected.
    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            regular_user, token_type=long_lived)

    refresh_token = await manager.async_create_refresh_token(
        regular_user, client_name='GPS LOGGER', client_icon='mdi:home',
        token_type=long_lived)
    assert refresh_token is not None
    assert refresh_token.client_id is None
    assert refresh_token.client_name == 'GPS LOGGER'
    assert refresh_token.client_icon == 'mdi:home'
    assert (refresh_token.token_type ==
            auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN)
|
|
|
|
|
|
async def test_cannot_deactive_owner(mock_hass):
    """Deactivating the owner user must raise ValueError."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    owner_user = MockUser(is_owner=True).add_to_auth_manager(manager)

    with pytest.raises(ValueError):
        await manager.async_deactivate_user(owner_user)
|
|
|
|
|
|
async def test_remove_refresh_token(mock_hass):
    """Removing a refresh token also invalidates its access token."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    token_owner = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        token_owner, CLIENT_ID)
    access_token = manager.async_create_access_token(refresh_token)

    await manager.async_remove_refresh_token(refresh_token)

    # Neither lookup by id nor validation of the old access token succeeds.
    looked_up = await manager.async_get_refresh_token(refresh_token.id)
    assert looked_up is None
    validated = await manager.async_validate_access_token(access_token)
    assert validated is None
|
|
|
|
|
|
async def test_create_access_token(mock_hass):
    """Test normal refresh_token's jwt_key keep same after used.

    Also decodes the issued access token with that key and checks that the
    issuer is the refresh token id and the lifetime is 30 minutes.
    """
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_NORMAL
    # Remember the key so we can prove issuing a token does not rotate it.
    jwt_key = refresh_token.jwt_key
    access_token = manager.async_create_access_token(refresh_token)
    assert access_token is not None
    assert refresh_token.jwt_key == jwt_key
    # PyJWT's keyword is ``algorithms`` (plural); the singular spelling is
    # not the allow-list parameter of jwt.decode.
    jwt_payload = jwt.decode(access_token, jwt_key, algorithms=['HS256'])
    assert jwt_payload['iss'] == refresh_token.id
    assert jwt_payload['exp'] - jwt_payload['iat'] == \
        timedelta(minutes=30).total_seconds()
|
|
|
|
|
|
async def test_create_long_lived_access_token(mock_hass):
    """Test access token creation for a long-lived access token.

    Creates a long-lived refresh token with a 300 day access token
    expiration, then decodes the issued access token with the refresh
    token's jwt_key and checks its issuer and lifetime.
    """
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user, client_name='GPS Logger',
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=300))
    assert refresh_token.token_type == \
        auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)
    # PyJWT's keyword is ``algorithms`` (plural); the singular spelling is
    # not the allow-list parameter of jwt.decode.
    jwt_payload = jwt.decode(
        access_token, refresh_token.jwt_key, algorithms=['HS256'])
    assert jwt_payload['iss'] == refresh_token.id
    assert jwt_payload['exp'] - jwt_payload['iat'] == \
        timedelta(days=300).total_seconds()
|
|
|
|
|
|
async def test_one_long_lived_access_token_per_refresh_token(mock_hass):
    """Test long-lived refresh tokens with a repeated client name.

    Steps:
    1. Create a long-lived token named 'GPS Logger' and validate its
       access token.
    2. Creating a second token with the same name raises ValueError.
    3. After removing the first token its access token no longer validates.
    4. The name can then be reused; the new token has a different id,
       jwt_key and access token.
    """
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user, client_name='GPS Logger',
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000))
    assert refresh_token.token_type == \
        auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)
    jwt_key = refresh_token.jwt_key

    rt = await manager.async_validate_access_token(access_token)
    assert rt.id == refresh_token.id

    # Same user, same client name, while the first token still exists.
    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            user, client_name='GPS Logger',
            token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
            access_token_expiration=timedelta(days=3000))

    await manager.async_remove_refresh_token(refresh_token)
    assert refresh_token.id not in user.refresh_tokens
    rt = await manager.async_validate_access_token(access_token)
    assert rt is None, 'Previous issued access token has been invoked'

    refresh_token_2 = await manager.async_create_refresh_token(
        user, client_name='GPS Logger',
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000))
    assert refresh_token_2.id != refresh_token.id
    assert refresh_token_2.token_type == \
        auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token_2 = manager.async_create_access_token(refresh_token_2)
    jwt_key_2 = refresh_token_2.jwt_key

    assert access_token != access_token_2
    assert jwt_key != jwt_key_2

    rt = await manager.async_validate_access_token(access_token_2)
    # PyJWT's keyword is ``algorithms`` (plural); the singular spelling is
    # not the allow-list parameter of jwt.decode.
    jwt_payload = jwt.decode(
        access_token_2, rt.jwt_key, algorithms=['HS256'])
    assert jwt_payload['iss'] == refresh_token_2.id
    assert jwt_payload['exp'] - jwt_payload['iat'] == \
        timedelta(days=3000).total_seconds()
|
|
|
|
|
|
async def test_login_with_auth_module(mock_hass):
    """Log in as an existing user when one MFA module is configured."""
    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }],
    }]
    module_configs = [{
        'type': 'insecure_example',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin'
        }]
    }]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, module_configs)
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Stored user whose credentials match the login below.
    existing = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    existing.credentials.append(auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    ))

    flow = await manager.login_flow.async_init(('insecure_example', None))
    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM
    flow_id = flow['flow_id']

    flow = await manager.login_flow.async_configure(flow_id, {
        'username': 'test-user',
        'password': 'test-pass',
    })

    # Password accepted: the flow now asks for the MFA input.
    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert flow['step_id'] == 'mfa'

    flow = await manager.login_flow.async_configure(flow['flow_id'], {
        'pin': 'invalid-pin',
    })

    # Wrong pin: same form again, with an error attached.
    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert flow['step_id'] == 'mfa'
    assert flow['errors'] == {'base': 'invalid_code'}

    flow = await manager.login_flow.async_configure(flow['flow_id'], {
        'pin': 'test-pin',
    })

    # Correct pin: the flow finishes and hands back the stored user.
    assert flow['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
    logged_in = flow['result']
    assert logged_in is not None
    assert logged_in.id == 'mock-user'
    assert logged_in.is_owner is False
    assert logged_in.is_active is False
    assert logged_in.name == 'Paulus'
|
|
|
|
|
|
async def test_login_with_multi_auth_module(mock_hass):
    """Log in as an existing user when two MFA modules are configured."""
    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }],
    }]
    module_configs = [{
        'type': 'insecure_example',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin'
        }]
    }, {
        'type': 'insecure_example',
        'id': 'module2',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin2'
        }]
    }]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, module_configs)
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Stored user whose credentials match the login below.
    existing = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    existing.credentials.append(auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    ))

    flow = await manager.login_flow.async_init(('insecure_example', None))
    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM

    flow = await manager.login_flow.async_configure(flow['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })

    # Password accepted: the flow first asks which MFA module to use.
    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert flow['step_id'] == 'select_mfa_module'

    flow = await manager.login_flow.async_configure(flow['flow_id'], {
        'multi_factor_auth_module': 'module2',
    })

    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert flow['step_id'] == 'mfa'

    # Pin configured for 'module2'.
    flow = await manager.login_flow.async_configure(flow['flow_id'], {
        'pin': 'test-pin2',
    })

    # The flow finishes and hands back the stored user.
    assert flow['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
    logged_in = flow['result']
    assert logged_in is not None
    assert logged_in.id == 'mock-user'
    assert logged_in.is_owner is False
    assert logged_in.is_active is False
    assert logged_in.name == 'Paulus'
|
|
|
|
|
|
async def test_auth_module_expired_session(mock_hass):
    """The login flow aborts when the MFA step is answered too late."""
    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }],
    }]
    module_configs = [{
        'type': 'insecure_example',
        'data': [{
            'user_id': 'mock-user',
            'pin': 'test-pin'
        }]
    }]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, module_configs)
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Stored user whose credentials match the login below.
    existing = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    existing.credentials.append(auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    ))

    flow = await manager.login_flow.async_init(('insecure_example', None))
    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM

    flow = await manager.login_flow.async_configure(flow['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })

    assert flow['type'] == data_entry_flow.RESULT_TYPE_FORM
    assert flow['step_id'] == 'mfa'

    # Answer the MFA form with the clock moved MFA_SESSION_EXPIRATION ahead.
    late_now = dt_util.utcnow() + MFA_SESSION_EXPIRATION
    with patch('homeassistant.util.dt.utcnow', return_value=late_now):
        flow = await manager.login_flow.async_configure(flow['flow_id'], {
            'pin': 'test-pin',
        })
        # Even the correct pin is refused: the flow aborts as expired.
        assert flow['type'] == data_entry_flow.RESULT_TYPE_ABORT
        assert flow['reason'] == 'login_expired'
|
|
|
|
|
|
async def test_enable_mfa_for_user(hass, hass_storage):
    """Enable, re-enable and disable an MFA module for a user."""
    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
        }]
    }]
    module_configs = [{
        'type': 'insecure_example',
        'data': [],
    }]
    manager = await auth.auth_manager_from_config(
        hass, provider_configs, module_configs)

    flow = await manager.login_flow.async_init(('insecure_example', None))
    flow = await manager.login_flow.async_configure(flow['flow_id'], {
        'username': 'test-user',
        'password': 'test-pass',
    })
    user = flow['result']
    assert user is not None

    # A freshly created user has no MFA module enabled.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 0

    mfa_module = manager.get_auth_mfa_module('insecure_example')
    # The module starts out without any stored data.
    assert bool(mfa_module._data) is False

    # Enabling stores exactly one entry for the user.
    await manager.async_enable_user_mfa(
        user, 'insecure_example', {'pin': 'test-pin'})
    assert len(mfa_module._data) == 1
    assert mfa_module._data[0] == {'user_id': user.id, 'pin': 'test-pin'}

    # The module now shows up as enabled for the user.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 1
    assert 'insecure_example' in enabled

    # Enabling again replaces the stored entry instead of adding one.
    await manager.async_enable_user_mfa(
        user, 'insecure_example', {'pin': 'test-pin-new'})
    assert len(mfa_module._data) == 1
    assert mfa_module._data[0] == {'user_id': user.id, 'pin': 'test-pin-new'}
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 1
    assert 'insecure_example' in enabled

    # A system user is not allowed to enable MFA.
    system_user = await manager.async_create_system_user('system-user')
    with pytest.raises(ValueError):
        await manager.async_enable_user_mfa(
            system_user, 'insecure_example', {'pin': 'test-pin'})
    assert len(mfa_module._data) == 1
    enabled = await manager.async_get_enabled_mfa(system_user)
    assert len(enabled) == 0

    # Disabling removes the user's entry from the module.
    await manager.async_disable_user_mfa(user, 'insecure_example')
    assert bool(mfa_module._data) is False

    # Nothing is reported as enabled afterwards.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 0

    # Disabling a module that is not enabled must not raise.
    await manager.async_disable_user_mfa(user, 'insecure_example')
|
|
|
|
|
|
async def test_async_remove_user(hass):
    """Remove a user and expect its credentials gone and an event fired."""
    received = []

    @callback
    def _record(event):
        received.append(event)

    hass.bus.async_listen('user_removed', _record)

    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }]
    }]
    manager = await auth.auth_manager_from_config(hass, provider_configs, [])
    hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Stored user holding a single credential for the example provider.
    doomed = MockUser(
        id='mock-user',
        is_owner=False,
        is_active=False,
        name='Paulus',
    ).add_to_auth_manager(manager)
    doomed.credentials.append(auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=False,
    ))
    assert len(doomed.credentials) == 1

    await hass.auth.async_remove_user(doomed)

    remaining = await manager.async_get_users()
    assert len(remaining) == 0
    assert len(doomed.credentials) == 0

    await hass.async_block_till_done()
    assert len(received) == 1
    assert received[0].data['user_id'] == doomed.id
|
|
|
|
|
|
async def test_new_users_admin(mock_hass):
    """Users created directly or from new credentials are admins."""
    provider_configs = [{
        'type': 'insecure_example',
        'users': [{
            'username': 'test-user',
            'password': 'test-pass',
            'name': 'Test Name'
        }]
    }]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, [])
    ensure_auth_manager_loaded(manager)

    created = await manager.async_create_user('Hello')
    assert created.is_admin

    # Credentials flagged as new are passed to async_get_or_create_user.
    new_credentials = auth_models.Credentials(
        id='mock-id',
        auth_provider_type='insecure_example',
        auth_provider_id=None,
        data={'username': 'test-user'},
        is_new=True,
    )
    from_credentials = await manager.async_get_or_create_user(new_credentials)
    assert from_credentials.is_admin
|