From 8567aa9e1332bc89fb7b8922f63f0ce89865d977 Mon Sep 17 00:00:00 2001 From: Erik Montnemery Date: Mon, 4 Oct 2021 17:21:40 +0200 Subject: [PATCH] Evict purged states from recorder's old_state cache (#56877) Co-authored-by: J. Nick Koston --- homeassistant/components/recorder/purge.py | 51 ++++++++++---- tests/components/recorder/test_purge.py | 80 ++++++++++++---------- 2 files changed, 81 insertions(+), 50 deletions(-) diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index bc91f7ce67e..2b84a439871 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -38,7 +38,8 @@ def purge_old_data( event_ids = _select_event_ids_to_purge(session, purge_before) state_ids = _select_state_ids_to_purge(session, purge_before, event_ids) if state_ids: - _purge_state_ids(session, state_ids) + _purge_state_ids(instance, session, state_ids) + if event_ids: _purge_event_ids(session, event_ids) # If states or events purging isn't processing the purge_before yet, @@ -68,10 +69,10 @@ def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list def _select_state_ids_to_purge( session: Session, purge_before: datetime, event_ids: list[int] -) -> list[int]: +) -> set[int]: """Return a list of state ids to purge.""" if not event_ids: - return [] + return set() states = ( session.query(States.state_id) .filter(States.last_updated < purge_before) @@ -79,10 +80,10 @@ def _select_state_ids_to_purge( .all() ) _LOGGER.debug("Selected %s state ids to remove", len(states)) - return [state.state_id for state in states] + return {state.state_id for state in states} -def _purge_state_ids(session: Session, state_ids: list[int]) -> None: +def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None: """Disconnect states and delete by state id.""" # Update old_state_id to NULL before deleting to ensure @@ -103,6 +104,26 @@ def _purge_state_ids(session: Session, state_ids: list[int]) -> None: ) _LOGGER.debug("Deleted %s states", deleted_rows) + # Evict eny entries in the old_states cache referring to a purged state + _evict_purged_states_from_old_states_cache(instance, state_ids) + + +def _evict_purged_states_from_old_states_cache( + instance: Recorder, purged_state_ids: set[int] +) -> None: + """Evict purged states from the old states cache.""" + # Make a map from old_state_id to entity_id + old_states = instance._old_states # pylint: disable=protected-access + old_state_reversed = { + old_state.state_id: entity_id + for entity_id, old_state in old_states.items() + if old_state.state_id + } + + # Evict any purged state from the old states cache + for purged_state_id in purged_state_ids.intersection(old_state_reversed): + old_states.pop(old_state_reversed[purged_state_id], None) + def _purge_event_ids(session: Session, event_ids: list[int]) -> None: """Delete by event id.""" @@ -139,7 +160,7 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool: if not instance.entity_filter(entity_id) ] if len(excluded_entity_ids) > 0: - _purge_filtered_states(session, excluded_entity_ids) + _purge_filtered_states(instance, session, excluded_entity_ids) return False # Check if excluded event_types are in database @@ -149,13 +170,15 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool: if event_type in instance.exclude_t ] if len(excluded_event_types) > 0: - _purge_filtered_events(session, excluded_event_types) + _purge_filtered_events(instance, session, excluded_event_types) return False return True -def _purge_filtered_states(session: Session, excluded_entity_ids: list[str]) -> None: +def _purge_filtered_states( + instance: Recorder, session: Session, excluded_entity_ids: list[str] +) -> None: """Remove filtered states and linked events.""" state_ids: list[int] event_ids: list[int | None] @@ -171,11 +194,13 @@ def _purge_filtered_states(session: Session, excluded_entity_ids: list[str]) -> _LOGGER.debug( "Selected %s state_ids to remove that should be filtered", len(state_ids) ) - _purge_state_ids(session, state_ids) + _purge_state_ids(instance, session, set(state_ids)) _purge_event_ids(session, event_ids) # type: ignore # type of event_ids already narrowed to 'list[int]' -def _purge_filtered_events(session: Session, excluded_event_types: list[str]) -> None: +def _purge_filtered_events( + instance: Recorder, session: Session, excluded_event_types: list[str] +) -> None: """Remove filtered events and linked states.""" events: list[Events] = ( session.query(Events.event_id) @@ -190,8 +215,8 @@ def _purge_filtered_events(session: Session, excluded_event_types: list[str]) -> states: list[States] = ( session.query(States.state_id).filter(States.event_id.in_(event_ids)).all() ) - state_ids: list[int] = [state.state_id for state in states] - _purge_state_ids(session, state_ids) + state_ids: set[int] = {state.state_id for state in states} + _purge_state_ids(instance, session, state_ids) _purge_event_ids(session, event_ids) @@ -207,7 +232,7 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) _LOGGER.debug("Purging entity data for %s", selected_entity_ids) if len(selected_entity_ids) > 0: # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record - _purge_filtered_states(session, selected_entity_ids) + _purge_filtered_states(instance, session, selected_entity_ids) _LOGGER.debug("Purging entity data hasn't fully completed yet") return False diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 40ad71096c1..0e66beecd87 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -44,6 +44,7 @@ async def test_purge_old_states( events = session.query(Events).filter(Events.event_type == "state_changed") assert events.count() == 6 + assert "test.recorder2" in instance._old_states purge_before = dt_util.utcnow() - timedelta(days=4) @@ -51,6 +52,7 @@ async def test_purge_old_states( finished = purge_old_data(instance, purge_before, repack=False) assert not finished assert states.count() == 2 + assert "test.recorder2" in instance._old_states states_after_purge = session.query(States) assert states_after_purge[1].old_state_id == states_after_purge[0].state_id @@ -59,6 +61,28 @@ async def test_purge_old_states( finished = purge_old_data(instance, purge_before, repack=False) assert finished assert states.count() == 2 + assert "test.recorder2" in instance._old_states + + # run purge_old_data again + purge_before = dt_util.utcnow() + finished = purge_old_data(instance, purge_before, repack=False) + assert not finished + assert states.count() == 0 + assert "test.recorder2" not in instance._old_states + + # Add some more states + await _add_test_states(hass, instance) + + # make sure we start with 6 states + with session_scope(hass=hass) as session: + states = session.query(States) + assert states.count() == 6 + assert states[0].old_state_id is None + assert states[-1].old_state_id == states[-2].state_id + + events = session.query(Events).filter(Events.event_type == "state_changed") + assert events.count() == 6 + assert "test.recorder2" in instance._old_states async def test_purge_old_states_encouters_database_corruption( @@ -872,45 +896,27 @@ async def _add_test_states(hass: HomeAssistant, instance: recorder.Recorder): eleven_days_ago = utcnow - timedelta(days=11) attributes = {"test_attr": 5, "test_attr_10": "nice"} - await hass.async_block_till_done() - await async_wait_recording_done(hass, instance) + async def set_state(entity_id, state, **kwargs): + """Set the state.""" + hass.states.async_set(entity_id, state, **kwargs) + await hass.async_block_till_done() + await async_wait_recording_done(hass, instance) - with recorder.session_scope(hass=hass) as session: - old_state_id = None - for event_id in range(6): - if event_id < 2: - timestamp = eleven_days_ago - state = "autopurgeme" - elif event_id < 4: - timestamp = five_days_ago - state = "purgeme" - else: - timestamp = utcnow - state = "dontpurgeme" + for event_id in range(6): + if event_id < 2: + timestamp = eleven_days_ago + state = f"autopurgeme_{event_id}" + elif event_id < 4: + timestamp = five_days_ago + state = f"purgeme_{event_id}" + else: + timestamp = utcnow + state = f"dontpurgeme_{event_id}" - event = Events( - event_type="state_changed", - event_data="{}", - origin="LOCAL", - created=timestamp, - time_fired=timestamp, - ) - session.add(event) - session.flush() - state = States( - entity_id="test.recorder2", - domain="sensor", - state=state, - attributes=json.dumps(attributes), - last_changed=timestamp, - last_updated=timestamp, - created=timestamp, - event_id=event.event_id, - old_state_id=old_state_id, - ) - session.add(state) - session.flush() - old_state_id = state.state_id + with patch( + "homeassistant.components.recorder.dt_util.utcnow", return_value=timestamp + ): + await set_state("test.recorder2", state, attributes=attributes) async def _add_test_events(hass: HomeAssistant, instance: recorder.Recorder):