"""Tests for the Device Registry.""" from contextlib import nullcontext import time from typing import Any from unittest.mock import patch import pytest from yarl import URL from homeassistant import config_entries from homeassistant.const import EVENT_HOMEASSISTANT_STARTED from homeassistant.core import CoreState, HomeAssistant, callback from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers import ( area_registry as ar, device_registry as dr, entity_registry as er, ) from tests.common import MockConfigEntry, flush_store @pytest.fixture def update_events(hass): """Capture update events.""" events = [] @callback def async_capture(event): events.append(event.data) hass.bus.async_listen(dr.EVENT_DEVICE_REGISTRY_UPDATED, async_capture) return events @pytest.fixture def mock_config_entry(hass: HomeAssistant) -> MockConfigEntry: """Create a mock config entry and add it to hass.""" entry = MockConfigEntry(title=None) entry.add_to_hass(hass) return entry async def test_get_or_create_returns_same_entry( hass: HomeAssistant, device_registry: dr.DeviceRegistry, area_registry: ar.AreaRegistry, mock_config_entry: MockConfigEntry, update_events, ) -> None: """Make sure we do not duplicate entries.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, sw_version="sw-version", name="name", manufacturer="manufacturer", model="model", suggested_area="Game Room", ) entry2 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "11:22:33:66:77:88")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", suggested_area="Game Room", ) entry3 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, ) game_room_area = area_registry.async_get_area_by_name("Game Room") assert game_room_area is not None assert len(area_registry.areas) == 1 assert len(device_registry.devices) == 1 assert entry.area_id == game_room_area.id assert entry.id == entry2.id assert entry.id == entry3.id assert entry.identifiers == {("bridgeid", "0123")} assert entry2.area_id == game_room_area.id assert entry3.manufacturer == "manufacturer" assert entry3.model == "model" assert entry3.name == "name" assert entry3.sw_version == "sw-version" assert entry3.suggested_area == "Game Room" assert entry3.area_id == game_room_area.id await hass.async_block_till_done() # Only 2 update events. The third entry did not generate any changes. assert len(update_events) == 2 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "update" assert update_events[1]["device_id"] == entry.id assert update_events[1]["changes"] == { "connections": {("mac", "12:34:56:ab:cd:ef")} } async def test_requirement_for_identifier_or_connection( device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry, ) -> None: """Make sure we do require some descriptor of device.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers=set(), manufacturer="manufacturer", model="model", ) entry2 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections=set(), identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 2 assert entry assert entry2 with pytest.raises(HomeAssistantError): device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections=set(), identifiers=set(), manufacturer="manufacturer", model="model", ) async def test_multiple_config_entries( hass: HomeAssistant, device_registry: dr.DeviceRegistry ) -> None: """Make sure we do not get duplicate entries.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) entry = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry2 = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry3 = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 1 assert entry.id == entry2.id assert entry.id == entry3.id assert entry2.config_entries == {config_entry_1.entry_id, config_entry_2.entry_id} @pytest.mark.parametrize("load_registries", [False]) async def test_loading_from_storage( hass: HomeAssistant, hass_storage: dict[str, Any], mock_config_entry: MockConfigEntry, ) -> None: """Test loading stored devices on start.""" hass_storage[dr.STORAGE_KEY] = { "version": dr.STORAGE_VERSION_MAJOR, "minor_version": dr.STORAGE_VERSION_MINOR, "data": { "devices": [ { "area_id": "12345A", "config_entries": [mock_config_entry.entry_id], "configuration_url": "https://example.com/config", "connections": [["Zigbee", "01.23.45.67.89"]], "disabled_by": dr.DeviceEntryDisabler.USER, "entry_type": dr.DeviceEntryType.SERVICE, "hw_version": "hw_version", "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": "manufacturer", "model": "model", "name_by_user": "Test Friendly Name", "name": "name", "serial_number": "serial_no", "sw_version": "version", "via_device_id": None, } ], "deleted_devices": [ { "config_entries": [mock_config_entry.entry_id], "connections": [["Zigbee", "23.45.67.89.01"]], "id": "bcdefghijklmn", "identifiers": [["serial", "3456ABCDEF12"]], "orphaned_timestamp": None, } ], }, } await dr.async_load(hass) registry = dr.async_get(hass) assert len(registry.devices) == 1 assert len(registry.deleted_devices) == 1 entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "01.23.45.67.89")}, identifiers={("serial", "123456ABCDEF")}, manufacturer="manufacturer", model="model", ) assert entry == dr.DeviceEntry( area_id="12345A", config_entries={mock_config_entry.entry_id}, configuration_url="https://example.com/config", connections={("Zigbee", "01.23.45.67.89")}, disabled_by=dr.DeviceEntryDisabler.USER, entry_type=dr.DeviceEntryType.SERVICE, hw_version="hw_version", id="abcdefghijklm", identifiers={("serial", "123456ABCDEF")}, manufacturer="manufacturer", model="model", name_by_user="Test Friendly Name", name="name", serial_number="serial_no", suggested_area=None, # Not stored sw_version="version", ) assert isinstance(entry.config_entries, set) assert isinstance(entry.connections, set) assert isinstance(entry.identifiers, set) # Restore a device, id should be reused from the deleted device entry entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "23.45.67.89.01")}, identifiers={("serial", "3456ABCDEF12")}, manufacturer="manufacturer", model="model", ) assert entry == dr.DeviceEntry( config_entries={mock_config_entry.entry_id}, connections={("Zigbee", "23.45.67.89.01")}, id="bcdefghijklmn", identifiers={("serial", "3456ABCDEF12")}, manufacturer="manufacturer", model="model", ) assert entry.id == "bcdefghijklmn" assert isinstance(entry.config_entries, set) assert isinstance(entry.connections, set) assert isinstance(entry.identifiers, set) @pytest.mark.parametrize("load_registries", [False]) async def test_migration_1_1_to_1_4( hass: HomeAssistant, hass_storage: dict[str, Any], mock_config_entry: MockConfigEntry, ) -> None: """Test migration from version 1.1 to 1.4.""" hass_storage[dr.STORAGE_KEY] = { "version": 1, "minor_version": 1, "data": { "devices": [ { "config_entries": [mock_config_entry.entry_id], "connections": [["Zigbee", "01.23.45.67.89"]], "entry_type": "service", "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": "manufacturer", "model": "model", "name": "name", "sw_version": "version", }, # Invalid entry type { "config_entries": [None], "connections": [], "entry_type": "INVALID_VALUE", "id": "invalid-entry-type", "identifiers": [["serial", "mock-id-invalid-entry"]], "manufacturer": None, "model": None, "name": None, "sw_version": None, }, ], "deleted_devices": [ { "config_entries": ["123456"], "connections": [], "entry_type": "service", "id": "deletedid", "identifiers": [["serial", "123456ABCDFF"]], "manufacturer": "manufacturer", "model": "model", "name": "name", "sw_version": "version", } ], }, } await dr.async_load(hass) registry = dr.async_get(hass) # Test data was loaded entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "01.23.45.67.89")}, identifiers={("serial", "123456ABCDEF")}, ) assert entry.id == "abcdefghijklm" # Update to trigger a store entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "01.23.45.67.89")}, identifiers={("serial", "123456ABCDEF")}, sw_version="new_version", ) assert entry.id == "abcdefghijklm" # Check we store migrated data await flush_store(registry._store) assert hass_storage[dr.STORAGE_KEY] == { "version": dr.STORAGE_VERSION_MAJOR, "minor_version": dr.STORAGE_VERSION_MINOR, "key": dr.STORAGE_KEY, "data": { "devices": [ { "area_id": None, "config_entries": [mock_config_entry.entry_id], "configuration_url": None, "connections": [["Zigbee", "01.23.45.67.89"]], "disabled_by": None, "entry_type": "service", "hw_version": None, "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": "manufacturer", "model": "model", "name": "name", "name_by_user": None, "serial_number": None, "sw_version": "new_version", "via_device_id": None, }, { "area_id": None, "config_entries": [None], "configuration_url": None, "connections": [], "disabled_by": None, "entry_type": None, "hw_version": None, "id": "invalid-entry-type", "identifiers": [["serial", "mock-id-invalid-entry"]], "manufacturer": None, "model": None, "name_by_user": None, "name": None, "serial_number": None, "sw_version": None, "via_device_id": None, }, ], "deleted_devices": [ { "config_entries": ["123456"], "connections": [], "id": "deletedid", "identifiers": [["serial", "123456ABCDFF"]], "orphaned_timestamp": None, } ], }, } @pytest.mark.parametrize("load_registries", [False]) async def test_migration_1_2_to_1_4( hass: HomeAssistant, hass_storage: dict[str, Any], mock_config_entry: MockConfigEntry, ) -> None: """Test migration from version 1.2 to 1.3.""" hass_storage[dr.STORAGE_KEY] = { "version": 1, "minor_version": 2, "key": dr.STORAGE_KEY, "data": { "devices": [ { "area_id": None, "config_entries": [mock_config_entry.entry_id], "configuration_url": None, "connections": [["Zigbee", "01.23.45.67.89"]], "disabled_by": None, "entry_type": "service", "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": "manufacturer", "model": "model", "name": "name", "name_by_user": None, "sw_version": "version", "via_device_id": None, }, { "area_id": None, "config_entries": [None], "configuration_url": None, "connections": [], "disabled_by": None, "entry_type": None, "id": "invalid-entry-type", "identifiers": [["serial", "mock-id-invalid-entry"]], "manufacturer": None, "model": None, "name_by_user": None, "name": None, "sw_version": None, "via_device_id": None, }, ], "deleted_devices": [], }, } await dr.async_load(hass) registry = dr.async_get(hass) # Test data was loaded entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "01.23.45.67.89")}, identifiers={("serial", "123456ABCDEF")}, ) assert entry.id == "abcdefghijklm" # Update to trigger a store entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "01.23.45.67.89")}, identifiers={("serial", "123456ABCDEF")}, sw_version="new_version", ) assert entry.id == "abcdefghijklm" # Check we store migrated data await flush_store(registry._store) assert hass_storage[dr.STORAGE_KEY] == { "version": dr.STORAGE_VERSION_MAJOR, "minor_version": dr.STORAGE_VERSION_MINOR, "key": dr.STORAGE_KEY, "data": { "devices": [ { "area_id": None, "config_entries": [mock_config_entry.entry_id], "configuration_url": None, "connections": [["Zigbee", "01.23.45.67.89"]], "disabled_by": None, "entry_type": "service", "hw_version": None, "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": "manufacturer", "model": "model", "name": "name", "name_by_user": None, "serial_number": None, "sw_version": "new_version", "via_device_id": None, }, { "area_id": None, "config_entries": [None], "configuration_url": None, "connections": [], "disabled_by": None, "entry_type": None, "hw_version": None, "id": "invalid-entry-type", "identifiers": [["serial", "mock-id-invalid-entry"]], "manufacturer": None, "model": None, "name_by_user": None, "name": None, "serial_number": None, "sw_version": None, "via_device_id": None, }, ], "deleted_devices": [], }, } @pytest.mark.parametrize("load_registries", [False]) async def test_migration_1_3_to_1_4( hass: HomeAssistant, hass_storage: dict[str, Any], mock_config_entry: MockConfigEntry, ): """Test migration from version 1.3 to 1.4.""" hass_storage[dr.STORAGE_KEY] = { "version": 1, "minor_version": 3, "key": dr.STORAGE_KEY, "data": { "devices": [ { "area_id": None, "config_entries": [mock_config_entry.entry_id], "configuration_url": None, "connections": [["Zigbee", "01.23.45.67.89"]], "disabled_by": None, "entry_type": "service", "hw_version": "hw_version", "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": "manufacturer", "model": "model", "name": "name", "name_by_user": None, "sw_version": "version", "via_device_id": None, }, { "area_id": None, "config_entries": [None], "configuration_url": None, "connections": [], "disabled_by": None, "entry_type": None, "hw_version": None, "id": "invalid-entry-type", "identifiers": [["serial", "mock-id-invalid-entry"]], "manufacturer": None, "model": None, "name_by_user": None, "name": None, "sw_version": None, "via_device_id": None, }, ], "deleted_devices": [], }, } await dr.async_load(hass) registry = dr.async_get(hass) # Test data was loaded entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "01.23.45.67.89")}, identifiers={("serial", "123456ABCDEF")}, ) assert entry.id == "abcdefghijklm" # Update to trigger a store entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("Zigbee", "01.23.45.67.89")}, identifiers={("serial", "123456ABCDEF")}, sw_version="new_version", ) assert entry.id == "abcdefghijklm" # Check we store migrated data await flush_store(registry._store) assert hass_storage[dr.STORAGE_KEY] == { "version": dr.STORAGE_VERSION_MAJOR, "minor_version": dr.STORAGE_VERSION_MINOR, "key": dr.STORAGE_KEY, "data": { "devices": [ { "area_id": None, "config_entries": [mock_config_entry.entry_id], "configuration_url": None, "connections": [["Zigbee", "01.23.45.67.89"]], "disabled_by": None, "entry_type": "service", "hw_version": "hw_version", "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": "manufacturer", "model": "model", "name": "name", "name_by_user": None, "serial_number": None, "sw_version": "new_version", "via_device_id": None, }, { "area_id": None, "config_entries": [None], "configuration_url": None, "connections": [], "disabled_by": None, "entry_type": None, "hw_version": None, "id": "invalid-entry-type", "identifiers": [["serial", "mock-id-invalid-entry"]], "manufacturer": None, "model": None, "name_by_user": None, "name": None, "serial_number": None, "sw_version": None, "via_device_id": None, }, ], "deleted_devices": [], }, } async def test_removing_config_entries( hass: HomeAssistant, device_registry: dr.DeviceRegistry, update_events ) -> None: """Make sure we do not get duplicate entries.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) entry = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry2 = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry3 = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:78:CD:EF:12")}, identifiers={("bridgeid", "4567")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 2 assert entry.id == entry2.id assert entry.id != entry3.id assert entry2.config_entries == {config_entry_1.entry_id, config_entry_2.entry_id} device_registry.async_clear_config_entry(config_entry_1.entry_id) entry = device_registry.async_get_device(identifiers={("bridgeid", "0123")}) entry3_removed = device_registry.async_get_device( identifiers={("bridgeid", "4567")} ) assert entry.config_entries == {config_entry_2.entry_id} assert entry3_removed is None await hass.async_block_till_done() assert len(update_events) == 5 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "update" assert update_events[1]["device_id"] == entry2.id assert update_events[1]["changes"] == {"config_entries": {config_entry_1.entry_id}} assert update_events[2]["action"] == "create" assert update_events[2]["device_id"] == entry3.id assert "changes" not in update_events[2] assert update_events[3]["action"] == "update" assert update_events[3]["device_id"] == entry.id assert update_events[3]["changes"] == { "config_entries": {config_entry_1.entry_id, config_entry_2.entry_id} } assert update_events[4]["action"] == "remove" assert update_events[4]["device_id"] == entry3.id assert "changes" not in update_events[4] async def test_deleted_device_removing_config_entries( hass: HomeAssistant, device_registry: dr.DeviceRegistry, update_events ) -> None: """Make sure we do not get duplicate entries.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) entry = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry2 = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry3 = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:78:CD:EF:12")}, identifiers={("bridgeid", "4567")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 2 assert len(device_registry.deleted_devices) == 0 assert entry.id == entry2.id assert entry.id != entry3.id assert entry2.config_entries == {config_entry_1.entry_id, config_entry_2.entry_id} device_registry.async_remove_device(entry.id) device_registry.async_remove_device(entry3.id) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 2 await hass.async_block_till_done() assert len(update_events) == 5 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "update" assert update_events[1]["device_id"] == entry2.id assert update_events[1]["changes"] == {"config_entries": {config_entry_1.entry_id}} assert update_events[2]["action"] == "create" assert update_events[2]["device_id"] == entry3.id assert "changes" not in update_events[2]["device_id"] assert update_events[3]["action"] == "remove" assert update_events[3]["device_id"] == entry.id assert "changes" not in update_events[3] assert update_events[4]["action"] == "remove" assert update_events[4]["device_id"] == entry3.id assert "changes" not in update_events[4] device_registry.async_clear_config_entry(config_entry_1.entry_id) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 2 device_registry.async_clear_config_entry(config_entry_2.entry_id) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 2 # No event when a deleted device is purged await hass.async_block_till_done() assert len(update_events) == 5 # Re-add, expect to keep the device id entry2 = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) assert entry.id == entry2.id future_time = time.time() + dr.ORPHANED_DEVICE_KEEP_SECONDS + 1 with patch("time.time", return_value=future_time): device_registry.async_purge_expired_orphaned_devices() # Re-add, expect to get a new device id after the purge entry4 = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) assert entry3.id != entry4.id async def test_removing_area_id( device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry ) -> None: """Make sure we can clear area id.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry_w_area = device_registry.async_update_device(entry.id, area_id="12345A") device_registry.async_clear_area_id("12345A") entry_wo_area = device_registry.async_get_device(identifiers={("bridgeid", "0123")}) assert not entry_wo_area.area_id assert entry_w_area != entry_wo_area async def test_specifying_via_device_create( hass: HomeAssistant, device_registry: dr.DeviceRegistry ) -> None: """Test specifying a via_device and removal of the hub device.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) via = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("hue", "0123")}, manufacturer="manufacturer", model="via", ) light = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections=set(), identifiers={("hue", "456")}, manufacturer="manufacturer", model="light", via_device=("hue", "0123"), ) assert light.via_device_id == via.id device_registry.async_remove_device(via.id) light = device_registry.async_get_device(identifiers={("hue", "456")}) assert light.via_device_id is None async def test_specifying_via_device_update( hass: HomeAssistant, device_registry: dr.DeviceRegistry ) -> None: """Test specifying a via_device and updating.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) light = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections=set(), identifiers={("hue", "456")}, manufacturer="manufacturer", model="light", via_device=("hue", "0123"), ) assert light.via_device_id is None via = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("hue", "0123")}, manufacturer="manufacturer", model="via", ) light = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections=set(), identifiers={("hue", "456")}, manufacturer="manufacturer", model="light", via_device=("hue", "0123"), ) assert light.via_device_id == via.id async def test_loading_saving_data( hass: HomeAssistant, device_registry: dr.DeviceRegistry ) -> None: """Test that we load/save data correctly.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) config_entry_3 = MockConfigEntry() config_entry_3.add_to_hass(hass) config_entry_4 = MockConfigEntry() config_entry_4.add_to_hass(hass) config_entry_5 = MockConfigEntry() config_entry_5.add_to_hass(hass) orig_via = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("hue", "0123")}, manufacturer="manufacturer", model="via", name="Original Name", sw_version="Orig SW 1", entry_type=None, ) orig_light = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections=set(), identifiers={("hue", "456")}, manufacturer="manufacturer", model="light", via_device=("hue", "0123"), disabled_by=dr.DeviceEntryDisabler.USER, ) orig_light2 = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections=set(), identifiers={("hue", "789")}, manufacturer="manufacturer", model="light", via_device=("hue", "0123"), ) device_registry.async_remove_device(orig_light2.id) orig_light3 = device_registry.async_get_or_create( config_entry_id=config_entry_3.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:AB:CD:EF:12")}, identifiers={("hue", "abc")}, manufacturer="manufacturer", model="light", ) device_registry.async_get_or_create( config_entry_id=config_entry_4.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:AB:CD:EF:12")}, identifiers={("abc", "123")}, manufacturer="manufacturer", model="light", ) device_registry.async_remove_device(orig_light3.id) orig_light4 = device_registry.async_get_or_create( config_entry_id=config_entry_3.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:AB:CD:EF:12")}, identifiers={("hue", "abc")}, manufacturer="manufacturer", model="light", entry_type=dr.DeviceEntryType.SERVICE, ) assert orig_light4.id == orig_light3.id orig_kitchen_light = device_registry.async_get_or_create( config_entry_id=config_entry_5.entry_id, connections=set(), identifiers={("hue", "999")}, manufacturer="manufacturer", model="light", via_device=("hue", "0123"), disabled_by=dr.DeviceEntryDisabler.USER, suggested_area="Kitchen", ) assert len(device_registry.devices) == 4 assert len(device_registry.deleted_devices) == 1 orig_via = device_registry.async_update_device( orig_via.id, area_id="mock-area-id", name_by_user="mock-name-by-user" ) # Now load written data in new registry registry2 = dr.DeviceRegistry(hass) await flush_store(device_registry._store) await registry2.async_load() # Ensure same order assert list(device_registry.devices) == list(registry2.devices) assert list(device_registry.deleted_devices) == list(registry2.deleted_devices) new_via = registry2.async_get_device(identifiers={("hue", "0123")}) new_light = registry2.async_get_device(identifiers={("hue", "456")}) new_light4 = registry2.async_get_device(identifiers={("hue", "abc")}) assert orig_via == new_via assert orig_light == new_light assert orig_light4 == new_light4 # Ensure enums converted for old, new in ( (orig_via, new_via), (orig_light, new_light), (orig_light4, new_light4), ): assert old.disabled_by is new.disabled_by assert old.entry_type is new.entry_type # Ensure a save/load cycle does not keep suggested area new_kitchen_light = registry2.async_get_device(identifiers={("hue", "999")}) assert orig_kitchen_light.suggested_area == "Kitchen" orig_kitchen_light_witout_suggested_area = device_registry.async_update_device( orig_kitchen_light.id, suggested_area=None ) assert orig_kitchen_light_witout_suggested_area.suggested_area is None assert orig_kitchen_light_witout_suggested_area == new_kitchen_light async def test_no_unnecessary_changes( device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry ) -> None: """Make sure we do not consider devices changes.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={("ethernet", "12:34:56:78:90:AB:CD:EF")}, identifiers={("hue", "456"), ("bla", "123")}, ) with patch( "homeassistant.helpers.device_registry.DeviceRegistry.async_schedule_save" ) as mock_save: entry2 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, identifiers={("hue", "456")} ) assert entry.id == entry2.id assert len(mock_save.mock_calls) == 0 async def test_format_mac( device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry ) -> None: """Make sure we normalize mac addresses.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, ) for mac in ["123456ABCDEF", "123456abcdef", "12:34:56:ab:cd:ef", "1234.56ab.cdef"]: test_entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, mac)}, ) assert test_entry.id == entry.id, mac assert test_entry.connections == { (dr.CONNECTION_NETWORK_MAC, "12:34:56:ab:cd:ef") } # This should not raise for invalid in [ "invalid_mac", "123456ABCDEFG", # 1 extra char "12:34:56:ab:cdef", # not enough : "12:34:56:ab:cd:e:f", # too many : "1234.56abcdef", # not enough . "123.456.abc.def", # too many . ]: invalid_mac_entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, invalid)}, ) assert list(invalid_mac_entry.connections)[0][1] == invalid async def test_update( hass: HomeAssistant, device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry, update_events, ) -> None: """Verify that we can update some attributes of a device.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("hue", "456"), ("bla", "123")}, ) new_identifiers = {("hue", "654"), ("bla", "321")} assert not entry.area_id assert not entry.name_by_user with patch.object(device_registry, "async_schedule_save") as mock_save: updated_entry = device_registry.async_update_device( entry.id, area_id="12345A", configuration_url="https://example.com/config", disabled_by=dr.DeviceEntryDisabler.USER, entry_type=dr.DeviceEntryType.SERVICE, hw_version="hw_version", manufacturer="Test Producer", model="Test Model", name_by_user="Test Friendly Name", name="name", new_identifiers=new_identifiers, serial_number="serial_no", suggested_area="suggested_area", sw_version="version", via_device_id="98765B", ) assert mock_save.call_count == 1 assert updated_entry != entry assert updated_entry == dr.DeviceEntry( area_id="12345A", config_entries={mock_config_entry.entry_id}, configuration_url="https://example.com/config", connections={("mac", "12:34:56:ab:cd:ef")}, disabled_by=dr.DeviceEntryDisabler.USER, entry_type=dr.DeviceEntryType.SERVICE, hw_version="hw_version", id=entry.id, identifiers={("bla", "321"), ("hue", "654")}, manufacturer="Test Producer", model="Test Model", name_by_user="Test Friendly Name", name="name", serial_number="serial_no", suggested_area="suggested_area", sw_version="version", via_device_id="98765B", ) assert device_registry.async_get_device(identifiers={("hue", "456")}) is None assert device_registry.async_get_device(identifiers={("bla", "123")}) is None assert ( device_registry.async_get_device(identifiers={("hue", "654")}) == updated_entry ) assert ( device_registry.async_get_device(identifiers={("bla", "321")}) == updated_entry ) assert ( device_registry.async_get_device( connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")} ) == updated_entry ) assert device_registry.async_get(updated_entry.id) is not None await hass.async_block_till_done() assert len(update_events) == 2 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "update" assert update_events[1]["device_id"] == entry.id assert update_events[1]["changes"] == { "area_id": None, "configuration_url": None, "disabled_by": None, "entry_type": None, "hw_version": None, "identifiers": {("bla", "123"), ("hue", "456")}, "manufacturer": None, "model": None, "name": None, "name_by_user": None, "serial_number": None, "suggested_area": None, "sw_version": None, "via_device_id": None, } async def test_update_remove_config_entries( hass: HomeAssistant, device_registry: dr.DeviceRegistry, update_events ) -> None: """Make sure we do not get duplicate entries.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) entry = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry2 = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) entry3 = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:78:CD:EF:12")}, identifiers={("bridgeid", "4567")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 2 assert entry.id == entry2.id assert entry.id != entry3.id assert entry2.config_entries == {config_entry_1.entry_id, config_entry_2.entry_id} updated_entry = device_registry.async_update_device( entry2.id, remove_config_entry_id=config_entry_1.entry_id ) removed_entry = device_registry.async_update_device( entry3.id, remove_config_entry_id=config_entry_1.entry_id ) assert updated_entry.config_entries == {config_entry_2.entry_id} assert removed_entry is None removed_entry = device_registry.async_get_device(identifiers={("bridgeid", "4567")}) assert removed_entry is None await hass.async_block_till_done() assert len(update_events) == 5 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "update" assert update_events[1]["device_id"] == entry2.id assert update_events[1]["changes"] == {"config_entries": {config_entry_1.entry_id}} assert update_events[2]["action"] == "create" assert update_events[2]["device_id"] == entry3.id assert "changes" not in update_events[2] assert update_events[3]["action"] == "update" assert update_events[3]["device_id"] == entry.id assert update_events[3]["changes"] == { "config_entries": {config_entry_1.entry_id, config_entry_2.entry_id} } assert update_events[4]["action"] == "remove" assert update_events[4]["device_id"] == entry3.id assert "changes" not in update_events[4] async def test_update_suggested_area( hass: HomeAssistant, device_registry: dr.DeviceRegistry, area_registry: ar.AreaRegistry, mock_config_entry: MockConfigEntry, update_events, ) -> None: """Verify that we can update the suggested area version of a device.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bla", "123")}, ) assert not entry.suggested_area assert entry.area_id is None suggested_area = "Pool" with patch.object(device_registry, "async_schedule_save") as mock_save: updated_entry = device_registry.async_update_device( entry.id, suggested_area=suggested_area ) assert mock_save.call_count == 1 assert updated_entry != entry assert updated_entry.suggested_area == suggested_area pool_area = area_registry.async_get_area_by_name("Pool") assert pool_area is not None assert updated_entry.area_id == pool_area.id assert len(area_registry.areas) == 1 await hass.async_block_till_done() assert len(update_events) == 2 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "update" assert update_events[1]["device_id"] == entry.id assert update_events[1]["changes"] == {"area_id": None, "suggested_area": None} # Do not save or fire the event if the suggested # area does not result in a change of area # but still update the actual entry with patch.object(device_registry, "async_schedule_save") as mock_save_2: updated_entry = device_registry.async_update_device( entry.id, suggested_area="Other" ) assert len(update_events) == 2 assert mock_save_2.call_count == 0 assert updated_entry != entry assert updated_entry.suggested_area == "Other" async def test_cleanup_device_registry( hass: HomeAssistant, device_registry: dr.DeviceRegistry, entity_registry: er.EntityRegistry, ) -> None: """Test cleanup works.""" config_entry = MockConfigEntry(domain="hue") config_entry.add_to_hass(hass) ghost_config_entry = MockConfigEntry() ghost_config_entry.add_to_hass(hass) d1 = device_registry.async_get_or_create( identifiers={("hue", "d1")}, config_entry_id=config_entry.entry_id ) device_registry.async_get_or_create( identifiers={("hue", "d2")}, config_entry_id=config_entry.entry_id ) d3 = device_registry.async_get_or_create( identifiers={("hue", "d3")}, config_entry_id=config_entry.entry_id ) device_registry.async_get_or_create( identifiers={("something", "d4")}, config_entry_id=ghost_config_entry.entry_id ) # Remove the config entry without triggering the normal cleanup hass.config_entries._entries.pop(ghost_config_entry.entry_id) entity_registry.async_get_or_create("light", "hue", "e1", device_id=d1.id) entity_registry.async_get_or_create("light", "hue", "e2", device_id=d1.id) entity_registry.async_get_or_create("light", "hue", "e3", device_id=d3.id) # Manual cleanup should detect the orphaned config entry dr.async_cleanup(hass, device_registry, entity_registry) assert device_registry.async_get_device(identifiers={("hue", "d1")}) is not None assert device_registry.async_get_device(identifiers={("hue", "d2")}) is not None assert device_registry.async_get_device(identifiers={("hue", "d3")}) is not None assert device_registry.async_get_device(identifiers={("something", "d4")}) is None async def test_cleanup_device_registry_removes_expired_orphaned_devices( hass: HomeAssistant, device_registry: dr.DeviceRegistry, entity_registry: er.EntityRegistry, ) -> None: """Test cleanup removes expired orphaned devices.""" config_entry = MockConfigEntry(domain="hue") config_entry.add_to_hass(hass) device_registry.async_get_or_create( identifiers={("hue", "d1")}, config_entry_id=config_entry.entry_id ) device_registry.async_get_or_create( identifiers={("hue", "d2")}, config_entry_id=config_entry.entry_id ) device_registry.async_get_or_create( identifiers={("hue", "d3")}, config_entry_id=config_entry.entry_id ) device_registry.async_clear_config_entry(config_entry.entry_id) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 3 dr.async_cleanup(hass, device_registry, entity_registry) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 3 future_time = time.time() + dr.ORPHANED_DEVICE_KEEP_SECONDS + 1 with patch("time.time", return_value=future_time): dr.async_cleanup(hass, device_registry, entity_registry) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 0 async def test_cleanup_startup(hass: HomeAssistant) -> None: """Test we run a cleanup on startup.""" hass.state = CoreState.not_running with patch( "homeassistant.helpers.device_registry.Debouncer.async_call" ) as mock_call: hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) await hass.async_block_till_done() assert len(mock_call.mock_calls) == 1 @pytest.mark.parametrize("load_registries", [False]) async def test_cleanup_entity_registry_change(hass: HomeAssistant) -> None: """Test we run a cleanup when entity registry changes. Don't pre-load the registries as the debouncer will then not be waiting for EVENT_ENTITY_REGISTRY_UPDATED events. """ await dr.async_load(hass) await er.async_load(hass) ent_reg = er.async_get(hass) with patch( "homeassistant.helpers.device_registry.Debouncer.async_call" ) as mock_call: entity = ent_reg.async_get_or_create("light", "hue", "e1") await hass.async_block_till_done() assert len(mock_call.mock_calls) == 0 # Normal update does not trigger ent_reg.async_update_entity(entity.entity_id, name="updated") await hass.async_block_till_done() assert len(mock_call.mock_calls) == 0 # Device ID update triggers ent_reg.async_get_or_create("light", "hue", "e1", device_id="bla") await hass.async_block_till_done() assert len(mock_call.mock_calls) == 1 # Removal also triggers ent_reg.async_remove(entity.entity_id) await hass.async_block_till_done() assert len(mock_call.mock_calls) == 2 async def test_restore_device( hass: HomeAssistant, device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry, update_events, ) -> None: """Make sure device id is stable.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 1 assert len(device_registry.deleted_devices) == 0 device_registry.async_remove_device(entry.id) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 1 entry2 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:78:CD:EF:12")}, identifiers={("bridgeid", "4567")}, manufacturer="manufacturer", model="model", ) entry3 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, manufacturer="manufacturer", model="model", ) assert entry.id == entry3.id assert entry.id != entry2.id assert len(device_registry.devices) == 2 assert len(device_registry.deleted_devices) == 0 assert isinstance(entry3.config_entries, set) assert isinstance(entry3.connections, set) assert isinstance(entry3.identifiers, set) await hass.async_block_till_done() assert len(update_events) == 4 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "remove" assert update_events[1]["device_id"] == entry.id assert "changes" not in update_events[1] assert update_events[2]["action"] == "create" assert update_events[2]["device_id"] == entry2.id assert "changes" not in update_events[2] assert update_events[3]["action"] == "create" assert update_events[3]["device_id"] == entry3.id assert "changes" not in update_events[3] async def test_restore_simple_device( hass: HomeAssistant, device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry, update_events, ) -> None: """Make sure device id is stable.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, ) assert len(device_registry.devices) == 1 assert len(device_registry.deleted_devices) == 0 device_registry.async_remove_device(entry.id) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 1 entry2 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:78:CD:EF:12")}, identifiers={("bridgeid", "4567")}, ) entry3 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, ) assert entry.id == entry3.id assert entry.id != entry2.id assert len(device_registry.devices) == 2 assert len(device_registry.deleted_devices) == 0 await hass.async_block_till_done() assert len(update_events) == 4 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "remove" assert update_events[1]["device_id"] == entry.id assert "changes" not in update_events[1] assert update_events[2]["action"] == "create" assert update_events[2]["device_id"] == entry2.id assert "changes" not in update_events[2] assert update_events[3]["action"] == "create" assert update_events[3]["device_id"] == entry3.id assert "changes" not in update_events[3] async def test_restore_shared_device( hass: HomeAssistant, device_registry: dr.DeviceRegistry, update_events ) -> None: """Make sure device id is stable for shared devices.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) entry = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("entry_123", "0123")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 1 assert len(device_registry.deleted_devices) == 0 device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("entry_234", "2345")}, manufacturer="manufacturer", model="model", ) assert len(device_registry.devices) == 1 assert len(device_registry.deleted_devices) == 0 device_registry.async_remove_device(entry.id) assert len(device_registry.devices) == 0 assert len(device_registry.deleted_devices) == 1 entry2 = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("entry_123", "0123")}, manufacturer="manufacturer", model="model", ) assert entry.id == entry2.id assert len(device_registry.devices) == 1 assert len(device_registry.deleted_devices) == 0 assert isinstance(entry2.config_entries, set) assert isinstance(entry2.connections, set) assert isinstance(entry2.identifiers, set) device_registry.async_remove_device(entry.id) entry3 = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("entry_234", "2345")}, manufacturer="manufacturer", model="model", ) assert entry.id == entry3.id assert len(device_registry.devices) == 1 assert len(device_registry.deleted_devices) == 0 assert isinstance(entry3.config_entries, set) assert isinstance(entry3.connections, set) assert isinstance(entry3.identifiers, set) entry4 = device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("entry_123", "0123")}, manufacturer="manufacturer", model="model", ) assert entry.id == entry4.id assert len(device_registry.devices) == 1 assert len(device_registry.deleted_devices) == 0 assert isinstance(entry4.config_entries, set) assert isinstance(entry4.connections, set) assert isinstance(entry4.identifiers, set) await hass.async_block_till_done() assert len(update_events) == 7 assert update_events[0]["action"] == "create" assert update_events[0]["device_id"] == entry.id assert "changes" not in update_events[0] assert update_events[1]["action"] == "update" assert update_events[1]["device_id"] == entry.id assert update_events[1]["changes"] == { "config_entries": {config_entry_1.entry_id}, "identifiers": {("entry_123", "0123")}, } assert update_events[2]["action"] == "remove" assert update_events[2]["device_id"] == entry.id assert "changes" not in update_events[2] assert update_events[3]["action"] == "create" assert update_events[3]["device_id"] == entry.id assert "changes" not in update_events[3] assert update_events[4]["action"] == "remove" assert update_events[4]["device_id"] == entry.id assert "changes" not in update_events[4] assert update_events[5]["action"] == "create" assert update_events[5]["device_id"] == entry.id assert "changes" not in update_events[5] assert update_events[6]["action"] == "update" assert update_events[6]["device_id"] == entry.id assert update_events[6]["changes"] == { "config_entries": {config_entry_2.entry_id}, "identifiers": {("entry_234", "2345")}, } async def test_get_or_create_empty_then_set_default_values( device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry, ) -> None: """Test creating an entry, then setting default name, model, manufacturer.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, ) assert entry.name is None assert entry.model is None assert entry.manufacturer is None entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, default_name="default name 1", default_model="default model 1", default_manufacturer="default manufacturer 1", ) assert entry.name == "default name 1" assert entry.model == "default model 1" assert entry.manufacturer == "default manufacturer 1" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, default_name="default name 2", default_model="default model 2", default_manufacturer="default manufacturer 2", ) assert entry.name == "default name 1" assert entry.model == "default model 1" assert entry.manufacturer == "default manufacturer 1" async def test_get_or_create_empty_then_update( device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry, ) -> None: """Test creating an entry, then setting name, model, manufacturer.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, ) assert entry.name is None assert entry.model is None assert entry.manufacturer is None entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, name="name 1", model="model 1", manufacturer="manufacturer 1", ) assert entry.name == "name 1" assert entry.model == "model 1" assert entry.manufacturer == "manufacturer 1" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, default_name="default name 1", default_model="default model 1", default_manufacturer="default manufacturer 1", ) assert entry.name == "name 1" assert entry.model == "model 1" assert entry.manufacturer == "manufacturer 1" async def test_get_or_create_sets_default_values( device_registry: dr.DeviceRegistry, mock_config_entry: MockConfigEntry, ) -> None: """Test creating an entry, then setting default name, model, manufacturer.""" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, default_name="default name 1", default_model="default model 1", default_manufacturer="default manufacturer 1", ) assert entry.name == "default name 1" assert entry.model == "default model 1" assert entry.manufacturer == "default manufacturer 1" entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, default_name="default name 2", default_model="default model 2", default_manufacturer="default manufacturer 2", ) assert entry.name == "default name 1" assert entry.model == "default model 1" assert entry.manufacturer == "default manufacturer 1" async def test_verify_suggested_area_does_not_overwrite_area_id( device_registry: dr.DeviceRegistry, area_registry: ar.AreaRegistry, mock_config_entry: MockConfigEntry, ) -> None: """Make sure suggested area does not override a set area id.""" game_room_area = area_registry.async_create("Game Room") original_entry = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, sw_version="sw-version", name="name", manufacturer="manufacturer", model="model", ) entry = device_registry.async_update_device( original_entry.id, area_id=game_room_area.id ) assert entry.area_id == game_room_area.id entry2 = device_registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, identifiers={("bridgeid", "0123")}, sw_version="sw-version", name="name", manufacturer="manufacturer", model="model", suggested_area="New Game Room", ) assert entry2.area_id == game_room_area.id async def test_disable_config_entry_disables_devices( hass: HomeAssistant, device_registry: dr.DeviceRegistry ) -> None: """Test that we disable entities tied to a config entry.""" config_entry = MockConfigEntry(domain="light") config_entry.add_to_hass(hass) entry1 = device_registry.async_get_or_create( config_entry_id=config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, ) entry2 = device_registry.async_get_or_create( config_entry_id=config_entry.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "34:56:AB:CD:EF:12")}, disabled_by=dr.DeviceEntryDisabler.USER, ) assert not entry1.disabled assert entry2.disabled await hass.config_entries.async_set_disabled_by( config_entry.entry_id, config_entries.ConfigEntryDisabler.USER ) await hass.async_block_till_done() entry1 = device_registry.async_get(entry1.id) assert entry1.disabled assert entry1.disabled_by is dr.DeviceEntryDisabler.CONFIG_ENTRY entry2 = device_registry.async_get(entry2.id) assert entry2.disabled assert entry2.disabled_by is dr.DeviceEntryDisabler.USER await hass.config_entries.async_set_disabled_by(config_entry.entry_id, None) await hass.async_block_till_done() entry1 = device_registry.async_get(entry1.id) assert not entry1.disabled entry2 = device_registry.async_get(entry2.id) assert entry2.disabled assert entry2.disabled_by is dr.DeviceEntryDisabler.USER async def test_only_disable_device_if_all_config_entries_are_disabled( hass: HomeAssistant, device_registry: dr.DeviceRegistry ) -> None: """Test that we only disable device if all related config entries are disabled.""" config_entry1 = MockConfigEntry(domain="light") config_entry1.add_to_hass(hass) config_entry2 = MockConfigEntry(domain="light") config_entry2.add_to_hass(hass) device_registry.async_get_or_create( config_entry_id=config_entry1.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, ) entry1 = device_registry.async_get_or_create( config_entry_id=config_entry2.entry_id, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, ) assert len(entry1.config_entries) == 2 assert not entry1.disabled await hass.config_entries.async_set_disabled_by( config_entry1.entry_id, config_entries.ConfigEntryDisabler.USER ) await hass.async_block_till_done() entry1 = device_registry.async_get(entry1.id) assert not entry1.disabled await hass.config_entries.async_set_disabled_by( config_entry2.entry_id, config_entries.ConfigEntryDisabler.USER ) await hass.async_block_till_done() entry1 = device_registry.async_get(entry1.id) assert entry1.disabled assert entry1.disabled_by is dr.DeviceEntryDisabler.CONFIG_ENTRY await hass.config_entries.async_set_disabled_by(config_entry1.entry_id, None) await hass.async_block_till_done() entry1 = device_registry.async_get(entry1.id) assert not entry1.disabled @pytest.mark.parametrize( ("configuration_url", "expectation"), [ ("http://localhost", nullcontext()), ("http://localhost:8123", nullcontext()), ("https://example.com", nullcontext()), ("http://localhost/config", nullcontext()), ("http://localhost:8123/config", nullcontext()), ("https://example.com/config", nullcontext()), ("homeassistant://config", nullcontext()), (URL("http://localhost"), nullcontext()), (URL("http://localhost:8123"), nullcontext()), (URL("https://example.com"), nullcontext()), (URL("http://localhost/config"), nullcontext()), (URL("http://localhost:8123/config"), nullcontext()), (URL("https://example.com/config"), nullcontext()), (URL("homeassistant://config"), nullcontext()), (None, nullcontext()), ("http://", pytest.raises(ValueError)), ("https://", pytest.raises(ValueError)), ("gopher://localhost", pytest.raises(ValueError)), ("homeassistant://", pytest.raises(ValueError)), (URL("http://"), pytest.raises(ValueError)), (URL("https://"), pytest.raises(ValueError)), (URL("gopher://localhost"), pytest.raises(ValueError)), (URL("homeassistant://"), pytest.raises(ValueError)), # Exception implements __str__ (Exception("https://example.com"), nullcontext()), (Exception("https://"), pytest.raises(ValueError)), (Exception(), pytest.raises(ValueError)), ], ) async def test_device_info_configuration_url_validation( hass: HomeAssistant, device_registry: dr.DeviceRegistry, configuration_url: str | URL | None, expectation, ) -> None: """Test configuration URL of device info is properly validated.""" config_entry_1 = MockConfigEntry() config_entry_1.add_to_hass(hass) config_entry_2 = MockConfigEntry() config_entry_2.add_to_hass(hass) with expectation: device_registry.async_get_or_create( config_entry_id=config_entry_1.entry_id, identifiers={("something", "1234")}, name="name", configuration_url=configuration_url, ) update_device = device_registry.async_get_or_create( config_entry_id=config_entry_2.entry_id, identifiers={("something", "5678")}, name="name", ) with expectation: device_registry.async_update_device( update_device.id, configuration_url=configuration_url ) @pytest.mark.parametrize("load_registries", [False]) async def test_loading_invalid_configuration_url_from_storage( hass: HomeAssistant, hass_storage: dict[str, Any], mock_config_entry: MockConfigEntry, ) -> None: """Test loading stored devices with an invalid URL.""" hass_storage[dr.STORAGE_KEY] = { "version": dr.STORAGE_VERSION_MAJOR, "minor_version": dr.STORAGE_VERSION_MINOR, "data": { "devices": [ { "area_id": None, "config_entries": ["1234"], "configuration_url": "invalid", "connections": [], "disabled_by": None, "entry_type": dr.DeviceEntryType.SERVICE, "hw_version": None, "id": "abcdefghijklm", "identifiers": [["serial", "123456ABCDEF"]], "manufacturer": None, "model": None, "name_by_user": None, "name": None, "serial_number": None, "sw_version": None, "via_device_id": None, } ], "deleted_devices": [], }, } await dr.async_load(hass) registry = dr.async_get(hass) assert len(registry.devices) == 1 entry = registry.async_get_or_create( config_entry_id=mock_config_entry.entry_id, identifiers={("serial", "123456ABCDEF")}, ) assert entry.configuration_url == "invalid"