core/homeassistant/components/frontend/storage.py

172 lines
5.2 KiB
Python

"""API for persistent storage for the frontend."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
from functools import wraps
from typing import Any
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.components.websocket_api import ActiveConnection
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.storage import Store
from homeassistant.util.hass_dict import HassKey
DATA_STORAGE: HassKey[dict[str, UserStore]] = HassKey("frontend_storage")
STORAGE_VERSION_USER_DATA = 1
async def async_setup_frontend_storage(hass: HomeAssistant) -> None:
"""Set up frontend storage."""
websocket_api.async_register_command(hass, websocket_set_user_data)
websocket_api.async_register_command(hass, websocket_get_user_data)
websocket_api.async_register_command(hass, websocket_subscribe_user_data)
async def async_user_store(hass: HomeAssistant, user_id: str) -> UserStore:
"""Access a user store."""
stores = hass.data.setdefault(DATA_STORAGE, {})
if (store := stores.get(user_id)) is None:
store = stores[user_id] = UserStore(hass, user_id)
await store.async_load()
return store
class UserStore:
"""User store for frontend data."""
def __init__(self, hass: HomeAssistant, user_id: str) -> None:
"""Initialize the user store."""
self._store = _UserStore(hass, user_id)
self.data: dict[str, Any] = {}
self.subscriptions: dict[str | None, list[Callable[[], None]]] = {}
async def async_load(self) -> None:
"""Load the data from the store."""
self.data = await self._store.async_load() or {}
async def async_set_item(self, key: str, value: Any) -> None:
"""Set an item item and save the store."""
self.data[key] = value
await self._store.async_save(self.data)
for cb in self.subscriptions.get(None, []):
cb()
for cb in self.subscriptions.get(key, []):
cb()
@callback
def async_subscribe(
self, key: str | None, on_update_callback: Callable[[], None]
) -> Callable[[], None]:
"""Save the data to the store."""
self.subscriptions.setdefault(key, []).append(on_update_callback)
def unsubscribe() -> None:
"""Unsubscribe from the store."""
self.subscriptions[key].remove(on_update_callback)
return unsubscribe
class _UserStore(Store[dict[str, Any]]):
"""User store for frontend data."""
def __init__(self, hass: HomeAssistant, user_id: str) -> None:
"""Initialize the user store."""
super().__init__(
hass,
STORAGE_VERSION_USER_DATA,
f"frontend.user_data_{user_id}",
)
def with_user_store(
orig_func: Callable[
[HomeAssistant, ActiveConnection, dict[str, Any], UserStore],
Coroutine[Any, Any, None],
],
) -> Callable[
[HomeAssistant, ActiveConnection, dict[str, Any]], Coroutine[Any, Any, None]
]:
"""Decorate function to provide data."""
@wraps(orig_func)
async def with_user_store_func(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
"""Provide user specific data and store to function."""
user_id = connection.user.id
store = await async_user_store(hass, user_id)
await orig_func(hass, connection, msg, store)
return with_user_store_func
@websocket_api.websocket_command(
{
vol.Required("type"): "frontend/set_user_data",
vol.Required("key"): str,
vol.Required("value"): vol.Any(bool, str, int, float, dict, list, None),
}
)
@websocket_api.async_response
@with_user_store
async def websocket_set_user_data(
hass: HomeAssistant,
connection: ActiveConnection,
msg: dict[str, Any],
store: UserStore,
) -> None:
"""Handle set user data command."""
await store.async_set_item(msg["key"], msg["value"])
connection.send_result(msg["id"])
@websocket_api.websocket_command(
{vol.Required("type"): "frontend/get_user_data", vol.Optional("key"): str}
)
@websocket_api.async_response
@with_user_store
async def websocket_get_user_data(
hass: HomeAssistant,
connection: ActiveConnection,
msg: dict[str, Any],
store: UserStore,
) -> None:
"""Handle get user data command."""
data = store.data
connection.send_result(
msg["id"], {"value": data.get(msg["key"]) if "key" in msg else data}
)
@websocket_api.websocket_command(
{vol.Required("type"): "frontend/subscribe_user_data", vol.Optional("key"): str}
)
@websocket_api.async_response
@with_user_store
async def websocket_subscribe_user_data(
hass: HomeAssistant,
connection: ActiveConnection,
msg: dict[str, Any],
store: UserStore,
) -> None:
"""Handle subscribe to user data command."""
key: str | None = msg.get("key")
def on_data_update() -> None:
"""Handle user data update."""
data = store.data
connection.send_event(
msg["id"], {"value": data.get(key) if key is not None else data}
)
connection.subscriptions[msg["id"]] = store.async_subscribe(key, on_data_update)
on_data_update()
connection.send_result(msg["id"])