Only build compressed states once (#85561)
parent
818253ced4
commit
57239769ba
homeassistant
components
recorder
websocket_api
tests
|
@ -17,10 +17,7 @@ from sqlalchemy.sql.expression import literal
|
|||
from sqlalchemy.sql.lambdas import StatementLambdaElement
|
||||
from sqlalchemy.sql.selectable import Subquery
|
||||
|
||||
from homeassistant.components.websocket_api import (
|
||||
COMPRESSED_STATE_LAST_UPDATED,
|
||||
COMPRESSED_STATE_STATE,
|
||||
)
|
||||
from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_STATE
|
||||
from homeassistant.core import HomeAssistant, State, split_entity_id
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ from typing import Any, Literal, TypedDict, overload
|
|||
|
||||
from sqlalchemy.engine.row import Row
|
||||
|
||||
from homeassistant.components.websocket_api import (
|
||||
from homeassistant.const import (
|
||||
COMPRESSED_STATE_ATTRIBUTES,
|
||||
COMPRESSED_STATE_LAST_CHANGED,
|
||||
COMPRESSED_STATE_LAST_UPDATED,
|
||||
|
|
|
@ -12,10 +12,6 @@ from homeassistant.loader import bind_hass
|
|||
from . import commands, connection, const, decorators, http, messages # noqa: F401
|
||||
from .connection import ActiveConnection, current_connection # noqa: F401
|
||||
from .const import ( # noqa: F401
|
||||
COMPRESSED_STATE_ATTRIBUTES,
|
||||
COMPRESSED_STATE_LAST_CHANGED,
|
||||
COMPRESSED_STATE_LAST_UPDATED,
|
||||
COMPRESSED_STATE_STATE,
|
||||
ERR_HOME_ASSISTANT_ERROR,
|
||||
ERR_INVALID_FORMAT,
|
||||
ERR_NOT_FOUND,
|
||||
|
|
|
@ -311,7 +311,7 @@ def handle_subscribe_entities(
|
|||
connection.send_result(msg["id"])
|
||||
data: dict[str, dict[str, dict]] = {
|
||||
messages.ENTITY_EVENT_ADD: {
|
||||
state.entity_id: messages.compressed_state_dict_add(state)
|
||||
state.entity_id: state.as_compressed_state()
|
||||
for state in states
|
||||
if not entity_ids or state.entity_id in entity_ids
|
||||
}
|
||||
|
|
|
@ -50,10 +50,4 @@ SIGNAL_WEBSOCKET_DISCONNECTED: Final = "websocket_disconnected"
|
|||
# Data used to store the current connection list
|
||||
DATA_CONNECTIONS: Final = f"{DOMAIN}.connections"
|
||||
|
||||
COMPRESSED_STATE_STATE = "s"
|
||||
COMPRESSED_STATE_ATTRIBUTES = "a"
|
||||
COMPRESSED_STATE_CONTEXT = "c"
|
||||
COMPRESSED_STATE_LAST_CHANGED = "lc"
|
||||
COMPRESSED_STATE_LAST_UPDATED = "lu"
|
||||
|
||||
FEATURE_COALESCE_MESSAGES = "coalesce_messages"
|
||||
|
|
|
@ -7,6 +7,13 @@ from typing import Any, Final
|
|||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import (
|
||||
COMPRESSED_STATE_ATTRIBUTES,
|
||||
COMPRESSED_STATE_CONTEXT,
|
||||
COMPRESSED_STATE_LAST_CHANGED,
|
||||
COMPRESSED_STATE_LAST_UPDATED,
|
||||
COMPRESSED_STATE_STATE,
|
||||
)
|
||||
from homeassistant.core import Event, State
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.json import JSON_DUMP
|
||||
|
@ -17,13 +24,6 @@ from homeassistant.util.json import (
|
|||
from homeassistant.util.yaml.loader import JSON_TYPE
|
||||
|
||||
from . import const
|
||||
from .const import (
|
||||
COMPRESSED_STATE_ATTRIBUTES,
|
||||
COMPRESSED_STATE_CONTEXT,
|
||||
COMPRESSED_STATE_LAST_CHANGED,
|
||||
COMPRESSED_STATE_LAST_UPDATED,
|
||||
COMPRESSED_STATE_STATE,
|
||||
)
|
||||
|
||||
_LOGGER: Final = logging.getLogger(__name__)
|
||||
|
||||
|
@ -128,13 +128,14 @@ def _state_diff_event(event: Event) -> dict:
|
|||
if (event_old_state := event.data["old_state"]) is None:
|
||||
return {
|
||||
ENTITY_EVENT_ADD: {
|
||||
event_new_state.entity_id: compressed_state_dict_add(event_new_state)
|
||||
event_new_state.entity_id: event_new_state.as_compressed_state()
|
||||
}
|
||||
}
|
||||
assert isinstance(event_old_state, State)
|
||||
return _state_diff(event_old_state, event_new_state)
|
||||
|
||||
|
||||
@lru_cache(maxsize=128)
|
||||
def _state_diff(
|
||||
old_state: State, new_state: State
|
||||
) -> dict[str, dict[str, dict[str, dict[str, str | list[str]]]]]:
|
||||
|
@ -169,28 +170,6 @@ def _state_diff(
|
|||
return {ENTITY_EVENT_CHANGE: {new_state.entity_id: diff}}
|
||||
|
||||
|
||||
def compressed_state_dict_add(state: State) -> dict[str, Any]:
|
||||
"""Build a compressed dict of a state for adds.
|
||||
|
||||
Omits the lu (last_updated) if it matches (lc) last_changed.
|
||||
|
||||
Sends c (context) as a string if it only contains an id.
|
||||
"""
|
||||
if state.context.parent_id is None and state.context.user_id is None:
|
||||
context: dict[str, Any] | str = state.context.id
|
||||
else:
|
||||
context = state.context.as_dict()
|
||||
compressed_state: dict[str, Any] = {
|
||||
COMPRESSED_STATE_STATE: state.state,
|
||||
COMPRESSED_STATE_ATTRIBUTES: state.attributes,
|
||||
COMPRESSED_STATE_CONTEXT: context,
|
||||
COMPRESSED_STATE_LAST_CHANGED: state.last_changed.timestamp(),
|
||||
}
|
||||
if state.last_changed != state.last_updated:
|
||||
compressed_state[COMPRESSED_STATE_LAST_UPDATED] = state.last_updated.timestamp()
|
||||
return compressed_state
|
||||
|
||||
|
||||
def message_to_json(message: dict[str, Any]) -> str:
|
||||
"""Serialize a websocket message to json."""
|
||||
try:
|
||||
|
|
|
@ -1031,6 +1031,13 @@ DATA_RATE_GIBIBYTES_PER_SECOND: Final = "GiB/s"
|
|||
"""Deprecated: please use UnitOfDataRate.GIBIBYTES_PER_SECOND"""
|
||||
|
||||
|
||||
# States
|
||||
COMPRESSED_STATE_STATE = "s"
|
||||
COMPRESSED_STATE_ATTRIBUTES = "a"
|
||||
COMPRESSED_STATE_CONTEXT = "c"
|
||||
COMPRESSED_STATE_LAST_CHANGED = "lc"
|
||||
COMPRESSED_STATE_LAST_UPDATED = "lu"
|
||||
|
||||
# #### SERVICES ####
|
||||
SERVICE_HOMEASSISTANT_STOP: Final = "stop"
|
||||
SERVICE_HOMEASSISTANT_RESTART: Final = "restart"
|
||||
|
|
|
@ -50,6 +50,11 @@ from .const import (
|
|||
ATTR_FRIENDLY_NAME,
|
||||
ATTR_SERVICE,
|
||||
ATTR_SERVICE_DATA,
|
||||
COMPRESSED_STATE_ATTRIBUTES,
|
||||
COMPRESSED_STATE_CONTEXT,
|
||||
COMPRESSED_STATE_LAST_CHANGED,
|
||||
COMPRESSED_STATE_LAST_UPDATED,
|
||||
COMPRESSED_STATE_STATE,
|
||||
EVENT_CALL_SERVICE,
|
||||
EVENT_CORE_CONFIG_UPDATE,
|
||||
EVENT_HOMEASSISTANT_CLOSE,
|
||||
|
@ -1115,6 +1120,7 @@ class State:
|
|||
"domain",
|
||||
"object_id",
|
||||
"_as_dict",
|
||||
"_as_compressed_state",
|
||||
]
|
||||
|
||||
def __init__(
|
||||
|
@ -1150,6 +1156,7 @@ class State:
|
|||
self.context = context or Context()
|
||||
self.domain, self.object_id = split_entity_id(self.entity_id)
|
||||
self._as_dict: ReadOnlyDict[str, Collection[Any]] | None = None
|
||||
self._as_compressed_state: dict[str, Any] | None = None
|
||||
|
||||
def __hash__(self) -> int:
|
||||
"""Make the state hashable.
|
||||
|
@ -1191,6 +1198,33 @@ class State:
|
|||
)
|
||||
return self._as_dict
|
||||
|
||||
def as_compressed_state(self) -> dict[str, Any]:
|
||||
"""Build a compressed dict of a state for adds.
|
||||
|
||||
Omits the lu (last_updated) if it matches (lc) last_changed.
|
||||
|
||||
Sends c (context) as a string if it only contains an id.
|
||||
"""
|
||||
if self._as_compressed_state:
|
||||
return self._as_compressed_state
|
||||
state_context = self.context
|
||||
if state_context.parent_id is None and state_context.user_id is None:
|
||||
context: dict[str, Any] | str = state_context.id
|
||||
else:
|
||||
context = state_context.as_dict()
|
||||
compressed_state = {
|
||||
COMPRESSED_STATE_STATE: self.state,
|
||||
COMPRESSED_STATE_ATTRIBUTES: self.attributes,
|
||||
COMPRESSED_STATE_CONTEXT: context,
|
||||
COMPRESSED_STATE_LAST_CHANGED: dt_util.utc_to_timestamp(self.last_changed),
|
||||
}
|
||||
if self.last_changed != self.last_updated:
|
||||
compressed_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(
|
||||
self.last_updated
|
||||
)
|
||||
self._as_compressed_state = compressed_state
|
||||
return compressed_state
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls: type[_StateT], json_dict: dict[str, Any]) -> _StateT | None:
|
||||
"""Initialize a state from a dict.
|
||||
|
|
|
@ -400,6 +400,58 @@ def test_state_as_dict():
|
|||
assert state.as_dict() is as_dict_1
|
||||
|
||||
|
||||
def test_state_as_compressed_state():
|
||||
"""Test a State as compressed state."""
|
||||
last_time = datetime(1984, 12, 8, 12, 0, 0, tzinfo=dt_util.UTC)
|
||||
state = ha.State(
|
||||
"happy.happy",
|
||||
"on",
|
||||
{"pig": "dog"},
|
||||
last_updated=last_time,
|
||||
last_changed=last_time,
|
||||
)
|
||||
expected = {
|
||||
"a": {"pig": "dog"},
|
||||
"c": state.context.id,
|
||||
"lc": last_time.timestamp(),
|
||||
"s": "on",
|
||||
}
|
||||
as_compressed_state = state.as_compressed_state()
|
||||
# We are not too concerned about these being ReadOnlyDict
|
||||
# since we don't expect them to be called by external callers
|
||||
assert as_compressed_state == expected
|
||||
# 2nd time to verify cache
|
||||
assert state.as_compressed_state() == expected
|
||||
assert state.as_compressed_state() is as_compressed_state
|
||||
|
||||
|
||||
def test_state_as_compressed_state_unique_last_updated():
|
||||
"""Test a State as compressed state where last_changed is not last_updated."""
|
||||
last_changed = datetime(1984, 12, 8, 11, 0, 0, tzinfo=dt_util.UTC)
|
||||
last_updated = datetime(1984, 12, 8, 12, 0, 0, tzinfo=dt_util.UTC)
|
||||
state = ha.State(
|
||||
"happy.happy",
|
||||
"on",
|
||||
{"pig": "dog"},
|
||||
last_updated=last_updated,
|
||||
last_changed=last_changed,
|
||||
)
|
||||
expected = {
|
||||
"a": {"pig": "dog"},
|
||||
"c": state.context.id,
|
||||
"lc": last_changed.timestamp(),
|
||||
"lu": last_updated.timestamp(),
|
||||
"s": "on",
|
||||
}
|
||||
as_compressed_state = state.as_compressed_state()
|
||||
# We are not too concerned about these being ReadOnlyDict
|
||||
# since we don't expect them to be called by external callers
|
||||
assert as_compressed_state == expected
|
||||
# 2nd time to verify cache
|
||||
assert state.as_compressed_state() == expected
|
||||
assert state.as_compressed_state() is as_compressed_state
|
||||
|
||||
|
||||
async def test_eventbus_add_remove_listener(hass):
|
||||
"""Test remove_listener method."""
|
||||
old_count = len(hass.bus.async_listeners())
|
||||
|
|
Loading…
Reference in New Issue