core/homeassistant/helpers/floor_registry.py

256 lines
7.5 KiB
Python

"""Provide a way to assign areas to floors in one's home."""
from __future__ import annotations
from collections.abc import Iterable
import dataclasses
from dataclasses import dataclass
from typing import Literal, TypedDict
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.util import slugify
from homeassistant.util.event_type import EventType
from homeassistant.util.hass_dict import HassKey
from .normalized_name_base_registry import (
NormalizedNameBaseRegistryEntry,
NormalizedNameBaseRegistryItems,
normalize_name,
)
from .registry import BaseRegistry
from .singleton import singleton
from .storage import Store
from .typing import UNDEFINED, UndefinedType
DATA_REGISTRY: HassKey[FloorRegistry] = HassKey("floor_registry")
EVENT_FLOOR_REGISTRY_UPDATED: EventType[EventFloorRegistryUpdatedData] = EventType(
"floor_registry_updated"
)
STORAGE_KEY = "core.floor_registry"
STORAGE_VERSION_MAJOR = 1
class _FloorStoreData(TypedDict):
"""Data type for individual floor. Used in FloorRegistryStoreData."""
aliases: list[str]
floor_id: str
icon: str | None
level: int | None
name: str
class FloorRegistryStoreData(TypedDict):
"""Store data type for FloorRegistry."""
floors: list[_FloorStoreData]
class EventFloorRegistryUpdatedData(TypedDict):
"""Event data for when the floor registry is updated."""
action: Literal["create", "remove", "update"]
floor_id: str
type EventFloorRegistryUpdated = Event[EventFloorRegistryUpdatedData]
@dataclass(slots=True, kw_only=True, frozen=True)
class FloorEntry(NormalizedNameBaseRegistryEntry):
"""Floor registry entry."""
aliases: set[str]
floor_id: str
icon: str | None = None
level: int | None = None
class FloorRegistry(BaseRegistry[FloorRegistryStoreData]):
"""Class to hold a registry of floors."""
floors: NormalizedNameBaseRegistryItems[FloorEntry]
_floor_data: dict[str, FloorEntry]
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the floor registry."""
self.hass = hass
self._store = Store(
hass,
STORAGE_VERSION_MAJOR,
STORAGE_KEY,
atomic_writes=True,
)
@callback
def async_get_floor(self, floor_id: str) -> FloorEntry | None:
"""Get floor by id.
We retrieve the FloorEntry from the underlying dict to avoid
the overhead of the UserDict __getitem__.
"""
return self._floor_data.get(floor_id)
@callback
def async_get_floor_by_name(self, name: str) -> FloorEntry | None:
"""Get floor by name."""
return self.floors.get_by_name(name)
@callback
def async_list_floors(self) -> Iterable[FloorEntry]:
"""Get all floors."""
return self.floors.values()
@callback
def _generate_id(self, name: str) -> str:
"""Generate floor ID."""
suggestion = suggestion_base = slugify(name)
tries = 1
while suggestion in self.floors:
tries += 1
suggestion = f"{suggestion_base}_{tries}"
return suggestion
@callback
def async_create(
self,
name: str,
*,
aliases: set[str] | None = None,
icon: str | None = None,
level: int | None = None,
) -> FloorEntry:
"""Create a new floor."""
self.hass.verify_event_loop_thread("floor_registry.async_create")
if floor := self.async_get_floor_by_name(name):
raise ValueError(
f"The name {name} ({floor.normalized_name}) is already in use"
)
normalized_name = normalize_name(name)
floor = FloorEntry(
aliases=aliases or set(),
icon=icon,
floor_id=self._generate_id(name),
name=name,
normalized_name=normalized_name,
level=level,
)
floor_id = floor.floor_id
self.floors[floor_id] = floor
self.async_schedule_save()
self.hass.bus.async_fire_internal(
EVENT_FLOOR_REGISTRY_UPDATED,
EventFloorRegistryUpdatedData(
action="create",
floor_id=floor_id,
),
)
return floor
@callback
def async_delete(self, floor_id: str) -> None:
"""Delete floor."""
self.hass.verify_event_loop_thread("floor_registry.async_delete")
del self.floors[floor_id]
self.hass.bus.async_fire_internal(
EVENT_FLOOR_REGISTRY_UPDATED,
EventFloorRegistryUpdatedData(
action="remove",
floor_id=floor_id,
),
)
self.async_schedule_save()
@callback
def async_update(
self,
floor_id: str,
*,
aliases: set[str] | UndefinedType = UNDEFINED,
icon: str | None | UndefinedType = UNDEFINED,
level: int | UndefinedType = UNDEFINED,
name: str | UndefinedType = UNDEFINED,
) -> FloorEntry:
"""Update name of the floor."""
old = self.floors[floor_id]
changes = {
attr_name: value
for attr_name, value in (
("aliases", aliases),
("icon", icon),
("level", level),
)
if value is not UNDEFINED and value != getattr(old, attr_name)
}
if name is not UNDEFINED and name != old.name:
changes["name"] = name
changes["normalized_name"] = normalize_name(name)
if not changes:
return old
self.hass.verify_event_loop_thread("floor_registry.async_update")
new = self.floors[floor_id] = dataclasses.replace(old, **changes) # type: ignore[arg-type]
self.async_schedule_save()
self.hass.bus.async_fire_internal(
EVENT_FLOOR_REGISTRY_UPDATED,
EventFloorRegistryUpdatedData(
action="update",
floor_id=floor_id,
),
)
return new
async def async_load(self) -> None:
"""Load the floor registry."""
data = await self._store.async_load()
floors = NormalizedNameBaseRegistryItems[FloorEntry]()
if data is not None:
for floor in data["floors"]:
normalized_name = normalize_name(floor["name"])
floors[floor["floor_id"]] = FloorEntry(
aliases=set(floor["aliases"]),
icon=floor["icon"],
floor_id=floor["floor_id"],
name=floor["name"],
level=floor["level"],
normalized_name=normalized_name,
)
self.floors = floors
self._floor_data = floors.data
@callback
def _data_to_save(self) -> FloorRegistryStoreData:
"""Return data of floor registry to store in a file."""
return {
"floors": [
{
"aliases": list(entry.aliases),
"floor_id": entry.floor_id,
"icon": entry.icon,
"level": entry.level,
"name": entry.name,
}
for entry in self.floors.values()
]
}
@callback
@singleton(DATA_REGISTRY)
def async_get(hass: HomeAssistant) -> FloorRegistry:
"""Get floor registry."""
return FloorRegistry(hass)
async def async_load(hass: HomeAssistant) -> None:
"""Load floor registry."""
assert DATA_REGISTRY not in hass.data
await async_get(hass).async_load()