diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 21d42405b75..7de9cf46311 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -15,7 +15,7 @@ import time from typing import TYPE_CHECKING, Any, TypeVar, cast import psutil_home_assistant as ha_psutil -from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select +from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select, update from sqlalchemy.engine import Engine from sqlalchemy.engine.interfaces import DBAPIConnection from sqlalchemy.exc import SQLAlchemyError @@ -1090,12 +1090,22 @@ class Recorder(threading.Thread): entity_id = event.data["entity_id"] dbstate = States.from_event(event) + old_state = event.data["old_state"] + + assert self.event_session is not None + session = self.event_session states_manager = self.states_manager - if old_state := states_manager.pop_pending(entity_id): - dbstate.old_state = old_state + if pending_state := states_manager.pop_pending(entity_id): + dbstate.old_state = pending_state + if old_state: + pending_state.last_reported_ts = old_state.last_reported_timestamp elif old_state_id := states_manager.pop_committed(entity_id): dbstate.old_state_id = old_state_id + if old_state: + states_manager.update_pending_last_reported( + old_state_id, old_state.last_reported_timestamp + ) if entity_removed: dbstate.state = None else: @@ -1109,8 +1119,6 @@ class Recorder(threading.Thread): ): return - assert self.event_session is not None - session = self.event_session # Map the entity_id to the StatesMeta table if pending_states_meta := states_meta_manager.get_pending(entity_id): dbstate.states_meta_rel = pending_states_meta @@ -1192,7 +1200,23 @@ class Recorder(threading.Thread): session = self.event_session self._commits_without_expire += 1 + if ( + pending_last_reported + := self.states_manager.get_pending_last_reported_timestamp() + ): + with session.no_autoflush: + session.execute( + update(States), + [ + { + "state_id": state_id, + "last_reported_ts": last_reported_timestamp, + } + for state_id, last_reported_timestamp in pending_last_reported.items() + ], + ) session.commit() + self._event_session_has_pending_writes = False # We just committed the state attributes to the database # and we now know the attributes_ids. We can save diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py index 61e39e40034..5b24448211d 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -68,7 +68,7 @@ class Base(DeclarativeBase): """Base class for tables.""" -SCHEMA_VERSION = 42 +SCHEMA_VERSION = 43 _LOGGER = logging.getLogger(__name__) @@ -428,6 +428,7 @@ class States(Base): event_id: Mapped[int | None] = mapped_column(UNUSED_LEGACY_INTEGER_COLUMN) last_changed: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN) last_changed_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE) + last_reported_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE) last_updated: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN) last_updated_ts: Mapped[float | None] = mapped_column( TIMESTAMP_TYPE, default=time.time, index=True @@ -499,6 +500,7 @@ class States(Base): dbstate.state = "" dbstate.last_updated_ts = event.time_fired_timestamp dbstate.last_changed_ts = None + dbstate.last_reported_ts = None return dbstate dbstate.state = state.state @@ -507,6 +509,10 @@ class States(Base): dbstate.last_changed_ts = None else: dbstate.last_changed_ts = state.last_changed_timestamp + if state.last_updated == state.last_reported: + dbstate.last_reported_ts = None + else: + dbstate.last_reported_ts = state.last_reported_timestamp return dbstate @@ -523,13 +529,18 @@ class States(Base): # When json_loads fails _LOGGER.exception("Error converting row to state: %s", self) return None + last_updated = dt_util.utc_from_timestamp(self.last_updated_ts or 0) if self.last_changed_ts is None or self.last_changed_ts == self.last_updated_ts: - last_changed = last_updated = dt_util.utc_from_timestamp( - self.last_updated_ts or 0 - ) + last_changed = dt_util.utc_from_timestamp(self.last_updated_ts or 0) else: - last_updated = dt_util.utc_from_timestamp(self.last_updated_ts or 0) last_changed = dt_util.utc_from_timestamp(self.last_changed_ts or 0) + if ( + self.last_reported_ts is None + or self.last_reported_ts == self.last_updated_ts + ): + last_reported = dt_util.utc_from_timestamp(self.last_updated_ts or 0) + else: + last_reported = dt_util.utc_from_timestamp(self.last_reported_ts or 0) return State( self.entity_id or "", self.state, # type: ignore[arg-type] @@ -537,7 +548,7 @@ class States(Base): # for newer states attrs, last_changed=last_changed, - last_reported=last_updated, # Recorder does not yet record last_reported + last_reported=last_reported, last_updated=last_updated, context=context, validate_entity_id=validate_entity_id, diff --git a/homeassistant/components/recorder/history/modern.py b/homeassistant/components/recorder/history/modern.py index 124a6a43869..a909f799ea9 100644 --- a/homeassistant/components/recorder/history/modern.py +++ b/homeassistant/components/recorder/history/modern.py @@ -52,32 +52,43 @@ _FIELD_MAP = { def _stmt_and_join_attributes( - no_attributes: bool, include_last_changed: bool + no_attributes: bool, + include_last_changed: bool, + include_last_reported: bool, ) -> Select: """Return the statement and if StateAttributes should be joined.""" _select = select(States.metadata_id, States.state, States.last_updated_ts) if include_last_changed: _select = _select.add_columns(States.last_changed_ts) + if include_last_reported: + _select = _select.add_columns(States.last_reported_ts) if not no_attributes: _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES) return _select def _stmt_and_join_attributes_for_start_state( - no_attributes: bool, include_last_changed: bool + no_attributes: bool, + include_last_changed: bool, + include_last_reported: bool, ) -> Select: """Return the statement and if StateAttributes should be joined.""" _select = select(States.metadata_id, States.state) _select = _select.add_columns(literal(value=0).label("last_updated_ts")) if include_last_changed: _select = _select.add_columns(literal(value=0).label("last_changed_ts")) + if include_last_reported: + _select = _select.add_columns(literal(value=0).label("last_reported_ts")) if not no_attributes: _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES) return _select def _select_from_subquery( - subquery: Subquery | CompoundSelect, no_attributes: bool, include_last_changed: bool + subquery: Subquery | CompoundSelect, + no_attributes: bool, + include_last_changed: bool, + include_last_reported: bool, ) -> Select: """Return the statement to select from the union.""" base_select = select( @@ -87,6 +98,8 @@ def _select_from_subquery( ) if include_last_changed: base_select = base_select.add_columns(subquery.c.last_changed_ts) + if include_last_reported: + base_select = base_select.add_columns(subquery.c.last_reported_ts) if no_attributes: return base_select return base_select.add_columns(subquery.c.attributes) @@ -134,7 +147,7 @@ def _significant_states_stmt( ) -> Select | CompoundSelect: """Query the database for significant state changes.""" include_last_changed = not significant_changes_only - stmt = _stmt_and_join_attributes(no_attributes, include_last_changed) + stmt = _stmt_and_join_attributes(no_attributes, include_last_changed, False) if significant_changes_only: # Since we are filtering on entity_id (metadata_id) we can avoid # the join of the states_meta table since we already know which @@ -174,13 +187,17 @@ def _significant_states_stmt( ).subquery(), no_attributes, include_last_changed, + False, + ), + _select_from_subquery( + stmt.subquery(), no_attributes, include_last_changed, False ), - _select_from_subquery(stmt.subquery(), no_attributes, include_last_changed), ).subquery() return _select_from_subquery( unioned_subquery, no_attributes, include_last_changed, + False, ).order_by(unioned_subquery.c.metadata_id, unioned_subquery.c.last_updated_ts) @@ -312,7 +329,7 @@ def _state_changed_during_period_stmt( run_start_ts: float | None, ) -> Select | CompoundSelect: stmt = ( - _stmt_and_join_attributes(no_attributes, False) + _stmt_and_join_attributes(no_attributes, False, True) .filter( ( (States.last_changed_ts == States.last_updated_ts) @@ -344,18 +361,22 @@ def _state_changed_during_period_stmt( single_metadata_id, no_attributes, False, + True, ).subquery(), no_attributes, False, + True, ), _select_from_subquery( stmt.subquery(), no_attributes, False, + True, ), ).subquery(), no_attributes, False, + True, ) @@ -427,7 +448,7 @@ def state_changes_during_period( def _get_last_state_changes_single_stmt(metadata_id: int) -> Select: return ( - _stmt_and_join_attributes(False, False) + _stmt_and_join_attributes(False, False, False) .join( ( lastest_state_for_metadata_id := ( @@ -457,7 +478,7 @@ def _get_last_state_changes_multiple_stmt( number_of_states: int, metadata_id: int ) -> Select: return ( - _stmt_and_join_attributes(False, False) + _stmt_and_join_attributes(False, False, True) .where( States.state_id == ( @@ -530,7 +551,9 @@ def _get_start_time_state_for_entities_stmt( # We got an include-list of entities, accelerate the query by filtering already # in the inner and the outer query. stmt = ( - _stmt_and_join_attributes_for_start_state(no_attributes, include_last_changed) + _stmt_and_join_attributes_for_start_state( + no_attributes, include_last_changed, False + ) .join( ( most_recent_states_for_entities_by_date := ( @@ -598,6 +621,7 @@ def _get_start_time_state_stmt( single_metadata_id, no_attributes, include_last_changed, + False, ) # We have more than one entity to look at so we need to do a query on states # since the last recorder run started. @@ -615,11 +639,14 @@ def _get_single_entity_start_time_stmt( metadata_id: int, no_attributes: bool, include_last_changed: bool, + include_last_reported: bool, ) -> Select: # Use an entirely different (and extremely fast) query if we only # have a single entity id stmt = ( - _stmt_and_join_attributes_for_start_state(no_attributes, include_last_changed) + _stmt_and_join_attributes_for_start_state( + no_attributes, include_last_changed, include_last_reported + ) .filter( States.last_updated_ts < epoch_time, States.metadata_id == metadata_id, diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 60373a053c9..fc2e6ec2b3f 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -1081,6 +1081,12 @@ def _apply_update( # noqa: C901 _migrate_statistics_columns_to_timestamp_removing_duplicates( hass, instance, session_maker, engine ) + elif new_version == 43: + _add_columns( + session_maker, + "states", + [f"last_reported_ts {_column_types.timestamp_type}"], + ) else: raise ValueError(f"No schema migration defined for version {new_version}") diff --git a/homeassistant/components/recorder/models/legacy.py b/homeassistant/components/recorder/models/legacy.py index af9fcf22f70..4b32ae65748 100644 --- a/homeassistant/components/recorder/models/legacy.py +++ b/homeassistant/components/recorder/models/legacy.py @@ -48,6 +48,7 @@ class LegacyLazyStatePreSchema31(State): self.state = self._row.state or "" self._attributes: dict[str, Any] | None = None self._last_changed: datetime | None = start_time + self._last_reported: datetime | None = start_time self._last_updated: datetime | None = start_time self._context: Context | None = None self.attr_cache = attr_cache @@ -93,6 +94,18 @@ class LegacyLazyStatePreSchema31(State): """Set last changed datetime.""" self._last_changed = value + @property + def last_reported(self) -> datetime: + """Last reported datetime.""" + if self._last_reported is None: + self._last_reported = self.last_updated + return self._last_reported + + @last_reported.setter + def last_reported(self, value: datetime) -> None: + """Set last reported datetime.""" + self._last_reported = value + @property def last_updated(self) -> datetime: """Last updated datetime.""" @@ -196,6 +209,7 @@ class LegacyLazyState(State): self._last_changed_ts: float | None = ( self._row.last_changed_ts or self._last_updated_ts ) + self._last_reported_ts: float | None = self._last_updated_ts self._context: Context | None = None self.attr_cache = attr_cache @@ -236,6 +250,17 @@ class LegacyLazyState(State): """Set last changed datetime.""" self._last_changed_ts = process_timestamp(value).timestamp() + @property + def last_reported(self) -> datetime: + """Last reported datetime.""" + assert self._last_reported_ts is not None + return dt_util.utc_from_timestamp(self._last_reported_ts) + + @last_reported.setter + def last_reported(self, value: datetime) -> None: + """Set last reported datetime.""" + self._last_reported_ts = process_timestamp(value).timestamp() + @property def last_updated(self) -> datetime: """Last updated datetime.""" diff --git a/homeassistant/components/recorder/models/state.py b/homeassistant/components/recorder/models/state.py index 9805aa56909..e1f23f32118 100644 --- a/homeassistant/components/recorder/models/state.py +++ b/homeassistant/components/recorder/models/state.py @@ -81,6 +81,18 @@ class LazyState(State): self._last_changed_ts or self._last_updated_ts ) + @cached_property + def _last_reported_ts(self) -> float | None: + """Last reported timestamp.""" + return getattr(self._row, "last_reported_ts", None) + + @cached_property + def last_reported(self) -> datetime: # type: ignore[override] + """Last reported datetime.""" + return dt_util.utc_from_timestamp( + self._last_reported_ts or self._last_updated_ts + ) + @cached_property def last_updated(self) -> datetime: # type: ignore[override] """Last updated datetime.""" diff --git a/homeassistant/components/recorder/table_managers/states.py b/homeassistant/components/recorder/table_managers/states.py index 80d2fcaddaf..d5cef759c54 100644 --- a/homeassistant/components/recorder/table_managers/states.py +++ b/homeassistant/components/recorder/table_managers/states.py @@ -12,6 +12,7 @@ class StatesManager: """Initialize the states manager for linking old_state_id.""" self._pending: dict[str, States] = {} self._last_committed_id: dict[str, int] = {} + self._last_reported: dict[int, float] = {} def pop_pending(self, entity_id: str) -> States | None: """Pop a pending state. @@ -44,6 +45,16 @@ class StatesManager: """ self._pending[entity_id] = state + def update_pending_last_reported( + self, state_id: int, last_reported_timestamp: float + ) -> None: + """Update the last reported timestamp for a state.""" + self._last_reported[state_id] = last_reported_timestamp + + def get_pending_last_reported_timestamp(self) -> dict[int, float]: + """Return the last reported timestamp for all pending states.""" + return self._last_reported + def post_commit_pending(self) -> None: """Call after commit to load the state_id of the new States into committed. @@ -53,6 +64,7 @@ class StatesManager: for entity_id, db_states in self._pending.items(): self._last_committed_id[entity_id] = db_states.state_id self._pending.clear() + self._last_reported.clear() def reset(self) -> None: """Reset after the database has been reset or changed. diff --git a/tests/components/recorder/auto_repairs/states/test_schema.py b/tests/components/recorder/auto_repairs/states/test_schema.py index c0a9e930966..7d14a873bfe 100644 --- a/tests/components/recorder/auto_repairs/states/test_schema.py +++ b/tests/components/recorder/auto_repairs/states/test_schema.py @@ -45,6 +45,7 @@ async def test_validate_db_schema_fix_float_issue( ) modification = [ "last_changed_ts DOUBLE PRECISION", + "last_reported_ts DOUBLE PRECISION", "last_updated_ts DOUBLE PRECISION", ] modify_columns_mock.assert_called_once_with(ANY, ANY, "states", modification) diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index 1c44511678e..e8fd6dbcf53 100644 --- a/tests/components/recorder/common.py +++ b/tests/components/recorder/common.py @@ -193,6 +193,7 @@ def assert_states_equal_without_context(state: State, other: State) -> None: """Assert that two states are equal, ignoring context.""" assert_states_equal_without_context_and_last_changed(state, other) assert state.last_changed == other.last_changed + assert state.last_reported == other.last_reported def assert_states_equal_without_context_and_last_changed( diff --git a/tests/components/recorder/db_schema_30.py b/tests/components/recorder/db_schema_30.py index 0fba51d588e..b82213cbc89 100644 --- a/tests/components/recorder/db_schema_30.py +++ b/tests/components/recorder/db_schema_30.py @@ -375,6 +375,9 @@ class States(Base): # type: ignore[misc,valid-type] last_changed_ts = Column( TIMESTAMP_TYPE ) # *** Not originally in v30, only added for recorder to startup ok + last_reported_ts = Column( + TIMESTAMP_TYPE + ) # *** Not originally in v30, only added for recorder to startup ok last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True) last_updated_ts = Column( TIMESTAMP_TYPE, default=time.time, index=True diff --git a/tests/components/recorder/db_schema_32.py b/tests/components/recorder/db_schema_32.py index 7def9867373..15b56e2fc86 100644 --- a/tests/components/recorder/db_schema_32.py +++ b/tests/components/recorder/db_schema_32.py @@ -372,6 +372,9 @@ class States(Base): # type: ignore[misc,valid-type] ) last_changed = Column(DATETIME_TYPE) last_changed_ts = Column(TIMESTAMP_TYPE) + last_reported_ts = Column( + TIMESTAMP_TYPE + ) # *** Not originally in v32, only added for recorder to startup ok last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True) last_updated_ts = Column(TIMESTAMP_TYPE, default=time.time, index=True) old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True) diff --git a/tests/components/recorder/test_history.py b/tests/components/recorder/test_history.py index 40290924d11..d16a6856399 100644 --- a/tests/components/recorder/test_history.py +++ b/tests/components/recorder/test_history.py @@ -246,6 +246,41 @@ def test_state_changes_during_period( assert_multiple_states_equal_without_context(states[:limit], hist[entity_id]) +def test_state_changes_during_period_last_reported( + hass_recorder: Callable[..., HomeAssistant], +) -> None: + """Test state change during period.""" + hass = hass_recorder() + entity_id = "media_player.test" + + def set_state(state): + """Set the state.""" + hass.states.set(entity_id, state) + wait_recording_done(hass) + return hass.states.get(entity_id) + + start = dt_util.utcnow() + point1 = start + timedelta(seconds=1) + point2 = point1 + timedelta(seconds=1) + end = point2 + timedelta(seconds=1) + + with freeze_time(start) as freezer: + set_state("idle") + + freezer.move_to(point1) + set_state("YouTube") + + freezer.move_to(point2) + states = [set_state("YouTube")] + + freezer.move_to(end) + set_state("Netflix") + + hist = history.state_changes_during_period(hass, start, end, entity_id) + + assert_multiple_states_equal_without_context(states, hist[entity_id]) + + def test_state_changes_during_period_descending( hass_recorder: Callable[..., HomeAssistant], ) -> None: @@ -380,6 +415,38 @@ def test_get_last_state_changes(hass_recorder: Callable[..., HomeAssistant]) -> assert_multiple_states_equal_without_context(states, hist[entity_id]) +def test_get_last_state_changes_last_reported( + hass_recorder: Callable[..., HomeAssistant], +) -> None: + """Test number of state changes.""" + hass = hass_recorder() + entity_id = "sensor.test" + + def set_state(state): + """Set the state.""" + hass.states.set(entity_id, state) + wait_recording_done(hass) + return hass.states.get(entity_id) + + start = dt_util.utcnow() - timedelta(minutes=2) + point = start + timedelta(minutes=1) + point2 = point + timedelta(minutes=1, seconds=1) + states = [] + + with freeze_time(start) as freezer: + set_state("1") + + freezer.move_to(point) + states.append(set_state("1")) + + freezer.move_to(point2) + states.append(set_state("2")) + + hist = history.get_last_state_changes(hass, 2, entity_id) + + assert_multiple_states_equal_without_context(states, hist[entity_id]) + + def test_get_last_state_change(hass_recorder: Callable[..., HomeAssistant]) -> None: """Test getting the last state change for an entity.""" hass = hass_recorder() @@ -577,6 +644,7 @@ def test_get_significant_states_without_initial( ) ) del states["media_player.test2"] + del states["thermostat.test3"] hist = history.get_significant_states( hass, @@ -598,6 +666,7 @@ def test_get_significant_states_entity_id( del states["media_player.test3"] del states["thermostat.test"] del states["thermostat.test2"] + del states["thermostat.test3"] del states["script.can_cancel_this_one"] hist = history.get_significant_states(hass, zero, four, ["media_player.test"]) @@ -745,6 +814,7 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]: mp3 = "media_player.test3" therm = "thermostat.test" therm2 = "thermostat.test2" + therm3 = "thermostat.test3" zone = "zone.home" script_c = "script.can_cancel_this_one" @@ -760,7 +830,7 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]: three = two + timedelta(seconds=1) four = three + timedelta(seconds=1) - states = {therm: [], therm2: [], mp: [], mp2: [], mp3: [], script_c: []} + states = {therm: [], therm2: [], therm3: [], mp: [], mp2: [], mp3: [], script_c: []} with freeze_time(one) as freezer: states[mp].append( set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)}) @@ -774,6 +844,8 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]: states[therm].append( set_state(therm, 20, attributes={"current_temperature": 19.5}) ) + # This state will be updated + set_state(therm3, 20, attributes={"current_temperature": 19.5}) freezer.move_to(one + timedelta(microseconds=1)) states[mp].append( @@ -794,6 +866,8 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]: states[therm2].append( set_state(therm2, 20, attributes={"current_temperature": 19}) ) + # This state will be updated + set_state(therm3, 20, attributes={"current_temperature": 19.5}) freezer.move_to(three) states[mp].append( @@ -806,6 +880,9 @@ def record_states(hass) -> tuple[datetime, datetime, dict[str, list[State]]]: states[therm].append( set_state(therm, 21, attributes={"current_temperature": 20}) ) + states[therm3].append( + set_state(therm3, 20, attributes={"current_temperature": 19.5}) + ) return zero, four, states diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index 9a5b91fa8f8..549280efba2 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -925,7 +925,7 @@ def test_execute_stmt_lambda_element( start_time_ts = dt_util.utcnow().timestamp() stmt = lambda_stmt( lambda: _get_single_entity_start_time_stmt( - start_time_ts, metadata_id, False, False + start_time_ts, metadata_id, False, False, False ) ) rows = util.execute_stmt_lambda_element(session, stmt)