Record state.last_reported (#114201)

* Record state.last_reported

* Include last_reported in parts of the history API

* Use a bulk update

* Fix refactoring error

---------

Co-authored-by: J. Nick Koston <nick@koston.org>
Erik Montnemery, 2024-03-27 12:48:06 +01:00, committed by GitHub
parent 24168dfba7
commit a3059fe504
13 changed files with 225 additions and 23 deletions
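
Context for the change (editorial note, not part of the diff): in Home Assistant core, a state write that repeats the same value and attributes still advances last_reported, while last_updated and last_changed stay put; until this commit the recorder simply dropped that timestamp (see the removed "Recorder does not yet record last_reported" comment below). A minimal sketch of the semantics, assuming current core behavior and a running hass instance; the entity id and values are made up:

    # Illustrative only; uses the sync states API as the tests below do.
    hass.states.set("sensor.demo", "21.5")  # first write: all three timestamps move
    hass.states.set("sensor.demo", "21.5")  # identical write: only last_reported advances
    state = hass.states.get("sensor.demo")
    assert state.last_reported >= state.last_updated >= state.last_changed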

View File

@@ -15,7 +15,7 @@ import time
from typing import TYPE_CHECKING, Any, TypeVar, cast
import psutil_home_assistant as ha_psutil
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select, update
from sqlalchemy.engine import Engine
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.exc import SQLAlchemyError
@@ -1090,12 +1090,22 @@ class Recorder(threading.Thread):
entity_id = event.data["entity_id"]
dbstate = States.from_event(event)
old_state = event.data["old_state"]
assert self.event_session is not None
session = self.event_session
states_manager = self.states_manager
if old_state := states_manager.pop_pending(entity_id):
dbstate.old_state = old_state
if pending_state := states_manager.pop_pending(entity_id):
dbstate.old_state = pending_state
if old_state:
pending_state.last_reported_ts = old_state.last_reported_timestamp
elif old_state_id := states_manager.pop_committed(entity_id):
dbstate.old_state_id = old_state_id
if old_state:
states_manager.update_pending_last_reported(
old_state_id, old_state.last_reported_timestamp
)
if entity_removed:
dbstate.state = None
else:
@@ -1109,8 +1119,6 @@
):
return
assert self.event_session is not None
session = self.event_session
# Map the entity_id to the StatesMeta table
if pending_states_meta := states_meta_manager.get_pending(entity_id):
dbstate.states_meta_rel = pending_states_meta
@@ -1192,7 +1200,23 @@ class Recorder(threading.Thread):
session = self.event_session
self._commits_without_expire += 1
if (
pending_last_reported
:= self.states_manager.get_pending_last_reported_timestamp()
):
with session.no_autoflush:
session.execute(
update(States),
[
{
"state_id": state_id,
"last_reported_ts": last_reported_timestamp,
}
for state_id, last_reported_timestamp in pending_last_reported.items()
],
)
session.commit()
self._event_session_has_pending_writes = False
# We just committed the state attributes to the database
# and we now know the attributes_ids. We can save
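
The bulk write named in the commit message is SQLAlchemy 2.0's ORM "bulk UPDATE by primary key": passing an update() construct plus a list of parameter dicts to Session.execute() emits a single executemany instead of one UPDATE per row. A self-contained sketch of the pattern, using an illustrative model rather than the recorder's real schema:

    # Minimal sketch of SQLAlchemy's bulk UPDATE by primary key.
    from sqlalchemy import Float, Integer, create_engine, update
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

    class Base(DeclarativeBase):
        pass

    class DemoState(Base):  # illustrative stand-in for the recorder's States table
        __tablename__ = "states"
        state_id: Mapped[int] = mapped_column(Integer, primary_key=True)
        last_reported_ts: Mapped[float | None] = mapped_column(Float)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add_all([DemoState(state_id=1), DemoState(state_id=2)])
        session.commit()
        # Each dict must carry the primary key; SQLAlchemy batches the UPDATEs.
        session.execute(
            update(DemoState),
            [
                {"state_id": 1, "last_reported_ts": 1711530000.0},
                {"state_id": 2, "last_reported_ts": 1711530001.0},
            ],
        )
        session.commit()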

View File

@@ -68,7 +68,7 @@ class Base(DeclarativeBase):
"""Base class for tables."""
SCHEMA_VERSION = 42
SCHEMA_VERSION = 43
_LOGGER = logging.getLogger(__name__)
@@ -428,6 +428,7 @@ class States(Base):
event_id: Mapped[int | None] = mapped_column(UNUSED_LEGACY_INTEGER_COLUMN)
last_changed: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
last_changed_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
last_reported_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
last_updated: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
last_updated_ts: Mapped[float | None] = mapped_column(
TIMESTAMP_TYPE, default=time.time, index=True
@@ -499,6 +500,7 @@ class States(Base):
dbstate.state = ""
dbstate.last_updated_ts = event.time_fired_timestamp
dbstate.last_changed_ts = None
dbstate.last_reported_ts = None
return dbstate
dbstate.state = state.state
@@ -507,6 +509,10 @@ class States(Base):
dbstate.last_changed_ts = None
else:
dbstate.last_changed_ts = state.last_changed_timestamp
if state.last_updated == state.last_reported:
dbstate.last_reported_ts = None
else:
dbstate.last_reported_ts = state.last_reported_timestamp
return dbstate
@@ -523,13 +529,18 @@ class States(Base):
# When json_loads fails
_LOGGER.exception("Error converting row to state: %s", self)
return None
last_updated = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
if self.last_changed_ts is None or self.last_changed_ts == self.last_updated_ts:
last_changed = last_updated = dt_util.utc_from_timestamp(
self.last_updated_ts or 0
)
last_changed = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
else:
last_updated = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
last_changed = dt_util.utc_from_timestamp(self.last_changed_ts or 0)
if (
self.last_reported_ts is None
or self.last_reported_ts == self.last_updated_ts
):
last_reported = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
else:
last_reported = dt_util.utc_from_timestamp(self.last_reported_ts or 0)
return State(
self.entity_id or "",
self.state, # type: ignore[arg-type]
@@ -537,7 +548,7 @@ class States(Base):
# for newer states
attrs,
last_changed=last_changed,
last_reported=last_updated, # Recorder does not yet record last_reported
last_reported=last_reported,
last_updated=last_updated,
context=context,
validate_entity_id=validate_entity_id,
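
Note the storage convention: like last_changed_ts before it, last_reported_ts is written as NULL whenever it would merely repeat last_updated_ts, and to_native() substitutes last_updated_ts back in on read. A hedged round-trip sketch of that encoding; the helper names are illustrative, not recorder API:

    def encode_ts(last_updated_ts: float, last_reported_ts: float) -> float | None:
        # Most rows take the NULL branch, so the new column stays cheap.
        return None if last_reported_ts == last_updated_ts else last_reported_ts

    def decode_ts(last_updated_ts: float, stored: float | None) -> float:
        return last_updated_ts if stored is None else stored

    assert decode_ts(100.0, encode_ts(100.0, 100.0)) == 100.0
    assert decode_ts(100.0, encode_ts(100.0, 105.5)) == 105.5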

View File

@@ -52,32 +52,43 @@ _FIELD_MAP = {
def _stmt_and_join_attributes(
no_attributes: bool, include_last_changed: bool
no_attributes: bool,
include_last_changed: bool,
include_last_reported: bool,
) -> Select:
"""Return the statement and if StateAttributes should be joined."""
_select = select(States.metadata_id, States.state, States.last_updated_ts)
if include_last_changed:
_select = _select.add_columns(States.last_changed_ts)
if include_last_reported:
_select = _select.add_columns(States.last_reported_ts)
if not no_attributes:
_select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES)
return _select
def _stmt_and_join_attributes_for_start_state(
no_attributes: bool, include_last_changed: bool
no_attributes: bool,
include_last_changed: bool,
include_last_reported: bool,
) -> Select:
"""Return the statement and if StateAttributes should be joined."""
_select = select(States.metadata_id, States.state)
_select = _select.add_columns(literal(value=0).label("last_updated_ts"))
if include_last_changed:
_select = _select.add_columns(literal(value=0).label("last_changed_ts"))
if include_last_reported:
_select = _select.add_columns(literal(value=0).label("last_reported_ts"))
if not no_attributes:
_select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES)
return _select
def _select_from_subquery(
subquery: Subquery | CompoundSelect, no_attributes: bool, include_last_changed: bool
subquery: Subquery | CompoundSelect,
no_attributes: bool,
include_last_changed: bool,
include_last_reported: bool,
) -> Select:
"""Return the statement to select from the union."""
base_select = select(
@@ -87,6 +98,8 @@ def _select_from_subquery(
)
if include_last_changed:
base_select = base_select.add_columns(subquery.c.last_changed_ts)
if include_last_reported:
base_select = base_select.add_columns(subquery.c.last_reported_ts)
if no_attributes:
return base_select
return base_select.add_columns(subquery.c.attributes)
@@ -134,7 +147,7 @@
) -> Select | CompoundSelect:
"""Query the database for significant state changes."""
include_last_changed = not significant_changes_only
stmt = _stmt_and_join_attributes(no_attributes, include_last_changed)
stmt = _stmt_and_join_attributes(no_attributes, include_last_changed, False)
if significant_changes_only:
# Since we are filtering on entity_id (metadata_id) we can avoid
# the join of the states_meta table since we already know which
@@ -174,13 +187,17 @@
).subquery(),
no_attributes,
include_last_changed,
False,
),
_select_from_subquery(
stmt.subquery(), no_attributes, include_last_changed, False
),
_select_from_subquery(stmt.subquery(), no_attributes, include_last_changed),
).subquery()
return _select_from_subquery(
unioned_subquery,
no_attributes,
include_last_changed,
False,
).order_by(unioned_subquery.c.metadata_id, unioned_subquery.c.last_updated_ts)
@@ -312,7 +329,7 @@ def _state_changed_during_period_stmt(
run_start_ts: float | None,
) -> Select | CompoundSelect:
stmt = (
_stmt_and_join_attributes(no_attributes, False)
_stmt_and_join_attributes(no_attributes, False, True)
.filter(
(
(States.last_changed_ts == States.last_updated_ts)
@@ -344,18 +361,22 @@ def _state_changed_during_period_stmt(
single_metadata_id,
no_attributes,
False,
True,
).subquery(),
no_attributes,
False,
True,
),
_select_from_subquery(
stmt.subquery(),
no_attributes,
False,
True,
),
).subquery(),
no_attributes,
False,
True,
)
@@ -427,7 +448,7 @@ def state_changes_during_period(
def _get_last_state_changes_single_stmt(metadata_id: int) -> Select:
return (
_stmt_and_join_attributes(False, False)
_stmt_and_join_attributes(False, False, False)
.join(
(
lastest_state_for_metadata_id := (
@@ -457,7 +478,7 @@ def _get_last_state_changes_multiple_stmt(
number_of_states: int, metadata_id: int
) -> Select:
return (
_stmt_and_join_attributes(False, False)
_stmt_and_join_attributes(False, False, True)
.where(
States.state_id
== (
@@ -530,7 +551,9 @@ def _get_start_time_state_for_entities_stmt(
# We got an include-list of entities, accelerate the query by filtering already
# in the inner and the outer query.
stmt = (
_stmt_and_join_attributes_for_start_state(no_attributes, include_last_changed)
_stmt_and_join_attributes_for_start_state(
no_attributes, include_last_changed, False
)
.join(
(
most_recent_states_for_entities_by_date := (
@@ -598,6 +621,7 @@ def _get_start_time_state_stmt(
single_metadata_id,
no_attributes,
include_last_changed,
False,
)
# We have more than one entity to look at so we need to do a query on states
# since the last recorder run started.
@@ -615,11 +639,14 @@ def _get_single_entity_start_time_stmt(
metadata_id: int,
no_attributes: bool,
include_last_changed: bool,
include_last_reported: bool,
) -> Select:
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
stmt = (
_stmt_and_join_attributes_for_start_state(no_attributes, include_last_changed)
_stmt_and_join_attributes_for_start_state(
no_attributes, include_last_changed, include_last_reported
)
.filter(
States.last_updated_ts < epoch_time,
States.metadata_id == metadata_id,
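
The thread running through these history queries is conditional column selection: Select.add_columns() returns a new statement, so the new include_last_reported flag bolts last_reported_ts onto a query only where the history API needs it. A standalone sketch of the pattern, with an illustrative table rather than the recorder's schema:

    from sqlalchemy import Column, Float, Integer, MetaData, Table, select

    metadata_obj = MetaData()
    demo_states = Table(  # illustrative stand-in
        "states",
        metadata_obj,
        Column("metadata_id", Integer),
        Column("state", Integer),
        Column("last_updated_ts", Float),
        Column("last_reported_ts", Float),
    )

    def build_stmt(include_last_reported: bool):
        stmt = select(demo_states.c.metadata_id, demo_states.c.last_updated_ts)
        if include_last_reported:
            # add_columns() returns a new Select; result rows gain one column.
            stmt = stmt.add_columns(demo_states.c.last_reported_ts)
        return stmt

    print(build_stmt(True))   # ... last_updated_ts, last_reported_ts FROM states
    print(build_stmt(False))  # ... last_updated_ts FROM states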

View File

@@ -1081,6 +1081,12 @@ def _apply_update( # noqa: C901
_migrate_statistics_columns_to_timestamp_removing_duplicates(
hass, instance, session_maker, engine
)
elif new_version == 43:
_add_columns(
session_maker,
"states",
[f"last_reported_ts {_column_types.timestamp_type}"],
)
else:
raise ValueError(f"No schema migration defined for version {new_version}")
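
_add_columns is the recorder's existing migration helper, which adds the column with a dialect-appropriate type (via ALTER TABLE under the hood), so schema version 43 reduces to one new nullable column. Roughly equivalent raw SQL, sketched on SQLite; the real helper also handles per-dialect types and error recovery:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE states (state_id INTEGER PRIMARY KEY, last_updated_ts DOUBLE)")
    conn.execute("ALTER TABLE states ADD COLUMN last_reported_ts DOUBLE")  # schema 43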

View File

@@ -48,6 +48,7 @@ class LegacyLazyStatePreSchema31(State):
self.state = self._row.state or ""
self._attributes: dict[str, Any] | None = None
self._last_changed: datetime | None = start_time
self._last_reported: datetime | None = start_time
self._last_updated: datetime | None = start_time
self._context: Context | None = None
self.attr_cache = attr_cache
@@ -93,6 +94,18 @@ class LegacyLazyStatePreSchema31(State):
"""Set last changed datetime."""
self._last_changed = value
@property
def last_reported(self) -> datetime:
"""Last reported datetime."""
if self._last_reported is None:
self._last_reported = self.last_updated
return self._last_reported
@last_reported.setter
def last_reported(self, value: datetime) -> None:
"""Set last reported datetime."""
self._last_reported = value
@property
def last_updated(self) -> datetime:
"""Last updated datetime."""
@@ -196,6 +209,7 @@ class LegacyLazyState(State):
self._last_changed_ts: float | None = (
self._row.last_changed_ts or self._last_updated_ts
)
self._last_reported_ts: float | None = self._last_updated_ts
self._context: Context | None = None
self.attr_cache = attr_cache
@@ -236,6 +250,17 @@ class LegacyLazyState(State):
"""Set last changed datetime."""
self._last_changed_ts = process_timestamp(value).timestamp()
@property
def last_reported(self) -> datetime:
"""Last reported datetime."""
assert self._last_reported_ts is not None
return dt_util.utc_from_timestamp(self._last_reported_ts)
@last_reported.setter
def last_reported(self, value: datetime) -> None:
"""Set last reported datetime."""
self._last_reported_ts = process_timestamp(value).timestamp()
@property
def last_updated(self) -> datetime:
"""Last updated datetime."""

View File

@@ -81,6 +81,18 @@ class LazyState(State):
self._last_changed_ts or self._last_updated_ts
)
@cached_property
def _last_reported_ts(self) -> float | None:
"""Last reported timestamp."""
return getattr(self._row, "last_reported_ts", None)
@cached_property
def last_reported(self) -> datetime: # type: ignore[override]
"""Last reported datetime."""
return dt_util.utc_from_timestamp(
self._last_reported_ts or self._last_updated_ts
)
@cached_property
def last_updated(self) -> datetime: # type: ignore[override]
"""Last updated datetime."""

View File

@@ -12,6 +12,7 @@ class StatesManager:
"""Initialize the states manager for linking old_state_id."""
self._pending: dict[str, States] = {}
self._last_committed_id: dict[str, int] = {}
self._last_reported: dict[int, float] = {}
def pop_pending(self, entity_id: str) -> States | None:
"""Pop a pending state.
@@ -44,6 +45,16 @@
"""
self._pending[entity_id] = state
def update_pending_last_reported(
self, state_id: int, last_reported_timestamp: float
) -> None:
"""Update the last reported timestamp for a state."""
self._last_reported[state_id] = last_reported_timestamp
def get_pending_last_reported_timestamp(self) -> dict[int, float]:
"""Return the last reported timestamp for all pending states."""
return self._last_reported
def post_commit_pending(self) -> None:
"""Call after commit to load the state_id of the new States into committed.
@@ -53,6 +64,7 @@
for entity_id, db_states in self._pending.items():
self._last_committed_id[entity_id] = db_states.state_id
self._pending.clear()
self._last_reported.clear()
def reset(self) -> None:
"""Reset after the database has been reset or changed.

View File

@@ -45,6 +45,7 @@ async def test_validate_db_schema_fix_float_issue(
)
modification = [
"last_changed_ts DOUBLE PRECISION",
"last_reported_ts DOUBLE PRECISION",
"last_updated_ts DOUBLE PRECISION",
]
modify_columns_mock.assert_called_once_with(ANY, ANY, "states", modification)

View File

@@ -193,6 +193,7 @@ def assert_states_equal_without_context(state: State, other: State) -> None:
"""Assert that two states are equal, ignoring context."""
assert_states_equal_without_context_and_last_changed(state, other)
assert state.last_changed == other.last_changed
assert state.last_reported == other.last_reported
def assert_states_equal_without_context_and_last_changed(

View File

@@ -375,6 +375,9 @@ class States(Base): # type: ignore[misc,valid-type]
last_changed_ts = Column(
TIMESTAMP_TYPE
) # *** Not originally in v30, only added for recorder to startup ok
last_reported_ts = Column(
TIMESTAMP_TYPE
) # *** Not originally in v30, only added for recorder to startup ok
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
last_updated_ts = Column(
TIMESTAMP_TYPE, default=time.time, index=True

View File

@@ -372,6 +372,9 @@ class States(Base): # type: ignore[misc,valid-type]
)
last_changed = Column(DATETIME_TYPE)
last_changed_ts = Column(TIMESTAMP_TYPE)
last_reported_ts = Column(
TIMESTAMP_TYPE
) # *** Not originally in v32, only added for recorder to startup ok
last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
last_updated_ts = Column(TIMESTAMP_TYPE, default=time.time, index=True)
old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)

View File

@@ -246,6 +246,41 @@ def test_state_changes_during_period(
assert_multiple_states_equal_without_context(states[:limit], hist[entity_id])
def test_state_changes_during_period_last_reported(
hass_recorder: Callable[..., HomeAssistant],
) -> None:
"""Test state change during period."""
hass = hass_recorder()
entity_id = "media_player.test"
def set_state(state):
"""Set the state."""
hass.states.set(entity_id, state)
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow()
point1 = start + timedelta(seconds=1)
point2 = point1 + timedelta(seconds=1)
end = point2 + timedelta(seconds=1)
with freeze_time(start) as freezer:
set_state("idle")
freezer.move_to(point1)
set_state("YouTube")
freezer.move_to(point2)
states = [set_state("YouTube")]
freezer.move_to(end)
set_state("Netflix")
hist = history.state_changes_during_period(hass, start, end, entity_id)
assert_multiple_states_equal_without_context(states, hist[entity_id])
def test_state_changes_during_period_descending(
hass_recorder: Callable[..., HomeAssistant],
) -> None:
@@ -380,6 +415,38 @@ def test_get_last_state_changes(hass_recorder: Callable[..., HomeAssistant]) ->
assert_multiple_states_equal_without_context(states, hist[entity_id])
def test_get_last_state_changes_last_reported(
hass_recorder: Callable[..., HomeAssistant],
) -> None:
"""Test number of state changes."""
hass = hass_recorder()
entity_id = "sensor.test"
def set_state(state):
"""Set the state."""
hass.states.set(entity_id, state)
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow() - timedelta(minutes=2)
point = start + timedelta(minutes=1)
point2 = point + timedelta(minutes=1, seconds=1)
states = []
with freeze_time(start) as freezer:
set_state("1")
freezer.move_to(point)
states.append(set_state("1"))
freezer.move_to(point2)
states.append(set_state("2"))
hist = history.get_last_state_changes(hass, 2, entity_id)
assert_multiple_states_equal_without_context(states, hist[entity_id])
def test_get_last_state_change(hass_recorder: Callable[..., HomeAssistant]) -> None:
"""Test getting the last state change for an entity."""
hass = hass_recorder()
@@ -577,6 +644,7 @@ def test_get_significant_states_without_initial(
)
)
del states["media_player.test2"]
del states["thermostat.test3"]
hist = history.get_significant_states(
hass,
@@ -598,6 +666,7 @@ def test_get_significant_states_entity_id(
del states["media_player.test3"]
del states["thermostat.test"]
del states["thermostat.test2"]
del states["thermostat.test3"]
del states["script.can_cancel_this_one"]
hist = history.get_significant_states(hass, zero, four, ["media_player.test"])
@@ -745,6 +814,7 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]:
mp3 = "media_player.test3"
therm = "thermostat.test"
therm2 = "thermostat.test2"
therm3 = "thermostat.test3"
zone = "zone.home"
script_c = "script.can_cancel_this_one"
@@ -760,7 +830,7 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]:
three = two + timedelta(seconds=1)
four = three + timedelta(seconds=1)
states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []}
states = {therm: [], therm2: [], therm3: [], mp: [], mp2: [], mp3: [], script_c: []}
with freeze_time(one) as freezer:
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
@@ -774,6 +844,8 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]:
states[therm].append(
set_state(therm, 20, attributes={"current_temperature": 19.5})
)
# This state will be updated
set_state(therm3, 20, attributes={"current_temperature": 19.5})
freezer.move_to(one + timedelta(microseconds=1))
states[mp].append(
@@ -794,6 +866,8 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]:
states[therm2].append(
set_state(therm2, 20, attributes={"current_temperature": 19})
)
# This state will be updated
set_state(therm3, 20, attributes={"current_temperature": 19.5})
freezer.move_to(three)
states[mp].append(
@@ -806,6 +880,9 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]:
states[therm].append(
set_state(therm, 21, attributes={"current_temperature": 20})
)
states[therm3].append(
set_state(therm3, 20, attributes={"current_temperature": 19.5})
)
return zero, four, states

View File

@@ -925,7 +925,7 @@ def test_execute_stmt_lambda_element(
start_time_ts = dt_util.utcnow().timestamp()
stmt = lambda_stmt(
lambda: _get_single_entity_start_time_stmt(
start_time_ts, metadata_id, False, False
start_time_ts, metadata_id, False, False, False
)
)
rows = util.execute_stmt_lambda_element(session, stmt)
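
The statement builder is wrapped in lambda_stmt because the recorder caches compiled queries: SQLAlchemy keys the cache on the lambda's code object, so repeated calls reuse the compiled SQL while closure variables become bound parameters. A standalone sketch of the pattern, with an illustrative table:

    from sqlalchemy import Column, Integer, MetaData, Table, create_engine, lambda_stmt, select
    from sqlalchemy.orm import Session

    metadata_obj = MetaData()
    demo = Table("states", metadata_obj, Column("state_id", Integer, primary_key=True))

    engine = create_engine("sqlite://")
    metadata_obj.create_all(engine)

    def stmt_for(state_id: int):
        # Only the bound parameter changes between calls; the SQL is cached.
        return lambda_stmt(lambda: select(demo).where(demo.c.state_id == state_id))

    with Session(engine) as session:
        print(session.execute(stmt_for(1)).all())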