"""Tests for the floor registry.""" from datetime import datetime from functools import partial import re from typing import Any from freezegun.api import FrozenDateTimeFactory import pytest from homeassistant.core import HomeAssistant from homeassistant.helpers import area_registry as ar, floor_registry as fr from homeassistant.util.dt import utcnow from tests.common import ANY, async_capture_events, flush_store async def test_list_floors(floor_registry: fr.FloorRegistry) -> None: """Make sure that we can read floors.""" floors = floor_registry.async_list_floors() assert len(list(floors)) == len(floor_registry.floors) @pytest.mark.usefixtures("freezer") async def test_create_floor( hass: HomeAssistant, floor_registry: fr.FloorRegistry, ) -> None: """Make sure that we can create floors.""" update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED) floor = floor_registry.async_create( name="First floor", icon="mdi:home-floor-1", aliases={"first", "ground", "ground floor"}, level=1, ) assert floor == fr.FloorEntry( floor_id="first_floor", name="First floor", icon="mdi:home-floor-1", aliases={"first", "ground", "ground floor"}, level=1, created_at=utcnow(), modified_at=utcnow(), normalized_name=ANY, ) assert len(floor_registry.floors) == 1 await hass.async_block_till_done() assert len(update_events) == 1 assert update_events[0].data == { "action": "create", "floor_id": floor.floor_id, } async def test_create_floor_with_name_already_in_use( hass: HomeAssistant, floor_registry: fr.FloorRegistry ) -> None: """Make sure that we can't create a floor with a name already in use.""" update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED) floor_registry.async_create("First floor") with pytest.raises( ValueError, match=re.escape("The name First floor (firstfloor) is already in use"), ): floor_registry.async_create("First floor") await hass.async_block_till_done() assert len(floor_registry.floors) == 1 assert len(update_events) == 1 async def test_create_floor_with_id_already_in_use( floor_registry: fr.FloorRegistry, ) -> None: """Make sure that we can't create an floor with an id already in use.""" floor = floor_registry.async_create("First") updated_floor = floor_registry.async_update(floor.floor_id, name="Second") assert updated_floor.floor_id == floor.floor_id another_floor = floor_registry.async_create("First") assert floor.floor_id != another_floor.floor_id assert another_floor.floor_id == "first_2" async def test_delete_floor( hass: HomeAssistant, floor_registry: fr.FloorRegistry ) -> None: """Make sure that we can delete a floor.""" update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED) floor = floor_registry.async_create("First floor") assert len(floor_registry.floors) == 1 floor_registry.async_delete(floor.floor_id) assert not floor_registry.floors await hass.async_block_till_done() assert len(update_events) == 2 assert update_events[0].data == { "action": "create", "floor_id": floor.floor_id, } assert update_events[1].data == { "action": "remove", "floor_id": floor.floor_id, } async def test_delete_non_existing_floor(floor_registry: fr.FloorRegistry) -> None: """Make sure that we can't delete a floor that doesn't exist.""" floor_registry.async_create("First floor") with pytest.raises(KeyError): floor_registry.async_delete("") assert len(floor_registry.floors) == 1 async def test_update_floor( hass: HomeAssistant, floor_registry: fr.FloorRegistry, freezer: FrozenDateTimeFactory, ) -> None: """Make sure that we can update floors.""" created_at = datetime.fromisoformat("2024-01-01T01:00:00+00:00") freezer.move_to(created_at) update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED) floor = floor_registry.async_create("First floor") assert floor == fr.FloorEntry( floor_id="first_floor", name="First floor", icon=None, aliases=set(), level=None, created_at=created_at, modified_at=created_at, normalized_name=ANY, ) assert len(floor_registry.floors) == 1 modified_at = datetime.fromisoformat("2024-02-01T01:00:00+00:00") freezer.move_to(modified_at) updated_floor = floor_registry.async_update( floor.floor_id, name="Second floor", icon="mdi:home-floor-2", aliases={"ground", "downstairs"}, level=2, ) assert updated_floor != floor assert updated_floor == fr.FloorEntry( floor_id="first_floor", name="Second floor", icon="mdi:home-floor-2", aliases={"ground", "downstairs"}, level=2, created_at=created_at, modified_at=modified_at, normalized_name=ANY, ) assert len(floor_registry.floors) == 1 await hass.async_block_till_done() assert len(update_events) == 2 assert update_events[0].data == { "action": "create", "floor_id": floor.floor_id, } assert update_events[1].data == { "action": "update", "floor_id": floor.floor_id, } async def test_update_floor_with_same_data( hass: HomeAssistant, floor_registry: fr.FloorRegistry ) -> None: """Make sure that we can reapply the same data to a floor and it won't update.""" update_events = async_capture_events(hass, fr.EVENT_FLOOR_REGISTRY_UPDATED) floor = floor_registry.async_create( "First floor", icon="mdi:home-floor-1", ) updated_floor = floor_registry.async_update( floor_id=floor.floor_id, name="First floor", icon="mdi:home-floor-1", ) assert floor == updated_floor await hass.async_block_till_done() # No update event assert len(update_events) == 1 assert update_events[0].data == { "action": "create", "floor_id": floor.floor_id, } async def test_update_floor_with_same_name_change_case( floor_registry: fr.FloorRegistry, ) -> None: """Make sure that we can reapply the same name with a different case to a floor.""" floor = floor_registry.async_create("first floor") updated_floor = floor_registry.async_update(floor.floor_id, name="First floor") assert updated_floor.floor_id == floor.floor_id assert updated_floor.name == "First floor" assert updated_floor.normalized_name == floor.normalized_name assert len(floor_registry.floors) == 1 async def test_update_floor_with_name_already_in_use( floor_registry: fr.FloorRegistry, ) -> None: """Make sure that we can't update a floor with a name already in use.""" floor1 = floor_registry.async_create("First floor") floor2 = floor_registry.async_create("Second floor") with pytest.raises( ValueError, match=re.escape("The name Second floor (secondfloor) is already in use"), ): floor_registry.async_update(floor1.floor_id, name="Second floor") assert floor1.name == "First floor" assert floor2.name == "Second floor" assert len(floor_registry.floors) == 2 async def test_update_floor_with_normalized_name_already_in_use( floor_registry: fr.FloorRegistry, ) -> None: """Make sure that we can't update a floor with a normalized name already in use.""" floor1 = floor_registry.async_create("first") floor2 = floor_registry.async_create("S E C O N D") with pytest.raises( ValueError, match=re.escape("The name second (second) is already in use") ): floor_registry.async_update(floor1.floor_id, name="second") assert floor1.name == "first" assert floor2.name == "S E C O N D" assert len(floor_registry.floors) == 2 async def test_load_floors( hass: HomeAssistant, floor_registry: fr.FloorRegistry, freezer: FrozenDateTimeFactory, ) -> None: """Make sure that we can load/save data correctly.""" floor1_created = datetime.fromisoformat("2024-01-01T00:00:00+00:00") freezer.move_to(floor1_created) floor1 = floor_registry.async_create( "First floor", icon="mdi:home-floor-1", aliases={"first", "ground"}, level=1, ) floor2_created = datetime.fromisoformat("2024-02-01T00:00:00+00:00") freezer.move_to(floor2_created) floor2 = floor_registry.async_create( "Second floor", icon="mdi:home-floor-2", aliases={"first", "ground"}, level=2, ) assert len(floor_registry.floors) == 2 registry2 = fr.FloorRegistry(hass) await flush_store(floor_registry._store) await registry2.async_load() assert len(registry2.floors) == 2 assert list(floor_registry.floors) == list(registry2.floors) floor1_registry2 = registry2.async_get_floor_by_name("First floor") assert floor1_registry2 == floor1 floor2_registry2 = registry2.async_get_floor_by_name("Second floor") assert floor2_registry2 == floor2 @pytest.mark.parametrize("load_registries", [False]) async def test_loading_floors_from_storage( hass: HomeAssistant, hass_storage: dict[str, Any], ) -> None: """Test loading stored floors on start.""" hass_storage[fr.STORAGE_KEY] = { "version": fr.STORAGE_VERSION_MAJOR, "data": { "floors": [ { "icon": "mdi:home-floor-1", "floor_id": "first_floor", "name": "First floor", "aliases": ["first", "ground"], "level": 1, } ] }, } await fr.async_load(hass) registry = fr.async_get(hass) assert len(registry.floors) == 1 async def test_getting_floor(floor_registry: fr.FloorRegistry) -> None: """Make sure we can get the floors by name.""" floor = floor_registry.async_create("First floor") floor2 = floor_registry.async_get_floor_by_name("first floor") floor3 = floor_registry.async_get_floor_by_name("first floor") assert floor == floor2 assert floor == floor3 assert floor2 == floor3 get_floor = floor_registry.async_get_floor(floor.floor_id) assert get_floor == floor async def test_async_get_floor_by_name_not_found( floor_registry: fr.FloorRegistry, ) -> None: """Make sure we return None for non-existent floors.""" floor_registry.async_create("First floor") assert len(floor_registry.floors) == 1 assert floor_registry.async_get_floor_by_name("non_exist") is None async def test_floor_removed_from_areas( hass: HomeAssistant, area_registry: ar.AreaRegistry, floor_registry: fr.FloorRegistry, ) -> None: """Test if floor gets removed from areas when the floor is removed.""" floor = floor_registry.async_create("First floor") assert len(floor_registry.floors) == 1 entry = area_registry.async_create(name="Kitchen") area_registry.async_update(entry.id, floor_id=floor.floor_id) entries = ar.async_entries_for_floor(area_registry, floor.floor_id) assert len(entries) == 1 floor_registry.async_delete(floor.floor_id) await hass.async_block_till_done() entries = ar.async_entries_for_floor(area_registry, floor.floor_id) assert len(entries) == 0 async def test_async_create_thread_safety( hass: HomeAssistant, floor_registry: fr.FloorRegistry, ) -> None: """Test async_create raises when called from wrong thread.""" with pytest.raises( RuntimeError, match="Detected code that calls floor_registry.async_create from a thread.", ): await hass.async_add_executor_job(floor_registry.async_create, "any") async def test_async_delete_thread_safety( hass: HomeAssistant, floor_registry: fr.FloorRegistry, ) -> None: """Test async_delete raises when called from wrong thread.""" any_floor = floor_registry.async_create("any") with pytest.raises( RuntimeError, match="Detected code that calls floor_registry.async_delete from a thread.", ): await hass.async_add_executor_job(floor_registry.async_delete, any_floor) async def test_async_update_thread_safety( hass: HomeAssistant, floor_registry: fr.FloorRegistry, ) -> None: """Test async_update raises when called from wrong thread.""" any_floor = floor_registry.async_create("any") with pytest.raises( RuntimeError, match="Detected code that calls floor_registry.async_update from a thread.", ): await hass.async_add_executor_job( partial(floor_registry.async_update, any_floor.floor_id, name="new name") ) @pytest.mark.parametrize("load_registries", [False]) async def test_migration_from_1_1( hass: HomeAssistant, hass_storage: dict[str, Any] ) -> None: """Test migration from version 1.1.""" hass_storage[fr.STORAGE_KEY] = { "version": 1, "data": { "floors": [ { "floor_id": "12345A", "name": "mock", "aliases": [], "icon": None, "level": None, } ] }, } await fr.async_load(hass) registry = fr.async_get(hass) # Test data was loaded entry = registry.async_get_floor_by_name("mock") assert entry.floor_id == "12345A" # Check we store migrated data await flush_store(registry._store) assert hass_storage[fr.STORAGE_KEY] == { "version": fr.STORAGE_VERSION_MAJOR, "minor_version": fr.STORAGE_VERSION_MINOR, "key": fr.STORAGE_KEY, "data": { "floors": [ { "aliases": [], "icon": None, "floor_id": "12345A", "level": None, "name": "mock", "created_at": "1970-01-01T00:00:00+00:00", "modified_at": "1970-01-01T00:00:00+00:00", } ] }, }