1100 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			1100 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
"""Tests for the Home Assistant auth module."""
 | 
						|
from datetime import timedelta
 | 
						|
from unittest.mock import Mock, patch
 | 
						|
 | 
						|
import jwt
 | 
						|
import pytest
 | 
						|
import voluptuous as vol
 | 
						|
 | 
						|
from homeassistant import auth, data_entry_flow
 | 
						|
from homeassistant.auth import (
 | 
						|
    InvalidAuthError,
 | 
						|
    auth_store,
 | 
						|
    const as auth_const,
 | 
						|
    models as auth_models,
 | 
						|
)
 | 
						|
from homeassistant.auth.const import GROUP_ID_ADMIN, MFA_SESSION_EXPIRATION
 | 
						|
from homeassistant.core import callback
 | 
						|
from homeassistant.util import dt as dt_util
 | 
						|
 | 
						|
from tests.common import (
 | 
						|
    CLIENT_ID,
 | 
						|
    MockUser,
 | 
						|
    async_capture_events,
 | 
						|
    ensure_auth_manager_loaded,
 | 
						|
    flush_store,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture
def mock_hass(loop):
    """Provide a bare ``Mock`` standing in for Home Assistant.

    Only ``config.skip_pip`` is set explicitly (to ``True``); every other
    attribute is whatever ``Mock`` auto-creates on access.
    """
    fake_hass = Mock()
    fake_hass.config.skip_pip = True
    return fake_hass
 | 
						|
 | 
						|
 | 
						|
async def test_auth_manager_from_config_validates_config(mock_hass):
    """Check that provider configs are validated and valid providers are listed.

    A provider entry without ``users`` must make manager creation raise
    ``vol.Invalid``; a complete configuration must expose both providers,
    in order, through ``manager.auth_providers``.
    """
    invalid_provider_configs = [
        {"name": "Test Name", "type": "insecure_example", "users": []},
        {
            "name": "Invalid configuration because no users",
            "type": "insecure_example",
            "id": "invalid_config",
        },
    ]
    valid_provider_configs = [
        {"name": "Test Name", "type": "insecure_example", "users": []},
        {
            "name": "Test Name 2",
            "type": "insecure_example",
            "id": "another",
            "users": [],
        },
    ]

    # The second entry has no "users" key, so construction must be rejected.
    with pytest.raises(vol.Invalid):
        await auth.auth_manager_from_config(mock_hass, invalid_provider_configs, [])

    manager = await auth.auth_manager_from_config(
        mock_hass, valid_provider_configs, []
    )

    described = []
    for provider in manager.auth_providers:
        described.append(
            {"name": provider.name, "id": provider.id, "type": provider.type}
        )

    expected = [
        {"name": "Test Name", "type": "insecure_example", "id": None},
        {"name": "Test Name 2", "type": "insecure_example", "id": "another"},
    ]
    assert described == expected
 | 
						|
 | 
						|
 | 
						|
async def test_auth_manager_from_config_auth_modules(mock_hass):
    """Check that MFA module configs are validated and valid modules are listed.

    A module entry without ``data`` must make manager creation raise
    ``vol.Invalid``; a complete configuration must expose both the providers
    and the MFA modules, in order.
    """
    invalid_module_configs = [
        {"name": "Module 1", "type": "insecure_example", "data": []},
        {
            "name": "Invalid configuration because no data",
            "type": "insecure_example",
            "id": "another",
        },
    ]
    valid_module_configs = [
        {"name": "Module 1", "type": "insecure_example", "data": []},
        {
            "name": "Module 2",
            "type": "insecure_example",
            "id": "another",
            "data": [],
        },
    ]

    # The provider list is valid here; only the second module entry is broken.
    with pytest.raises(vol.Invalid):
        await auth.auth_manager_from_config(
            mock_hass,
            [
                {"name": "Test Name", "type": "insecure_example", "users": []},
                {
                    "name": "Test Name 2",
                    "type": "insecure_example",
                    "id": "another",
                    "users": [],
                },
            ],
            invalid_module_configs,
        )

    manager = await auth.auth_manager_from_config(
        mock_hass,
        [
            {"name": "Test Name", "type": "insecure_example", "users": []},
            {
                "name": "Test Name 2",
                "type": "insecure_example",
                "id": "another",
                "users": [],
            },
        ],
        valid_module_configs,
    )

    def _describe(item):
        """Reduce a provider or module to the fields under test."""
        return {"name": item.name, "type": item.type, "id": item.id}

    provider_summaries = [_describe(provider) for provider in manager.auth_providers]
    assert provider_summaries == [
        {"name": "Test Name", "type": "insecure_example", "id": None},
        {"name": "Test Name 2", "type": "insecure_example", "id": "another"},
    ]

    module_summaries = [_describe(module) for module in manager.auth_mfa_modules]
    assert module_summaries == [
        {"name": "Module 1", "type": "insecure_example", "id": "insecure_example"},
        {"name": "Module 2", "type": "insecure_example", "id": "another"},
    ]
 | 
						|
 | 
						|
 | 
						|
async def test_create_new_user(hass):
    """Check that a successful login flow can create a brand-new user.

    Also verifies that exactly one ``user_added`` event carrying the new
    user's id is seen on the bus.
    """
    captured = []

    @callback
    def _on_user_added(event):
        captured.append(event)

    hass.bus.async_listen("user_added", _on_user_added)

    provider_config = {
        "type": "insecure_example",
        "users": [
            {
                "username": "test-user",
                "password": "test-pass",
                "name": "Test Name",
            }
        ],
    }
    manager = await auth.auth_manager_from_config(hass, [provider_config], [])

    # Starting the flow presents the login form.
    form = await manager.login_flow.async_init(("insecure_example", None))
    assert form["type"] == data_entry_flow.RESULT_TYPE_FORM

    # Submitting matching credentials completes the flow.
    outcome = await manager.login_flow.async_configure(
        form["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    assert outcome["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
    credential = outcome["result"]
    assert credential is not None

    new_user = await manager.async_get_or_create_user(credential)
    assert new_user is not None
    assert new_user.is_owner is False
    assert new_user.name == "Test Name"

    await hass.async_block_till_done()
    assert len(captured) == 1
    assert captured[0].data["user_id"] == new_user.id
 | 
						|
 | 
						|
 | 
						|
async def test_login_as_existing_user(mock_hass):
    """Check that logging in resolves to the user already owning the credential.

    Two users are registered with ``insecure_example`` credentials; logging
    in as ``test-user`` must return the matching user, not the decoy.
    """
    provider_config = {
        "type": "insecure_example",
        "users": [
            {
                "username": "test-user",
                "password": "test-pass",
                "name": "Test Name",
            }
        ],
    }
    manager = await auth.auth_manager_from_config(mock_hass, [provider_config], [])
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Decoy user whose credential belongs to a different username.
    decoy_user = MockUser(
        id="mock-user2", is_owner=False, is_active=False, name="Not user"
    ).add_to_auth_manager(manager)
    decoy_credential = auth_models.Credentials(
        id="mock-id2",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "other-user"},
        is_new=False,
    )
    decoy_user.credentials.append(decoy_credential)

    # The user we expect the login to resolve to.
    expected_user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    expected_credential = auth_models.Credentials(
        id="mock-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=False,
    )
    expected_user.credentials.append(expected_credential)

    form = await manager.login_flow.async_init(("insecure_example", None))
    assert form["type"] == data_entry_flow.RESULT_TYPE_FORM

    outcome = await manager.login_flow.async_configure(
        form["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    assert outcome["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY

    found_user = await manager.async_get_user_by_credentials(outcome["result"])
    assert found_user is not None
    assert found_user.id == "mock-user"
    assert found_user.is_owner is False
    assert found_user.is_active is False
    assert found_user.name == "Paulus"
 | 
						|
 | 
						|
 | 
						|
async def test_linking_user_to_two_auth_providers(hass, hass_storage):
    """Check linking credentials from a second provider to an existing user.

    Covers the first link, a repeated link of the same credential (no
    change), and an attempt to link that credential to a different user
    (``ValueError``).
    """
    default_provider = {
        "type": "insecure_example",
        "users": [{"username": "test-user", "password": "test-pass"}],
    }
    second_provider = {
        "type": "insecure_example",
        "id": "another-provider",
        "users": [{"username": "another-user", "password": "another-password"}],
    }
    manager = await auth.auth_manager_from_config(
        hass, [default_provider, second_provider], []
    )

    # Log in through the default provider to obtain the user.
    flow = await manager.login_flow.async_init(("insecure_example", None))
    flow = await manager.login_flow.async_configure(
        flow["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    user = await manager.async_get_or_create_user(flow["result"])
    assert user is not None

    # Run a credential-only flow against the second provider.
    flow = await manager.login_flow.async_init(
        ("insecure_example", "another-provider"), context={"credential_only": True}
    )
    flow = await manager.login_flow.async_configure(
        flow["flow_id"], {"username": "another-user", "password": "another-password"}
    )
    extra_credential = flow["result"]

    await manager.async_link_user(user, extra_credential)
    assert len(user.credentials) == 2

    # Linking the same credential to the same user again changes nothing.
    await manager.async_link_user(user, extra_credential)
    assert len(user.credentials) == 2

    # A credential already linked to one user cannot be linked to another.
    second_user = await manager.async_create_user("User 2")
    with pytest.raises(ValueError):
        await manager.async_link_user(second_user, extra_credential)
    assert len(second_user.credentials) == 0
 | 
						|
 | 
						|
 | 
						|
async def test_saving_loading(hass, hass_storage):
    """Test that stored auth data is restored by a fresh ``AuthStore``.

    Creates a user with one credential and two refresh tokens (one used to
    mint an access token, one never used), flushes the store, then loads the
    data through a second ``AuthStore`` and compares it with the original.
    """
    manager = await auth.auth_manager_from_config(
        hass,
        [
            {
                "type": "insecure_example",
                "users": [{"username": "test-user", "password": "test-pass"}],
            }
        ],
        [],
    )

    step = await manager.login_flow.async_init(("insecure_example", None))
    step = await manager.login_flow.async_configure(
        step["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    credential = step["result"]
    user = await manager.async_get_or_create_user(credential)

    await manager.async_activate_user(user)
    # The first refresh token is used to create an access token.
    refresh_token = await manager.async_create_refresh_token(
        user, CLIENT_ID, credential=credential
    )
    manager.async_create_access_token(refresh_token, "192.168.0.1")
    # The second refresh token is never used.
    await manager.async_create_refresh_token(
        user, "dummy-client", credential=credential
    )

    await flush_store(manager._store._store)

    store2 = auth_store.AuthStore(hass)
    users = await store2.async_get_users()
    assert len(users) == 1
    assert users[0].permissions == user.permissions
    assert users[0] == user
    assert len(users[0].refresh_tokens) == 2
    for r_token in users[0].refresh_tokens.values():
        if r_token.client_id == CLIENT_ID:
            # The used token must carry its usage metadata.
            assert r_token.last_used_at is not None
            assert r_token.last_used_ip == "192.168.0.1"
        elif r_token.client_id == "dummy-client":
            # The unused token must have no usage metadata.
            assert r_token.last_used_at is None
            assert r_token.last_used_ip is None
        else:
            # pytest.fail is used instead of ``assert False`` so the check
            # still fires when assertions are stripped (``python -O``).
            pytest.fail(f"Unknown client_id: {r_token.client_id}")
 | 
						|
 | 
						|
 | 
						|
async def test_cannot_retrieve_expired_access_token(hass):
    """Test that an expired access token no longer validates.

    A token minted normally must validate back to its refresh token; a token
    minted while ``utcnow`` is patched far enough into the past must
    validate to ``None``.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
    # Compare ids by value; identity (``is``) is not what this test checks.
    assert refresh_token.user.id == user.id
    assert refresh_token.client_id == CLIENT_ID

    access_token = manager.async_create_access_token(refresh_token)
    assert await manager.async_validate_access_token(access_token) is refresh_token

    # Issue the token as if "now" were already past the expiration window
    # (expiration plus 11 seconds ago).
    issued_at = (
        dt_util.utcnow()
        - auth_const.ACCESS_TOKEN_EXPIRATION
        - timedelta(seconds=11)
    )
    with patch("homeassistant.util.dt.utcnow", return_value=issued_at):
        access_token = manager.async_create_access_token(refresh_token)

    assert await manager.async_validate_access_token(access_token) is None
 | 
						|
 | 
						|
 | 
						|
async def test_generating_system_user(hass):
    """Check system-user creation with default and with explicit arguments.

    Each creation must fire one ``user_added`` event carrying the new user's
    id, and the refresh token created for a system user has no client id.
    """
    seen_events = []

    @callback
    def _record(event):
        seen_events.append(event)

    hass.bus.async_listen("user_added", _record)

    manager = await auth.auth_manager_from_config(hass, [], [])

    # Default arguments: no groups and not local-only.
    plain_user = await manager.async_create_system_user("Hass.io")
    plain_token = await manager.async_create_refresh_token(plain_user)
    assert plain_user.system_generated
    assert plain_user.groups == []
    assert not plain_user.local_only
    assert plain_token is not None
    assert plain_token.client_id is None

    await hass.async_block_till_done()
    assert len(seen_events) == 1
    assert seen_events[0].data["user_id"] == plain_user.id

    # Explicit arguments: admin group and local-only.
    admin_user = await manager.async_create_system_user(
        "Hass.io", group_ids=[GROUP_ID_ADMIN], local_only=True
    )
    admin_token = await manager.async_create_refresh_token(admin_user)
    assert admin_user.system_generated
    assert admin_user.is_admin
    assert admin_user.local_only
    assert admin_token is not None
    assert admin_token.client_id is None

    await hass.async_block_till_done()
    assert len(seen_events) == 2
    assert seen_events[1].data["user_id"] == admin_user.id
 | 
						|
 | 
						|
 | 
						|
async def test_refresh_token_requires_client_for_user(hass):
    """Check that a regular user's refresh token needs a client id.

    Omitting the client id must raise ``ValueError``; supplying it yields a
    normal-type token with the default access token expiration.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)
    assert regular_user.system_generated is False

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(regular_user)

    created = await manager.async_create_refresh_token(regular_user, CLIENT_ID)
    assert created is not None
    assert created.client_id == CLIENT_ID
    assert created.token_type == auth_models.TOKEN_TYPE_NORMAL
    # No expiration was passed, so the default constant applies.
    assert created.access_token_expiration == auth_const.ACCESS_TOKEN_EXPIRATION
 | 
						|
 | 
						|
 | 
						|
async def test_refresh_token_not_requires_client_for_system_user(hass):
    """Check that a system user's refresh token must not have a client id.

    Passing a client id must raise ``ValueError``; omitting it yields a
    system-type token whose ``client_id`` is ``None``.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    system_user = await manager.async_create_system_user("Hass.io")
    assert system_user.system_generated is True

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(system_user, CLIENT_ID)

    created = await manager.async_create_refresh_token(system_user)
    assert created is not None
    assert created.client_id is None
    assert created.token_type == auth_models.TOKEN_TYPE_SYSTEM
 | 
						|
 | 
						|
 | 
						|
async def test_refresh_token_with_specific_access_token_expiration(hass):
    """Check that an explicit access token expiration is kept on the token."""
    manager = await auth.auth_manager_from_config(hass, [], [])
    owner_of_token = MockUser().add_to_auth_manager(manager)
    requested_lifetime = timedelta(days=100)

    created = await manager.async_create_refresh_token(
        owner_of_token, CLIENT_ID, access_token_expiration=requested_lifetime
    )

    assert created is not None
    assert created.client_id == CLIENT_ID
    assert created.access_token_expiration == timedelta(days=100)
 | 
						|
 | 
						|
 | 
						|
async def test_refresh_token_type(hass):
    """Check which explicit token types a regular user may request.

    Requesting the system type with a client id must raise ``ValueError``;
    requesting the normal type succeeds.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            regular_user, CLIENT_ID, token_type=auth_models.TOKEN_TYPE_SYSTEM
        )

    created = await manager.async_create_refresh_token(
        regular_user, CLIENT_ID, token_type=auth_models.TOKEN_TYPE_NORMAL
    )
    assert created is not None
    assert created.client_id == CLIENT_ID
    assert created.token_type == auth_models.TOKEN_TYPE_NORMAL
 | 
						|
 | 
						|
 | 
						|
async def test_refresh_token_type_long_lived_access_token(hass):
    """Check creation of a long-lived-access-token refresh token.

    Without a client name the request must raise ``ValueError``; with a
    name and icon the token is created and stores both, with no client id.
    """
    manager = await auth.auth_manager_from_config(hass, [], [])
    regular_user = MockUser().add_to_auth_manager(manager)
    long_lived = auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN

    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(regular_user, token_type=long_lived)

    created = await manager.async_create_refresh_token(
        regular_user,
        client_name="GPS LOGGER",
        client_icon="mdi:home",
        token_type=long_lived,
    )
    assert created is not None
    assert created.client_id is None
    assert created.client_name == "GPS LOGGER"
    assert created.client_icon == "mdi:home"
    assert created.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
 | 
						|
 | 
						|
 | 
						|
async def test_refresh_token_provider_validation(mock_hass):
    """Check that minting an access token consults the auth provider.

    With the provider's ``async_validate_refresh_token`` patched to raise
    ``InvalidAuthError``, creating an access token must propagate that error,
    and the patched method must have been called with the refresh token and
    the remote IP.
    """
    provider_config = {
        "type": "insecure_example",
        "users": [{"username": "test-user", "password": "test-pass"}],
    }
    manager = await auth.auth_manager_from_config(mock_hass, [provider_config], [])

    credential = auth_models.Credentials(
        id="mock-credential-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=False,
    )

    user = MockUser().add_to_auth_manager(manager)
    user.credentials.append(credential)
    refresh_token = await manager.async_create_refresh_token(
        user, CLIENT_ID, credential=credential
    )
    ip = "127.0.0.1"

    # Unpatched, the token is issued.
    issued = manager.async_create_access_token(refresh_token, ip)
    assert issued is not None

    validator_path = (
        "homeassistant.auth.providers.insecure_example"
        ".ExampleAuthProvider.async_validate_refresh_token"
    )
    with patch(
        validator_path, side_effect=InvalidAuthError("Invalid access")
    ) as validator:
        with pytest.raises(InvalidAuthError):
            manager.async_create_access_token(refresh_token, ip)

    validator.assert_called_with(refresh_token, ip)
 | 
						|
 | 
						|
 | 
						|
async def test_cannot_deactive_owner(mock_hass):
    """Check that deactivating the owner user raises ``ValueError``."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    owner_user = MockUser(is_owner=True).add_to_auth_manager(manager)

    with pytest.raises(ValueError):
        await manager.async_deactivate_user(owner_user)
 | 
						|
 | 
						|
 | 
						|
async def test_remove_refresh_token(mock_hass):
    """Check that a removed refresh token and its access token stop working."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    token_owner = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(token_owner, CLIENT_ID)
    access_token = manager.async_create_access_token(refresh_token)

    await manager.async_remove_refresh_token(refresh_token)

    # Neither lookup by id nor validation of the issued access token succeeds.
    looked_up = await manager.async_get_refresh_token(refresh_token.id)
    assert looked_up is None
    validated = await manager.async_validate_access_token(access_token)
    assert validated is None
 | 
						|
 | 
						|
 | 
						|
async def test_register_revoke_token_callback(mock_hass):
    """Check that a registered revoke callback runs on token removal."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    token_owner = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(token_owner, CLIENT_ID)

    # Each invocation of the callback leaves a marker in this list.
    invocations = []

    def _on_revoke():
        invocations.append(True)

    manager.async_register_revoke_token_callback(refresh_token.id, _on_revoke)
    await manager.async_remove_refresh_token(refresh_token)
    assert invocations
 | 
						|
 | 
						|
 | 
						|
async def test_unregister_revoke_token_callback(mock_hass):
    """Check that an unregistered revoke callback is not run on removal."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    token_owner = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(token_owner, CLIENT_ID)

    # Each invocation of the callback leaves a marker in this list.
    invocations = []

    def _on_revoke():
        invocations.append(True)

    # Registration returns a callable that undoes the registration.
    remove_callback = manager.async_register_revoke_token_callback(
        refresh_token.id, _on_revoke
    )
    remove_callback()

    await manager.async_remove_refresh_token(refresh_token)
    assert not invocations
 | 
						|
 | 
						|
 | 
						|
async def test_create_access_token(mock_hass):
    """Check the access token issued for a normal refresh token.

    The refresh token's ``jwt_key`` must be unchanged after issuing, and the
    decoded JWT must name the refresh token as issuer with a 30-minute
    lifetime.
    """
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    token_owner = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(token_owner, CLIENT_ID)
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_NORMAL

    # Remember the signing key before the token is used.
    key_before = refresh_token.jwt_key
    access_token = manager.async_create_access_token(refresh_token)
    assert access_token is not None
    assert refresh_token.jwt_key == key_before

    claims = jwt.decode(access_token, key_before, algorithms=["HS256"])
    assert claims["iss"] == refresh_token.id
    lifetime_seconds = claims["exp"] - claims["iat"]
    assert lifetime_seconds == timedelta(minutes=30).total_seconds()
 | 
						|
 | 
						|
 | 
						|
async def test_create_long_lived_access_token(mock_hass):
    """Check the access token issued for a long-lived refresh token.

    The decoded JWT must name the refresh token as issuer and carry the
    300-day lifetime requested when the refresh token was created.
    """
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    token_owner = MockUser().add_to_auth_manager(manager)
    requested_lifetime = timedelta(days=300)
    refresh_token = await manager.async_create_refresh_token(
        token_owner,
        client_name="GPS Logger",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=requested_lifetime,
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN

    access_token = manager.async_create_access_token(refresh_token)
    claims = jwt.decode(access_token, refresh_token.jwt_key, algorithms=["HS256"])
    assert claims["iss"] == refresh_token.id
    lifetime_seconds = claims["exp"] - claims["iat"]
    assert lifetime_seconds == timedelta(days=300).total_seconds()
 | 
						|
 | 
						|
 | 
						|
async def test_one_long_lived_access_token_per_refresh_token(mock_hass):
    """Test that a long-lived token's client name cannot be reused while live.

    Steps exercised:
    * create a long-lived refresh token and validate its access token;
    * creating a second one with the same client name raises ``ValueError``;
    * after removing the first token, its access token no longer validates;
    * a new token with the same client name can then be created, with a
      different id, signing key and access token.
    """
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    refresh_token = await manager.async_create_refresh_token(
        user,
        client_name="GPS Logger",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token = manager.async_create_access_token(refresh_token)
    jwt_key = refresh_token.jwt_key

    rt = await manager.async_validate_access_token(access_token)
    assert rt.id == refresh_token.id

    # Same user and client name while the first token still exists.
    with pytest.raises(ValueError):
        await manager.async_create_refresh_token(
            user,
            client_name="GPS Logger",
            token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
            access_token_expiration=timedelta(days=3000),
        )

    await manager.async_remove_refresh_token(refresh_token)
    assert refresh_token.id not in user.refresh_tokens
    rt = await manager.async_validate_access_token(access_token)
    # This message is shown when the old access token still validates.
    assert rt is None, "Previously issued access token should have been revoked"

    # With the first token gone, the client name is available again.
    refresh_token_2 = await manager.async_create_refresh_token(
        user,
        client_name="GPS Logger",
        token_type=auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN,
        access_token_expiration=timedelta(days=3000),
    )
    assert refresh_token_2.id != refresh_token.id
    assert refresh_token_2.token_type == auth_models.TOKEN_TYPE_LONG_LIVED_ACCESS_TOKEN
    access_token_2 = manager.async_create_access_token(refresh_token_2)
    jwt_key_2 = refresh_token_2.jwt_key

    assert access_token != access_token_2
    assert jwt_key != jwt_key_2

    rt = await manager.async_validate_access_token(access_token_2)
    jwt_payload = jwt.decode(access_token_2, rt.jwt_key, algorithms=["HS256"])
    assert jwt_payload["iss"] == refresh_token_2.id
    assert (
        jwt_payload["exp"] - jwt_payload["iat"] == timedelta(days=3000).total_seconds()
    )
 | 
						|
 | 
						|
 | 
						|
async def test_login_with_auth_module(mock_hass):
    """Check that an existing user must pass the MFA step to log in.

    One ``insecure_example`` MFA module is configured with a pin for the
    mock user. After the username/password step the flow has to show the
    ``mfa`` form, report ``invalid_code`` for a wrong pin and only create
    the entry once the correct pin is entered.
    """
    provider_configs = [
        {
            "type": "insecure_example",
            "users": [
                {
                    "username": "test-user",
                    "password": "test-pass",
                    "name": "Test Name",
                }
            ],
        }
    ]
    module_configs = [
        {
            "type": "insecure_example",
            "data": [{"user_id": "mock-user", "pin": "test-pin"}],
        }
    ]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, module_configs
    )
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Register a fake user and attach credentials for the example provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    existing_credential = auth_models.Credentials(
        id="mock-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=False,
    )
    user.credentials.append(existing_credential)

    login_flow = manager.login_flow

    result = await login_flow.async_init(("insecure_example", None))
    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM

    result = await login_flow.async_configure(
        result["flow_id"], {"username": "test-user", "password": "test-pass"}
    )

    # The provider accepted the password; the MFA form comes next.
    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
    assert result["step_id"] == "mfa"

    result = await login_flow.async_configure(
        result["flow_id"], {"pin": "invalid-pin"}
    )

    # A wrong pin keeps us on the MFA form and reports an error.
    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
    assert result["step_id"] == "mfa"
    assert result["errors"] == {"base": "invalid_code"}

    result = await login_flow.async_configure(
        result["flow_id"], {"pin": "test-pin"}
    )

    # The right pin completes the flow and yields the stored credential.
    assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
    assert result["result"]
    assert result["result"].id == "mock-id"
 | 
						|
 | 
						|
 | 
						|
async def test_login_with_multi_auth_module(mock_hass):
    """Check the login flow when the user has two MFA modules to pick from.

    Two ``insecure_example`` modules (the second with id ``module2``) hold
    a pin for the mock user. The flow has to show ``select_mfa_module``
    first, then the ``mfa`` form for the chosen module, and finish when
    that module's pin is entered.
    """
    provider_configs = [
        {
            "type": "insecure_example",
            "users": [
                {
                    "username": "test-user",
                    "password": "test-pass",
                    "name": "Test Name",
                }
            ],
        }
    ]
    module_configs = [
        {
            "type": "insecure_example",
            "data": [{"user_id": "mock-user", "pin": "test-pin"}],
        },
        {
            "type": "insecure_example",
            "id": "module2",
            "data": [{"user_id": "mock-user", "pin": "test-pin2"}],
        },
    ]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, module_configs
    )
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Register a fake user and attach credentials for the example provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    existing_credential = auth_models.Credentials(
        id="mock-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=False,
    )
    user.credentials.append(existing_credential)

    login_flow = manager.login_flow

    result = await login_flow.async_init(("insecure_example", None))
    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM

    result = await login_flow.async_configure(
        result["flow_id"], {"username": "test-user", "password": "test-pass"}
    )

    # The provider accepted the password; now an MFA module must be chosen.
    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
    assert result["step_id"] == "select_mfa_module"

    result = await login_flow.async_configure(
        result["flow_id"], {"multi_factor_auth_module": "module2"}
    )

    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
    assert result["step_id"] == "mfa"

    result = await login_flow.async_configure(
        result["flow_id"], {"pin": "test-pin2"}
    )

    # The pin of the selected module completes the flow.
    assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
    assert result["result"]
    assert result["result"].id == "mock-id"
 | 
						|
 | 
						|
 | 
						|
async def test_auth_module_expired_session(mock_hass):
    """Check that the MFA step aborts once the MFA session has expired.

    The flow is driven up to the ``mfa`` form, then ``utcnow`` is patched
    to return a time ``MFA_SESSION_EXPIRATION`` later before the pin is
    submitted. The flow is expected to abort with ``login_expired``.
    """
    provider_configs = [
        {
            "type": "insecure_example",
            "users": [
                {
                    "username": "test-user",
                    "password": "test-pass",
                    "name": "Test Name",
                }
            ],
        }
    ]
    module_configs = [
        {
            "type": "insecure_example",
            "data": [{"user_id": "mock-user", "pin": "test-pin"}],
        }
    ]
    manager = await auth.auth_manager_from_config(
        mock_hass, provider_configs, module_configs
    )
    mock_hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Register a fake user and attach credentials for the example provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    existing_credential = auth_models.Credentials(
        id="mock-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=False,
    )
    user.credentials.append(existing_credential)

    login_flow = manager.login_flow

    result = await login_flow.async_init(("insecure_example", None))
    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM

    result = await login_flow.async_configure(
        result["flow_id"], {"username": "test-user", "password": "test-pass"}
    )

    assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
    assert result["step_id"] == "mfa"

    # Pretend the pin arrives a full expiration period after "now".
    time_after_expiry = dt_util.utcnow() + MFA_SESSION_EXPIRATION
    with patch("homeassistant.util.dt.utcnow", return_value=time_after_expiry):
        result = await login_flow.async_configure(
            result["flow_id"], {"pin": "test-pin"}
        )
        # Even the correct pin is refused: the flow aborts on the timeout.
        assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
        assert result["reason"] == "login_expired"
 | 
						|
 | 
						|
 | 
						|
async def test_enable_mfa_for_user(hass, hass_storage):
    """Exercise enabling, overriding and disabling an MFA module for a user.

    Also checks that enabling MFA for a system user raises ``ValueError``
    and that disabling a module that is not enabled does not raise.
    """
    provider_configs = [
        {
            "type": "insecure_example",
            "users": [{"username": "test-user", "password": "test-pass"}],
        }
    ]
    module_configs = [{"type": "insecure_example", "data": []}]
    manager = await auth.auth_manager_from_config(
        hass, provider_configs, module_configs
    )

    login_flow = manager.login_flow
    result = await login_flow.async_init(("insecure_example", None))
    result = await login_flow.async_configure(
        result["flow_id"], {"username": "test-user", "password": "test-pass"}
    )
    user = await manager.async_get_or_create_user(result["result"])
    assert user is not None

    # A freshly created user has no MFA module enabled.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 0

    mfa_module = manager.get_auth_mfa_module("insecure_example")
    # The module starts out without any stored data.
    assert not mfa_module._data

    # Enabling MFA stores exactly one entry for the user.
    await manager.async_enable_user_mfa(user, "insecure_example", {"pin": "test-pin"})
    assert len(mfa_module._data) == 1
    assert mfa_module._data[0] == {"user_id": user.id, "pin": "test-pin"}

    # The module is now reported as enabled.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 1
    assert "insecure_example" in enabled

    # Enabling again overrides the stored entry instead of adding one.
    await manager.async_enable_user_mfa(
        user, "insecure_example", {"pin": "test-pin-new"}
    )
    assert len(mfa_module._data) == 1
    assert mfa_module._data[0] == {"user_id": user.id, "pin": "test-pin-new"}
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 1
    assert "insecure_example" in enabled

    # A system user is not allowed to enable MFA.
    system_user = await manager.async_create_system_user("system-user")
    with pytest.raises(ValueError):
        await manager.async_enable_user_mfa(
            system_user, "insecure_example", {"pin": "test-pin"}
        )
    assert len(mfa_module._data) == 1
    enabled = await manager.async_get_enabled_mfa(system_user)
    assert len(enabled) == 0

    # Disabling removes the user's entry from the module.
    await manager.async_disable_user_mfa(user, "insecure_example")
    assert not mfa_module._data

    # Nothing is reported as enabled afterwards.
    enabled = await manager.async_get_enabled_mfa(user)
    assert len(enabled) == 0

    # Disabling a module that is not enabled fails silently.
    await manager.async_disable_user_mfa(user, "insecure_example")
 | 
						|
 | 
						|
 | 
						|
async def test_async_remove_user(hass):
    """Check that removing a user drops its credentials and fires an event.

    After removal the manager reports no users, the user object has no
    credentials left, and one ``user_removed`` event carrying the user's
    id has been captured.
    """
    removed_events = async_capture_events(hass, "user_removed")
    provider_configs = [
        {
            "type": "insecure_example",
            "users": [
                {
                    "username": "test-user",
                    "password": "test-pass",
                    "name": "Test Name",
                }
            ],
        }
    ]
    manager = await auth.auth_manager_from_config(hass, provider_configs, [])
    hass.auth = manager
    ensure_auth_manager_loaded(manager)

    # Register a fake user and attach credentials for the example provider.
    user = MockUser(
        id="mock-user", is_owner=False, is_active=False, name="Paulus"
    ).add_to_auth_manager(manager)
    existing_credential = auth_models.Credentials(
        id="mock-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=False,
    )
    user.credentials.append(existing_credential)
    assert len(user.credentials) == 1

    await hass.auth.async_remove_user(user)

    remaining_users = await manager.async_get_users()
    assert len(remaining_users) == 0
    assert len(user.credentials) == 0

    # Let pending work finish before inspecting the captured events.
    await hass.async_block_till_done()
    assert len(removed_events) == 1
    assert removed_events[0].data["user_id"] == user.id
 | 
						|
 | 
						|
 | 
						|
async def test_async_remove_user_fail_if_remove_credential_fails(
    hass, hass_admin_user, hass_admin_credential
):
    """Check that a failure while removing credentials propagates.

    ``async_remove_credentials`` is patched to raise ``ValueError``; the
    call to ``async_remove_user`` is expected to raise it as well.
    """
    await hass.auth.async_link_user(hass_admin_user, hass_admin_credential)

    failing_removal = patch.object(
        hass.auth, "async_remove_credentials", side_effect=ValueError
    )
    with failing_removal:
        with pytest.raises(ValueError):
            await hass.auth.async_remove_user(hass_admin_user)
 | 
						|
 | 
						|
 | 
						|
async def test_new_users(mock_hass):
    """Check owner, admin, group and local-only flags of newly created users.

    Covers the first user created, a second plain user, a user created with
    explicit ``group_ids`` and ``local_only``, and a user created from new
    credentials.
    """
    # Three provider users that differ only in their username.
    provider_users = [
        {
            "username": username,
            "password": "test-pass",
            "name": "Test Name",
        }
        for username in ("test-user", "test-user-2", "test-user-3")
    ]
    provider_configs = [{"type": "insecure_example", "users": provider_users}]
    manager = await auth.auth_manager_from_config(mock_hass, provider_configs, [])
    ensure_auth_manager_loaded(manager)

    first_user = await manager.async_create_user("Hello")
    # The first user in the system is owner and admin.
    assert first_user.is_owner
    assert first_user.is_admin
    assert not first_user.local_only
    assert first_user.groups == []

    second_user = await manager.async_create_user("Hello 2")
    assert not second_user.is_admin
    assert second_user.groups == []

    third_user = await manager.async_create_user(
        "Hello 3", group_ids=["system-admin"], local_only=True
    )
    assert third_user.is_admin
    assert third_user.groups[0].id == "system-admin"
    assert third_user.local_only

    new_credential = auth_models.Credentials(
        id="mock-id",
        auth_provider_type="insecure_example",
        auth_provider_id=None,
        data={"username": "test-user"},
        is_new=True,
    )
    user_cred = await manager.async_get_or_create_user(new_credential)
    assert user_cred.is_admin
 | 
						|
 | 
						|
 | 
						|
async def test_rename_does_not_change_refresh_token(mock_hass):
    """Check that renaming a user leaves its single refresh token unchanged."""
    manager = await auth.auth_manager_from_config(mock_hass, [], [])
    user = MockUser().add_to_auth_manager(manager)
    await manager.async_create_refresh_token(user, CLIENT_ID)

    # Exactly one token exists before the rename.
    tokens = list(user.refresh_tokens.values())
    assert len(tokens) == 1
    token_before = tokens[0]

    await manager.async_update_user(user, name="new name")
    assert user.name == "new name"

    # Still exactly one token, and it compares equal to the earlier one.
    tokens = list(user.refresh_tokens.values())
    assert len(tokens) == 1
    token_after = tokens[0]

    assert token_before == token_after
 |