"""Test the Home Assistant local auth provider.""" import asyncio from typing import Any from unittest.mock import Mock, patch import pytest import voluptuous as vol from homeassistant import data_entry_flow from homeassistant.auth import auth_manager_from_config, auth_store from homeassistant.auth.providers import ( auth_provider_from_config, homeassistant as hass_auth, ) from homeassistant.core import HomeAssistant from homeassistant.helpers import issue_registry as ir from homeassistant.setup import async_setup_component @pytest.fixture def data(hass: HomeAssistant) -> hass_auth.Data: """Create a loaded data class.""" data = hass_auth.Data(hass) hass.loop.run_until_complete(data.async_load()) return data @pytest.fixture def legacy_data(hass: HomeAssistant) -> hass_auth.Data: """Create a loaded legacy data class.""" data = hass_auth.Data(hass) hass.loop.run_until_complete(data.async_load()) data.is_legacy = True return data @pytest.fixture async def load_auth_component(hass: HomeAssistant) -> None: """Load the auth component for translations.""" await async_setup_component(hass, "auth", {}) async def test_validating_password_invalid_user(data: hass_auth.Data) -> None: """Test validating an invalid user.""" with pytest.raises(hass_auth.InvalidAuth): data.validate_login("non-existing", "pw") async def test_not_allow_set_id() -> None: """Test we are not allowed to set an ID in config.""" hass = Mock() hass.data = {} with pytest.raises(vol.Invalid): await auth_provider_from_config( hass, None, {"type": "homeassistant", "id": "invalid"} ) async def test_new_users_populate_values( hass: HomeAssistant, data: hass_auth.Data ) -> None: """Test that we populate data for new users.""" data.add_auth("hello", "test-pass") await data.async_save() manager = await auth_manager_from_config(hass, [{"type": "homeassistant"}], []) provider = manager.auth_providers[0] credentials = await provider.async_get_or_create_credentials({"username": "hello"}) user = await manager.async_get_or_create_user(credentials) assert user.name == "hello" assert user.is_active async def test_changing_password_raises_invalid_user(data: hass_auth.Data) -> None: """Test that changing password raises invalid user.""" with pytest.raises(hass_auth.InvalidUser): data.change_password("non-existing", "pw") # Modern mode async def test_adding_user(data: hass_auth.Data) -> None: """Test adding a user.""" data.add_auth("test-user", "test-pass") data.validate_login(" test-user ", "test-pass") @pytest.mark.parametrize("username", ["test-user ", "TEST-USER"]) @pytest.mark.usefixtures("load_auth_component") def test_adding_user_not_normalized(data: hass_auth.Data, username: str) -> None: """Test adding a user.""" with pytest.raises( hass_auth.InvalidUsername, match=f'Username "{username}" is not normalized' ): data.add_auth(username, "test-pass") @pytest.mark.usefixtures("load_auth_component") def test_adding_user_duplicate_username(data: hass_auth.Data) -> None: """Test adding a user with duplicate username.""" data.add_auth("test-user", "test-pass") with pytest.raises( hass_auth.InvalidUsername, match='Username "test-user" already exists' ): data.add_auth("test-user", "other-pass") async def test_validating_password_invalid_password(data: hass_auth.Data) -> None: """Test validating an invalid password.""" data.add_auth("test-user", "test-pass") with pytest.raises(hass_auth.InvalidAuth): data.validate_login(" test-user ", "invalid-pass") with pytest.raises(hass_auth.InvalidAuth): data.validate_login("test-user", "test-pass ") with pytest.raises(hass_auth.InvalidAuth): data.validate_login("test-user", "Test-pass") async def test_changing_password(data: hass_auth.Data) -> None: """Test adding a user.""" data.add_auth("test-user", "test-pass") data.change_password("TEST-USER ", "new-pass") with pytest.raises(hass_auth.InvalidAuth): data.validate_login("test-user", "test-pass") data.validate_login("test-UsEr", "new-pass") async def test_login_flow_validates(data: hass_auth.Data, hass: HomeAssistant) -> None: """Test login flow.""" data.add_auth("test-user", "test-pass") await data.async_save() provider = hass_auth.HassAuthProvider( hass, auth_store.AuthStore(hass), {"type": "homeassistant"} ) flow = await provider.async_login_flow({}) result = await flow.async_step_init() assert result["type"] == data_entry_flow.FlowResultType.FORM result = await flow.async_step_init( {"username": "incorrect-user", "password": "test-pass"} ) assert result["type"] == data_entry_flow.FlowResultType.FORM assert result["errors"]["base"] == "invalid_auth" result = await flow.async_step_init( {"username": "TEST-user ", "password": "incorrect-pass"} ) assert result["type"] == data_entry_flow.FlowResultType.FORM assert result["errors"]["base"] == "invalid_auth" result = await flow.async_step_init( {"username": "test-USER", "password": "test-pass"} ) assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY assert result["data"]["username"] == "test-USER" async def test_saving_loading(data: hass_auth.Data, hass: HomeAssistant) -> None: """Test saving and loading JSON.""" data.add_auth("test-user", "test-pass") data.add_auth("second-user", "second-pass") await data.async_save() data = hass_auth.Data(hass) await data.async_load() data.validate_login("test-user ", "test-pass") data.validate_login("second-user ", "second-pass") async def test_get_or_create_credentials( hass: HomeAssistant, data: hass_auth.Data ) -> None: """Test that we can get or create credentials.""" manager = await auth_manager_from_config(hass, [{"type": "homeassistant"}], []) provider = manager.auth_providers[0] provider.data = data credentials1 = await provider.async_get_or_create_credentials({"username": "hello"}) with patch.object(provider, "async_credentials", return_value=[credentials1]): credentials2 = await provider.async_get_or_create_credentials( {"username": "hello "} ) assert credentials1 is credentials2 # Legacy mode async def test_legacy_adding_user(legacy_data: hass_auth.Data) -> None: """Test in legacy mode adding a user.""" legacy_data.add_auth("test-user", "test-pass") legacy_data.validate_login("test-user", "test-pass") async def test_legacy_validating_password_invalid_password( legacy_data: hass_auth.Data, ) -> None: """Test in legacy mode validating an invalid password.""" legacy_data.add_auth("test-user", "test-pass") with pytest.raises(hass_auth.InvalidAuth): legacy_data.validate_login("test-user", "invalid-pass") async def test_legacy_changing_password(legacy_data: hass_auth.Data) -> None: """Test in legacy mode adding a user.""" user = "test-user" legacy_data.add_auth(user, "test-pass") legacy_data.change_password(user, "new-pass") with pytest.raises(hass_auth.InvalidAuth): legacy_data.validate_login(user, "test-pass") legacy_data.validate_login(user, "new-pass") async def test_legacy_changing_password_raises_invalid_user( legacy_data: hass_auth.Data, ) -> None: """Test in legacy mode that we initialize an empty config.""" with pytest.raises(hass_auth.InvalidUser): legacy_data.change_password("non-existing", "pw") async def test_legacy_login_flow_validates( legacy_data: hass_auth.Data, hass: HomeAssistant ) -> None: """Test in legacy mode login flow.""" legacy_data.add_auth("test-user", "test-pass") await legacy_data.async_save() provider = hass_auth.HassAuthProvider( hass, auth_store.AuthStore(hass), {"type": "homeassistant"} ) flow = await provider.async_login_flow({}) result = await flow.async_step_init() assert result["type"] == data_entry_flow.FlowResultType.FORM result = await flow.async_step_init( {"username": "incorrect-user", "password": "test-pass"} ) assert result["type"] == data_entry_flow.FlowResultType.FORM assert result["errors"]["base"] == "invalid_auth" result = await flow.async_step_init( {"username": "test-user", "password": "incorrect-pass"} ) assert result["type"] == data_entry_flow.FlowResultType.FORM assert result["errors"]["base"] == "invalid_auth" result = await flow.async_step_init( {"username": "test-user", "password": "test-pass"} ) assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY assert result["data"]["username"] == "test-user" async def test_legacy_saving_loading( legacy_data: hass_auth.Data, hass: HomeAssistant ) -> None: """Test in legacy mode saving and loading JSON.""" legacy_data.add_auth("test-user", "test-pass") legacy_data.add_auth("second-user", "second-pass") await legacy_data.async_save() legacy_data = hass_auth.Data(hass) await legacy_data.async_load() legacy_data.is_legacy = True legacy_data.validate_login("test-user", "test-pass") legacy_data.validate_login("second-user", "second-pass") with pytest.raises(hass_auth.InvalidAuth): legacy_data.validate_login("test-user ", "test-pass") async def test_legacy_get_or_create_credentials( hass: HomeAssistant, legacy_data: hass_auth.Data ) -> None: """Test in legacy mode that we can get or create credentials.""" manager = await auth_manager_from_config(hass, [{"type": "homeassistant"}], []) provider = manager.auth_providers[0] provider.data = legacy_data credentials1 = await provider.async_get_or_create_credentials({"username": "hello"}) with patch.object(provider, "async_credentials", return_value=[credentials1]): credentials2 = await provider.async_get_or_create_credentials( {"username": "hello"} ) assert credentials1 is credentials2 with patch.object(provider, "async_credentials", return_value=[credentials1]): credentials3 = await provider.async_get_or_create_credentials( {"username": "hello "} ) assert credentials1 is not credentials3 async def test_race_condition_in_data_loading(hass: HomeAssistant) -> None: """Test race condition in the hass_auth.Data loading. Ref issue: https://github.com/home-assistant/core/issues/21569 """ counter = 0 async def mock_load(_): """Mock of homeassistant.helpers.storage.Store.async_load.""" nonlocal counter counter += 1 await asyncio.sleep(0) provider = hass_auth.HassAuthProvider( hass, auth_store.AuthStore(hass), {"type": "homeassistant"} ) with patch("homeassistant.helpers.storage.Store.async_load", new=mock_load): task1 = provider.async_validate_login("user", "pass") task2 = provider.async_validate_login("user", "pass") results = await asyncio.gather(task1, task2, return_exceptions=True) assert counter == 1 assert isinstance(results[0], hass_auth.InvalidAuth) # results[1] will be a TypeError if race condition occurred assert isinstance(results[1], hass_auth.InvalidAuth) def test_change_username(data: hass_auth.Data) -> None: """Test changing username.""" data.add_auth("test-user", "test-pass") users = data.users assert len(users) == 1 assert users[0]["username"] == "test-user" data.change_username("test-user", "new-user") users = data.users assert len(users) == 1 assert users[0]["username"] == "new-user" @pytest.mark.parametrize("username", ["test-user ", "TEST-USER"]) def test_change_username_legacy(legacy_data: hass_auth.Data, username: str) -> None: """Test changing username.""" # Cannot use add_auth as it normalizes username legacy_data.users.append( { "username": username, "password": legacy_data.hash_password("test-pass", True).decode(), } ) users = legacy_data.users assert len(users) == 1 assert users[0]["username"] == username legacy_data.change_username(username, "test-user") users = legacy_data.users assert len(users) == 1 assert users[0]["username"] == "test-user" def test_change_username_invalid_user(data: hass_auth.Data) -> None: """Test changing username raises on invalid user.""" data.add_auth("test-user", "test-pass") users = data.users assert len(users) == 1 assert users[0]["username"] == "test-user" with pytest.raises(hass_auth.InvalidUser): data.change_username("non-existing", "new-user") users = data.users assert len(users) == 1 assert users[0]["username"] == "test-user" @pytest.mark.usefixtures("load_auth_component") async def test_change_username_not_normalized( data: hass_auth.Data, hass: HomeAssistant ) -> None: """Test changing username raises on not normalized username.""" data.add_auth("test-user", "test-pass") with pytest.raises( hass_auth.InvalidUsername, match='Username "TEST-user " is not normalized' ): data.change_username("test-user", "TEST-user ") @pytest.mark.parametrize( ("usernames_in_storage", "usernames_in_repair"), [ (["Uppercase"], '- "Uppercase"'), ([" leading"], '- " leading"'), (["trailing "], '- "trailing "'), (["Test", "test", "Fritz "], '- "Fritz "\n- "Test"'), ], ) async def test_create_repair_on_legacy_usernames( hass: HomeAssistant, hass_storage: dict[str, Any], issue_registry: ir.IssueRegistry, usernames_in_storage: list[str], usernames_in_repair: str, ) -> None: """Test that we create a repair issue for legacy usernames.""" assert not issue_registry.issues.get( ("auth", "homeassistant_provider_not_normalized_usernames") ), "Repair issue already exists" hass_storage[hass_auth.STORAGE_KEY] = { "version": 1, "minor_version": 1, "key": "auth_provider.homeassistant", "data": { "users": [ { "username": username, "password": "onlyherebecauseweneedapasswordstring", } for username in usernames_in_storage ] }, } data = hass_auth.Data(hass) await data.async_load() issue = issue_registry.issues.get( ("auth", "homeassistant_provider_not_normalized_usernames") ) assert issue, "Repair issue not created" assert issue.translation_placeholders == {"usernames": usernames_in_repair} async def test_delete_repair_after_fixing_usernames( hass: HomeAssistant, hass_storage: dict[str, Any], issue_registry: ir.IssueRegistry, ) -> None: """Test that the repair is deleted after fixing the usernames.""" hass_storage[hass_auth.STORAGE_KEY] = { "version": 1, "minor_version": 1, "key": "auth_provider.homeassistant", "data": { "users": [ { "username": "Test", "password": "onlyherebecauseweneedapasswordstring", }, { "username": "bla ", "password": "onlyherebecauseweneedapasswordstring", }, ] }, } data = hass_auth.Data(hass) await data.async_load() issue = issue_registry.issues.get( ("auth", "homeassistant_provider_not_normalized_usernames") ) assert issue, "Repair issue not created" assert issue.translation_placeholders == {"usernames": '- "Test"\n- "bla "'} data.change_username("Test", "test") issue = issue_registry.issues.get( ("auth", "homeassistant_provider_not_normalized_usernames") ) assert issue assert issue.translation_placeholders == {"usernames": '- "bla "'} data.change_username("bla ", "bla") assert not issue_registry.issues.get( ("auth", "homeassistant_provider_not_normalized_usernames") ), "Repair issue should be deleted"