"""Provide a registry to track entity IDs. The Entity Registry keeps a registry of entities. Entities are uniquely identified by their domain, platform and a unique id provided by that platform. The Entity Registry will persist itself 10 seconds after a new entity is registered. Registering a new entity while a timer is in progress resets the timer. """ from collections import OrderedDict import logging from typing import ( TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, cast, ) import attr from homeassistant.const import ( ATTR_DEVICE_CLASS, ATTR_FRIENDLY_NAME, ATTR_ICON, ATTR_SUPPORTED_FEATURES, ATTR_UNIT_OF_MEASUREMENT, EVENT_HOMEASSISTANT_START, STATE_UNAVAILABLE, ) from homeassistant.core import Event, callback, split_entity_id, valid_entity_id from homeassistant.helpers.device_registry import EVENT_DEVICE_REGISTRY_UPDATED from homeassistant.util import slugify from homeassistant.util.yaml import load_yaml from .singleton import singleton from .typing import HomeAssistantType if TYPE_CHECKING: from homeassistant.config_entries import ConfigEntry # noqa: F401 # mypy: allow-untyped-defs, no-check-untyped-defs PATH_REGISTRY = "entity_registry.yaml" DATA_REGISTRY = "entity_registry" EVENT_ENTITY_REGISTRY_UPDATED = "entity_registry_updated" SAVE_DELAY = 10 _LOGGER = logging.getLogger(__name__) _UNDEF = object() DISABLED_CONFIG_ENTRY = "config_entry" DISABLED_HASS = "hass" DISABLED_USER = "user" DISABLED_INTEGRATION = "integration" ATTR_RESTORED = "restored" STORAGE_VERSION = 1 STORAGE_KEY = "core.entity_registry" # Attributes relevant to describing entity # to external services. ENTITY_DESCRIBING_ATTRIBUTES = { "entity_id", "name", "original_name", "capabilities", "supported_features", "device_class", "unit_of_measurement", } @attr.s(slots=True, frozen=True) class RegistryEntry: """Entity Registry Entry.""" entity_id: str = attr.ib() unique_id: str = attr.ib() platform: str = attr.ib() name: Optional[str] = attr.ib(default=None) icon: Optional[str] = attr.ib(default=None) device_id: Optional[str] = attr.ib(default=None) config_entry_id: Optional[str] = attr.ib(default=None) disabled_by: Optional[str] = attr.ib( default=None, validator=attr.validators.in_( ( DISABLED_HASS, DISABLED_USER, DISABLED_INTEGRATION, DISABLED_CONFIG_ENTRY, None, ) ), ) capabilities: Optional[Dict[str, Any]] = attr.ib(default=None) supported_features: int = attr.ib(default=0) device_class: Optional[str] = attr.ib(default=None) unit_of_measurement: Optional[str] = attr.ib(default=None) # As set by integration original_name: Optional[str] = attr.ib(default=None) original_icon: Optional[str] = attr.ib(default=None) domain: str = attr.ib(init=False, repr=False) @domain.default def _domain_default(self) -> str: """Compute domain value.""" return split_entity_id(self.entity_id)[0] @property def disabled(self) -> bool: """Return if entry is disabled.""" return self.disabled_by is not None class EntityRegistry: """Class to hold a registry of entities.""" def __init__(self, hass: HomeAssistantType): """Initialize the registry.""" self.hass = hass self.entities: Dict[str, RegistryEntry] self._index: Dict[Tuple[str, str, str], str] = {} self._store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY) self.hass.bus.async_listen( EVENT_DEVICE_REGISTRY_UPDATED, self.async_device_removed ) @callback def async_get_device_class_lookup(self, domain_device_classes: set) -> dict: """Return a lookup for the device class by domain.""" lookup: Dict[str, Dict[Tuple[Any, Any], str]] = {} for entity in self.entities.values(): if not entity.device_id: continue domain_device_class = (entity.domain, entity.device_class) if domain_device_class not in domain_device_classes: continue if entity.device_id not in lookup: lookup[entity.device_id] = {domain_device_class: entity.entity_id} else: lookup[entity.device_id][domain_device_class] = entity.entity_id return lookup @callback def async_is_registered(self, entity_id: str) -> bool: """Check if an entity_id is currently registered.""" return entity_id in self.entities @callback def async_get(self, entity_id: str) -> Optional[RegistryEntry]: """Get EntityEntry for an entity_id.""" return self.entities.get(entity_id) @callback def async_get_entity_id( self, domain: str, platform: str, unique_id: str ) -> Optional[str]: """Check if an entity_id is currently registered.""" return self._index.get((domain, platform, unique_id)) @callback def async_generate_entity_id( self, domain: str, suggested_object_id: str, known_object_ids: Optional[Iterable[str]] = None, ) -> str: """Generate an entity ID that does not conflict. Conflicts checked against registered and currently existing entities. """ preferred_string = f"{domain}.{slugify(suggested_object_id)}" test_string = preferred_string if not known_object_ids: known_object_ids = {} tries = 1 while ( test_string in self.entities or test_string in known_object_ids or self.hass.states.get(test_string) ): tries += 1 test_string = f"{preferred_string}_{tries}" return test_string @callback def async_get_or_create( self, domain: str, platform: str, unique_id: str, *, # To influence entity ID generation suggested_object_id: Optional[str] = None, known_object_ids: Optional[Iterable[str]] = None, # To disable an entity if it gets created disabled_by: Optional[str] = None, # Data that we want entry to have config_entry: Optional["ConfigEntry"] = None, device_id: Optional[str] = None, capabilities: Optional[Dict[str, Any]] = None, supported_features: Optional[int] = None, device_class: Optional[str] = None, unit_of_measurement: Optional[str] = None, original_name: Optional[str] = None, original_icon: Optional[str] = None, ) -> RegistryEntry: """Get entity. Create if it doesn't exist.""" config_entry_id = None if config_entry: config_entry_id = config_entry.entry_id entity_id = self.async_get_entity_id(domain, platform, unique_id) if entity_id: return self._async_update_entity( # type: ignore entity_id, config_entry_id=config_entry_id or _UNDEF, device_id=device_id or _UNDEF, capabilities=capabilities or _UNDEF, supported_features=supported_features or _UNDEF, device_class=device_class or _UNDEF, unit_of_measurement=unit_of_measurement or _UNDEF, original_name=original_name or _UNDEF, original_icon=original_icon or _UNDEF, # When we changed our slugify algorithm, we invalidated some # stored entity IDs with either a __ or ending in _. # Fix introduced in 0.86 (Jan 23, 2019). Next line can be # removed when we release 1.0 or in 2020. new_entity_id=".".join( slugify(part) for part in entity_id.split(".", 1) ), ) entity_id = self.async_generate_entity_id( domain, suggested_object_id or f"{platform}_{unique_id}", known_object_ids ) if ( disabled_by is None and config_entry and config_entry.system_options.disable_new_entities ): disabled_by = DISABLED_INTEGRATION entity = RegistryEntry( entity_id=entity_id, config_entry_id=config_entry_id, device_id=device_id, unique_id=unique_id, platform=platform, disabled_by=disabled_by, capabilities=capabilities, supported_features=supported_features or 0, device_class=device_class, unit_of_measurement=unit_of_measurement, original_name=original_name, original_icon=original_icon, ) self._register_entry(entity) _LOGGER.info("Registered new %s.%s entity: %s", domain, platform, entity_id) self.async_schedule_save() self.hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "create", "entity_id": entity_id} ) return entity @callback def async_remove(self, entity_id: str) -> None: """Remove an entity from registry.""" self._unregister_entry(self.entities[entity_id]) self.hass.bus.async_fire( EVENT_ENTITY_REGISTRY_UPDATED, {"action": "remove", "entity_id": entity_id} ) self.async_schedule_save() @callback def async_device_removed(self, event: Event) -> None: """Handle the removal of a device. Remove entities from the registry that are associated to a device when the device is removed. """ if event.data["action"] != "remove": return entities = async_entries_for_device(self, event.data["device_id"]) for entity in entities: self.async_remove(entity.entity_id) @callback def async_update_entity( self, entity_id, *, name=_UNDEF, icon=_UNDEF, new_entity_id=_UNDEF, new_unique_id=_UNDEF, disabled_by=_UNDEF, ): """Update properties of an entity.""" return cast( # cast until we have _async_update_entity type hinted RegistryEntry, self._async_update_entity( entity_id, name=name, icon=icon, new_entity_id=new_entity_id, new_unique_id=new_unique_id, disabled_by=disabled_by, ), ) @callback def _async_update_entity( self, entity_id, *, name=_UNDEF, icon=_UNDEF, config_entry_id=_UNDEF, new_entity_id=_UNDEF, device_id=_UNDEF, new_unique_id=_UNDEF, disabled_by=_UNDEF, capabilities=_UNDEF, supported_features=_UNDEF, device_class=_UNDEF, unit_of_measurement=_UNDEF, original_name=_UNDEF, original_icon=_UNDEF, ): """Private facing update properties method.""" old = self.entities[entity_id] changes = {} for attr_name, value in ( ("name", name), ("icon", icon), ("config_entry_id", config_entry_id), ("device_id", device_id), ("disabled_by", disabled_by), ("capabilities", capabilities), ("supported_features", supported_features), ("device_class", device_class), ("unit_of_measurement", unit_of_measurement), ("original_name", original_name), ("original_icon", original_icon), ): if value is not _UNDEF and value != getattr(old, attr_name): changes[attr_name] = value if new_entity_id is not _UNDEF and new_entity_id != old.entity_id: if self.async_is_registered(new_entity_id): raise ValueError("Entity is already registered") if not valid_entity_id(new_entity_id): raise ValueError("Invalid entity ID") if split_entity_id(new_entity_id)[0] != split_entity_id(entity_id)[0]: raise ValueError("New entity ID should be same domain") self.entities.pop(entity_id) entity_id = changes["entity_id"] = new_entity_id if new_unique_id is not _UNDEF: conflict_entity_id = self.async_get_entity_id( old.domain, old.platform, new_unique_id ) if conflict_entity_id: raise ValueError( f"Unique id '{new_unique_id}' is already in use by " f"'{conflict_entity_id}'" ) changes["unique_id"] = new_unique_id if not changes: return old self._remove_index(old) new = attr.evolve(old, **changes) self._register_entry(new) self.async_schedule_save() data = {"action": "update", "entity_id": entity_id, "changes": list(changes)} if old.entity_id != entity_id: data["old_entity_id"] = old.entity_id self.hass.bus.async_fire(EVENT_ENTITY_REGISTRY_UPDATED, data) return new async def async_load(self) -> None: """Load the entity registry.""" async_setup_entity_restore(self.hass, self) data = await self.hass.helpers.storage.async_migrator( self.hass.config.path(PATH_REGISTRY), self._store, old_conf_load_func=load_yaml, old_conf_migrate_func=_async_migrate, ) entities: Dict[str, RegistryEntry] = OrderedDict() if data is not None: for entity in data["entities"]: # Some old installations can have some bad entities. # Filter them out as they cause errors down the line. # Can be removed in Jan 2021 if not valid_entity_id(entity["entity_id"]): continue entities[entity["entity_id"]] = RegistryEntry( entity_id=entity["entity_id"], config_entry_id=entity.get("config_entry_id"), device_id=entity.get("device_id"), unique_id=entity["unique_id"], platform=entity["platform"], name=entity.get("name"), icon=entity.get("icon"), disabled_by=entity.get("disabled_by"), capabilities=entity.get("capabilities") or {}, supported_features=entity.get("supported_features", 0), device_class=entity.get("device_class"), unit_of_measurement=entity.get("unit_of_measurement"), original_name=entity.get("original_name"), original_icon=entity.get("original_icon"), ) self.entities = entities self._rebuild_index() @callback def async_schedule_save(self) -> None: """Schedule saving the entity registry.""" self._store.async_delay_save(self._data_to_save, SAVE_DELAY) @callback def _data_to_save(self) -> Dict[str, Any]: """Return data of entity registry to store in a file.""" data = {} data["entities"] = [ { "entity_id": entry.entity_id, "config_entry_id": entry.config_entry_id, "device_id": entry.device_id, "unique_id": entry.unique_id, "platform": entry.platform, "name": entry.name, "icon": entry.icon, "disabled_by": entry.disabled_by, "capabilities": entry.capabilities, "supported_features": entry.supported_features, "device_class": entry.device_class, "unit_of_measurement": entry.unit_of_measurement, "original_name": entry.original_name, "original_icon": entry.original_icon, } for entry in self.entities.values() ] return data @callback def async_clear_config_entry(self, config_entry: str) -> None: """Clear config entry from registry entries.""" for entity_id in [ entity_id for entity_id, entry in self.entities.items() if config_entry == entry.config_entry_id ]: self.async_remove(entity_id) def _register_entry(self, entry: RegistryEntry) -> None: self.entities[entry.entity_id] = entry self._add_index(entry) def _add_index(self, entry: RegistryEntry) -> None: self._index[(entry.domain, entry.platform, entry.unique_id)] = entry.entity_id def _unregister_entry(self, entry: RegistryEntry) -> None: self._remove_index(entry) del self.entities[entry.entity_id] def _remove_index(self, entry: RegistryEntry) -> None: del self._index[(entry.domain, entry.platform, entry.unique_id)] def _rebuild_index(self) -> None: self._index = {} for entry in self.entities.values(): self._add_index(entry) @singleton(DATA_REGISTRY) async def async_get_registry(hass: HomeAssistantType) -> EntityRegistry: """Create entity registry.""" reg = EntityRegistry(hass) await reg.async_load() return reg @callback def async_entries_for_device( registry: EntityRegistry, device_id: str ) -> List[RegistryEntry]: """Return entries that match a device.""" return [ entry for entry in registry.entities.values() if entry.device_id == device_id ] @callback def async_entries_for_config_entry( registry: EntityRegistry, config_entry_id: str ) -> List[RegistryEntry]: """Return entries that match a config entry.""" return [ entry for entry in registry.entities.values() if entry.config_entry_id == config_entry_id ] async def _async_migrate(entities: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]: """Migrate the YAML config file to storage helper format.""" return { "entities": [ {"entity_id": entity_id, **info} for entity_id, info in entities.items() ] } @callback def async_setup_entity_restore( hass: HomeAssistantType, registry: EntityRegistry ) -> None: """Set up the entity restore mechanism.""" @callback def cleanup_restored_states(event: Event) -> None: """Clean up restored states.""" if event.data["action"] != "remove": return state = hass.states.get(event.data["entity_id"]) if state is None or not state.attributes.get(ATTR_RESTORED): return hass.states.async_remove(event.data["entity_id"], context=event.context) hass.bus.async_listen(EVENT_ENTITY_REGISTRY_UPDATED, cleanup_restored_states) if hass.is_running: return @callback def _write_unavailable_states(_: Event) -> None: """Make sure state machine contains entry for each registered entity.""" states = hass.states existing = set(states.async_entity_ids()) for entry in registry.entities.values(): if entry.entity_id in existing or entry.disabled: continue attrs: Dict[str, Any] = {ATTR_RESTORED: True} if entry.capabilities is not None: attrs.update(entry.capabilities) if entry.supported_features is not None: attrs[ATTR_SUPPORTED_FEATURES] = entry.supported_features if entry.device_class is not None: attrs[ATTR_DEVICE_CLASS] = entry.device_class if entry.unit_of_measurement is not None: attrs[ATTR_UNIT_OF_MEASUREMENT] = entry.unit_of_measurement name = entry.name or entry.original_name if name is not None: attrs[ATTR_FRIENDLY_NAME] = name icon = entry.icon or entry.original_icon if icon is not None: attrs[ATTR_ICON] = icon states.async_set(entry.entity_id, STATE_UNAVAILABLE, attrs) hass.bus.async_listen(EVENT_HOMEASSISTANT_START, _write_unavailable_states) async def async_migrate_entries( hass: HomeAssistantType, config_entry_id: str, entry_callback: Callable[[RegistryEntry], Optional[dict]], ) -> None: """Migrator of unique IDs.""" ent_reg = await async_get_registry(hass) for entry in ent_reg.entities.values(): if entry.config_entry_id != config_entry_id: continue updates = entry_callback(entry) if updates is not None: ent_reg.async_update_entity(entry.entity_id, **updates)