172 lines
5.2 KiB
Python
172 lines
5.2 KiB
Python
"""API for persistent storage for the frontend."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable, Coroutine
|
|
from functools import wraps
|
|
from typing import Any
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components import websocket_api
|
|
from homeassistant.components.websocket_api import ActiveConnection
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers.storage import Store
|
|
from homeassistant.util.hass_dict import HassKey
|
|
|
|
DATA_STORAGE: HassKey[dict[str, UserStore]] = HassKey("frontend_storage")
|
|
STORAGE_VERSION_USER_DATA = 1
|
|
|
|
|
|
async def async_setup_frontend_storage(hass: HomeAssistant) -> None:
|
|
"""Set up frontend storage."""
|
|
websocket_api.async_register_command(hass, websocket_set_user_data)
|
|
websocket_api.async_register_command(hass, websocket_get_user_data)
|
|
websocket_api.async_register_command(hass, websocket_subscribe_user_data)
|
|
|
|
|
|
async def async_user_store(hass: HomeAssistant, user_id: str) -> UserStore:
|
|
"""Access a user store."""
|
|
stores = hass.data.setdefault(DATA_STORAGE, {})
|
|
if (store := stores.get(user_id)) is None:
|
|
store = stores[user_id] = UserStore(hass, user_id)
|
|
await store.async_load()
|
|
|
|
return store
|
|
|
|
|
|
class UserStore:
|
|
"""User store for frontend data."""
|
|
|
|
def __init__(self, hass: HomeAssistant, user_id: str) -> None:
|
|
"""Initialize the user store."""
|
|
self._store = _UserStore(hass, user_id)
|
|
self.data: dict[str, Any] = {}
|
|
self.subscriptions: dict[str | None, list[Callable[[], None]]] = {}
|
|
|
|
async def async_load(self) -> None:
|
|
"""Load the data from the store."""
|
|
self.data = await self._store.async_load() or {}
|
|
|
|
async def async_set_item(self, key: str, value: Any) -> None:
|
|
"""Set an item item and save the store."""
|
|
self.data[key] = value
|
|
await self._store.async_save(self.data)
|
|
for cb in self.subscriptions.get(None, []):
|
|
cb()
|
|
for cb in self.subscriptions.get(key, []):
|
|
cb()
|
|
|
|
@callback
|
|
def async_subscribe(
|
|
self, key: str | None, on_update_callback: Callable[[], None]
|
|
) -> Callable[[], None]:
|
|
"""Save the data to the store."""
|
|
self.subscriptions.setdefault(key, []).append(on_update_callback)
|
|
|
|
def unsubscribe() -> None:
|
|
"""Unsubscribe from the store."""
|
|
self.subscriptions[key].remove(on_update_callback)
|
|
|
|
return unsubscribe
|
|
|
|
|
|
class _UserStore(Store[dict[str, Any]]):
|
|
"""User store for frontend data."""
|
|
|
|
def __init__(self, hass: HomeAssistant, user_id: str) -> None:
|
|
"""Initialize the user store."""
|
|
super().__init__(
|
|
hass,
|
|
STORAGE_VERSION_USER_DATA,
|
|
f"frontend.user_data_{user_id}",
|
|
)
|
|
|
|
|
|
def with_user_store(
|
|
orig_func: Callable[
|
|
[HomeAssistant, ActiveConnection, dict[str, Any], UserStore],
|
|
Coroutine[Any, Any, None],
|
|
],
|
|
) -> Callable[
|
|
[HomeAssistant, ActiveConnection, dict[str, Any]], Coroutine[Any, Any, None]
|
|
]:
|
|
"""Decorate function to provide data."""
|
|
|
|
@wraps(orig_func)
|
|
async def with_user_store_func(
|
|
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
|
|
) -> None:
|
|
"""Provide user specific data and store to function."""
|
|
user_id = connection.user.id
|
|
|
|
store = await async_user_store(hass, user_id)
|
|
|
|
await orig_func(hass, connection, msg, store)
|
|
|
|
return with_user_store_func
|
|
|
|
|
|
@websocket_api.websocket_command(
|
|
{
|
|
vol.Required("type"): "frontend/set_user_data",
|
|
vol.Required("key"): str,
|
|
vol.Required("value"): vol.Any(bool, str, int, float, dict, list, None),
|
|
}
|
|
)
|
|
@websocket_api.async_response
|
|
@with_user_store
|
|
async def websocket_set_user_data(
|
|
hass: HomeAssistant,
|
|
connection: ActiveConnection,
|
|
msg: dict[str, Any],
|
|
store: UserStore,
|
|
) -> None:
|
|
"""Handle set user data command."""
|
|
await store.async_set_item(msg["key"], msg["value"])
|
|
connection.send_result(msg["id"])
|
|
|
|
|
|
@websocket_api.websocket_command(
|
|
{vol.Required("type"): "frontend/get_user_data", vol.Optional("key"): str}
|
|
)
|
|
@websocket_api.async_response
|
|
@with_user_store
|
|
async def websocket_get_user_data(
|
|
hass: HomeAssistant,
|
|
connection: ActiveConnection,
|
|
msg: dict[str, Any],
|
|
store: UserStore,
|
|
) -> None:
|
|
"""Handle get user data command."""
|
|
data = store.data
|
|
connection.send_result(
|
|
msg["id"], {"value": data.get(msg["key"]) if "key" in msg else data}
|
|
)
|
|
|
|
|
|
@websocket_api.websocket_command(
|
|
{vol.Required("type"): "frontend/subscribe_user_data", vol.Optional("key"): str}
|
|
)
|
|
@websocket_api.async_response
|
|
@with_user_store
|
|
async def websocket_subscribe_user_data(
|
|
hass: HomeAssistant,
|
|
connection: ActiveConnection,
|
|
msg: dict[str, Any],
|
|
store: UserStore,
|
|
) -> None:
|
|
"""Handle subscribe to user data command."""
|
|
key: str | None = msg.get("key")
|
|
|
|
def on_data_update() -> None:
|
|
"""Handle user data update."""
|
|
data = store.data
|
|
connection.send_event(
|
|
msg["id"], {"value": data.get(key) if key is not None else data}
|
|
)
|
|
|
|
connection.subscriptions[msg["id"]] = store.async_subscribe(key, on_data_update)
|
|
on_data_update()
|
|
connection.send_result(msg["id"])
|