diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index 2b84a439871..e44ae9aafff 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -6,11 +6,12 @@ from datetime import datetime
 import logging
 from typing import TYPE_CHECKING
 
+from sqlalchemy import func
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct
 
 from .const import MAX_ROWS_TO_PURGE
-from .models import Events, RecorderRuns, States
+from .models import Events, RecorderRuns, States, StatisticsRuns, StatisticsShortTerm
 from .repack import repack_database
 from .util import retryable_database_job, session_scope
 
@@ -37,18 +38,32 @@ def purge_old_data(
     # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
     event_ids = _select_event_ids_to_purge(session, purge_before)
     state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
+    statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
+    short_term_statistics = _select_short_term_statistics_to_purge(
+        session, purge_before
+    )
+
     if state_ids:
         _purge_state_ids(instance, session, state_ids)
+
     if event_ids:
         _purge_event_ids(session, event_ids)
-        # If states or events purging isn't processing the purge_before yet,
-        # return false, as we are not done yet.
+
+    if statistics_runs:
+        _purge_statistics_runs(session, statistics_runs)
+
+    if short_term_statistics:
+        _purge_short_term_statistics(session, short_term_statistics)
+
+    if event_ids or statistics_runs or short_term_statistics:
+        # Return false, as we might not be done yet.
         _LOGGER.debug("Purging hasn't fully completed yet")
         return False
+
     if apply_filter and _purge_filtered_data(instance, session) is False:
         _LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
         return False
+
     _purge_old_recorder_runs(instance, session, purge_before)
     if repack:
         repack_database(instance)
@@ -83,6 +98,41 @@ def _select_state_ids_to_purge(
     return {state.state_id for state in states}
 
 
+def _select_statistics_runs_to_purge(
+    session: Session, purge_before: datetime
+) -> list[int]:
+    """Return a list of statistic runs to purge, but take care to keep the newest run."""
+    statistic_runs = (
+        session.query(StatisticsRuns.run_id)
+        .filter(StatisticsRuns.start < purge_before)
+        .limit(MAX_ROWS_TO_PURGE)
+        .all()
+    )
+    statistic_runs_list = [run.run_id for run in statistic_runs]
+    # Exclude the newest statistics run
+    if (
+        last_run := session.query(func.max(StatisticsRuns.run_id)).scalar()
+    ) and last_run in statistic_runs_list:
+        statistic_runs_list.remove(last_run)
+
+    _LOGGER.debug("Selected %s statistic runs to remove", len(statistic_runs_list))
+    return statistic_runs_list
+
+
+def _select_short_term_statistics_to_purge(
+    session: Session, purge_before: datetime
+) -> list[int]:
+    """Return a list of short term statistics to purge."""
+    statistics = (
+        session.query(StatisticsShortTerm.id)
+        .filter(StatisticsShortTerm.start < purge_before)
+        .limit(MAX_ROWS_TO_PURGE)
+        .all()
+    )
+    _LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
+    return [statistic.id for statistic in statistics]
+
+
 def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None:
     """Disconnect states and delete by state id."""
 
@@ -125,6 +175,28 @@ def _evict_purged_states_from_old_states_cache(
         old_states.pop(old_state_reversed[purged_state_id], None)
 
 
+def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
+ """Delete by run_id.""" + deleted_rows = ( + session.query(StatisticsRuns) + .filter(StatisticsRuns.run_id.in_(statistics_runs)) + .delete(synchronize_session=False) + ) + _LOGGER.debug("Deleted %s statistic runs", deleted_rows) + + +def _purge_short_term_statistics( + session: Session, short_term_statistics: list[int] +) -> None: + """Delete by id.""" + deleted_rows = ( + session.query(StatisticsShortTerm) + .filter(StatisticsShortTerm.id.in_(short_term_statistics)) + .delete(synchronize_session=False) + ) + _LOGGER.debug("Deleted %s short term statistics", deleted_rows) + + def _purge_event_ids(session: Session, event_ids: list[int]) -> None: """Delete by event id.""" deleted_rows = ( diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 0e66beecd87..8920843e8fe 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -10,7 +10,13 @@ from sqlalchemy.orm.session import Session from homeassistant.components import recorder from homeassistant.components.recorder import PurgeTask from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE -from homeassistant.components.recorder.models import Events, RecorderRuns, States +from homeassistant.components.recorder.models import ( + Events, + RecorderRuns, + States, + StatisticsRuns, + StatisticsShortTerm, +) from homeassistant.components.recorder.purge import purge_old_data from homeassistant.components.recorder.util import session_scope from homeassistant.const import EVENT_STATE_CHANGED @@ -227,6 +233,30 @@ async def test_purge_old_recorder_runs( assert recorder_runs.count() == 1 +async def test_purge_old_statistics_runs( + hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test deleting old statistics runs keeps the latest run.""" + instance = await async_setup_recorder_instance(hass) + + await _add_test_statistics_runs(hass, instance) + + # make sure we start with 7 statistics runs + with session_scope(hass=hass) as session: + statistics_runs = session.query(StatisticsRuns) + assert statistics_runs.count() == 7 + + purge_before = dt_util.utcnow() + + # run purge_old_data() + finished = purge_old_data(instance, purge_before, repack=False) + assert not finished + + finished = purge_old_data(instance, purge_before, repack=False) + assert finished + assert statistics_runs.count() == 1 + + async def test_purge_method( hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, @@ -238,7 +268,9 @@ async def test_purge_method( service_data = {"keep_days": 4} await _add_test_events(hass, instance) await _add_test_states(hass, instance) + await _add_test_statistics(hass, instance) await _add_test_recorder_runs(hass, instance) + await _add_test_statistics_runs(hass, instance) await hass.async_block_till_done() await async_wait_recording_done(hass, instance) @@ -250,10 +282,17 @@ async def test_purge_method( events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) assert events.count() == 6 + statistics = session.query(StatisticsShortTerm) + assert statistics.count() == 6 + recorder_runs = session.query(RecorderRuns) assert recorder_runs.count() == 7 runs_before_purge = recorder_runs.all() + statistics_runs = session.query(StatisticsRuns) + assert statistics_runs.count() == 7 + statistic_runs_before_purge = statistics_runs.all() + await hass.async_block_till_done() await async_wait_purge_done(hass, instance) @@ -264,9 +303,10 @@ async def test_purge_method( # Small wait for recorder 
         await async_wait_purge_done(hass, instance)
 
-        # only purged old events
+        # only purged old states, events and statistics
         assert states.count() == 4
         assert events.count() == 4
+        assert statistics.count() == 4
 
         # run purge method - correct service data
         await hass.services.async_call("recorder", "purge", service_data=service_data)
@@ -275,11 +315,10 @@ async def test_purge_method(
         # Small wait for recorder thread
         await async_wait_purge_done(hass, instance)
 
-        # we should only have 2 states left after purging
+        # we should only have 2 states, events and statistics left after purging
         assert states.count() == 2
-
-        # now we should only have 2 events left
         assert events.count() == 2
+        assert statistics.count() == 2
 
         # now we should only have 3 recorder runs left
         runs = recorder_runs.all()
@@ -287,6 +326,12 @@ async def test_purge_method(
         assert runs[1] == runs_before_purge[5]
         assert runs[2] == runs_before_purge[6]
 
+        # now we should only have 3 statistics runs left
+        runs = statistics_runs.all()
+        assert runs[0] == statistic_runs_before_purge[0]
+        assert runs[1] == statistic_runs_before_purge[5]
+        assert runs[2] == statistic_runs_before_purge[6]
+
         assert "EVENT_TEST_PURGE" not in (event.event_type for event in events.all())
 
         # run purge method - correct service data, with repack
@@ -952,6 +997,35 @@ async def _add_test_events(hass: HomeAssistant, instance: recorder.Recorder):
         )
 
 
+async def _add_test_statistics(hass: HomeAssistant, instance: recorder.Recorder):
+    """Add multiple statistics to the db for testing."""
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
+
+    await hass.async_block_till_done()
+    await async_wait_recording_done(hass, instance)
+
+    with recorder.session_scope(hass=hass) as session:
+        for event_id in range(6):
+            if event_id < 2:
+                timestamp = eleven_days_ago
+                state = "-11"
+            elif event_id < 4:
+                timestamp = five_days_ago
+                state = "-5"
+            else:
+                timestamp = utcnow
+                state = "0"
+
+            session.add(
+                StatisticsShortTerm(
+                    start=timestamp,
+                    state=state,
+                )
+            )
+
+
 async def _add_test_recorder_runs(hass: HomeAssistant, instance: recorder.Recorder):
     """Add a few recorder_runs for testing."""
     utcnow = dt_util.utcnow()
@@ -979,6 +1053,31 @@ async def _add_test_recorder_runs(hass: HomeAssistant, instance: recorder.Record
     )
 
 
+async def _add_test_statistics_runs(hass: HomeAssistant, instance: recorder.Recorder):
+    """Add a few statistics_runs for testing."""
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
+
+    await hass.async_block_till_done()
+    await async_wait_recording_done(hass, instance)
+
+    with recorder.session_scope(hass=hass) as session:
+        for rec_id in range(6):
+            if rec_id < 2:
+                timestamp = eleven_days_ago
+            elif rec_id < 4:
+                timestamp = five_days_ago
+            else:
+                timestamp = utcnow
+
+            session.add(
+                StatisticsRuns(
+                    start=timestamp,
+                )
+            )
+
+
 def _add_state_and_state_changed_event(
     session: Session,
     entity_id: str,
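Note: purge_old_data deliberately deletes at most MAX_ROWS_TO_PURGE rows per table per invocation and returns False until a pass finds nothing left to do, so callers must invoke it repeatedly, exactly as the tests above do. The following is a minimal, self-contained sketch of that batched select-and-delete pattern, including the keep-the-newest-run exclusion from _select_statistics_runs_to_purge. It runs plain SQLAlchemy against an in-memory SQLite database; the model, table name, and MAX_ROWS_TO_PURGE value are illustrative stand-ins, not Home Assistant's actual schema or constants.

# Illustrative sketch only -- simplified stand-ins, not Home Assistant code.
from datetime import datetime, timedelta

from sqlalchemy import Column, DateTime, Integer, create_engine, func
from sqlalchemy.orm import Session, declarative_base

MAX_ROWS_TO_PURGE = 2  # tiny on purpose so the loop below takes several passes

Base = declarative_base()


class StatisticsRuns(Base):  # simplified stand-in for the real model
    __tablename__ = "statistics_runs"
    run_id = Column(Integer, primary_key=True)
    start = Column(DateTime)


def purge_statistics_runs(session: Session, purge_before: datetime) -> bool:
    """Delete one batch of old runs; return True once nothing is left to do."""
    run_ids = [
        run_id
        for (run_id,) in session.query(StatisticsRuns.run_id)
        .filter(StatisticsRuns.start < purge_before)
        .limit(MAX_ROWS_TO_PURGE)
        .all()
    ]
    # Keep the newest run, mirroring _select_statistics_runs_to_purge above.
    last_run = session.query(func.max(StatisticsRuns.run_id)).scalar()
    if last_run in run_ids:
        run_ids.remove(last_run)
    if not run_ids:
        return True  # nothing (more) to purge
    session.query(StatisticsRuns).filter(
        StatisticsRuns.run_id.in_(run_ids)
    ).delete(synchronize_session=False)
    return False  # deleted one batch; the caller should call again


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    now = datetime.utcnow()
    # Insert oldest first so a higher run_id means a newer run, as in the recorder.
    session.add_all(
        StatisticsRuns(start=now - timedelta(days=days))
        for days in reversed(range(7))
    )
    session.commit()
    # Loop until the purge reports completion, as the tests above do.
    while not purge_statistics_runs(session, purge_before=now):
        session.commit()
    session.commit()
    assert session.query(StatisticsRuns).count() == 1  # newest run survives

The real implementation keeps selection (_select_*_to_purge) separate from deletion (_purge_*) so each purge cycle stays a small, retryable database job (note the retryable_database_job import in the patch); the sketch merges them only for brevity.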