Purge short term statistics (#58028)

* Purge short term statistics
* Less meep
* Add tests

pull/58106/head
parent 558c2556f1
commit b301ab25a3
homeassistant/components/recorder/purge.py

@@ -6,11 +6,12 @@ from datetime import datetime
 import logging
 from typing import TYPE_CHECKING
 
+from sqlalchemy import func
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct
 
 from .const import MAX_ROWS_TO_PURGE
-from .models import Events, RecorderRuns, States
+from .models import Events, RecorderRuns, States, StatisticsRuns, StatisticsShortTerm
 from .repack import repack_database
 from .util import retryable_database_job, session_scope
 
@@ -37,18 +38,32 @@ def purge_old_data(
         # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
         event_ids = _select_event_ids_to_purge(session, purge_before)
         state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
+        statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
+        short_term_statistics = _select_short_term_statistics_to_purge(
+            session, purge_before
+        )
 
         if state_ids:
             _purge_state_ids(instance, session, state_ids)
 
         if event_ids:
             _purge_event_ids(session, event_ids)
-            # If states or events purging isn't processing the purge_before yet,
-            # return false, as we are not done yet.
+
+        if statistics_runs:
+            _purge_statistics_runs(session, statistics_runs)
+
+        if short_term_statistics:
+            _purge_short_term_statistics(session, short_term_statistics)
+
+        if event_ids or statistics_runs or short_term_statistics:
+            # Return false, as we might not be done yet.
             _LOGGER.debug("Purging hasn't fully completed yet")
             return False
 
         if apply_filter and _purge_filtered_data(instance, session) is False:
             _LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
             return False
 
         _purge_old_recorder_runs(instance, session, purge_before)
     if repack:
         repack_database(instance)
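
The hunk above widens the incremental-purge contract: purge_old_data already returned False while old events or states remained, and it now does the same while statistics runs or short-term statistics remain, so the purge keeps getting re-run until a pass finds nothing left to delete. A minimal caller-side sketch of that contract (the helper below is illustrative, not part of this PR; Home Assistant's recorder achieves the same effect by re-enqueuing a PurgeTask when it sees False):

# Illustrative only (not part of this PR): drive purge_old_data to completion.
# Each pass deletes at most MAX_ROWS_TO_PURGE rows per table and returns False
# while rows older than purge_before may remain, so callers simply run it again.
def _purge_until_done(instance, purge_before):
    while not purge_old_data(instance, purge_before, repack=False):
        pass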
@@ -83,6 +98,41 @@ def _select_state_ids_to_purge(
     return {state.state_id for state in states}
 
 
+def _select_statistics_runs_to_purge(
+    session: Session, purge_before: datetime
+) -> list[int]:
+    """Return a list of statistic runs to purge, but take care to keep the newest run."""
+    statistic_runs = (
+        session.query(StatisticsRuns.run_id)
+        .filter(StatisticsRuns.start < purge_before)
+        .limit(MAX_ROWS_TO_PURGE)
+        .all()
+    )
+    statistic_runs_list = [run.run_id for run in statistic_runs]
+    # Exclude the newest statistics run
+    if (
+        last_run := session.query(func.max(StatisticsRuns.run_id)).scalar()
+    ) and last_run in statistic_runs_list:
+        statistic_runs_list.remove(last_run)
+
+    _LOGGER.debug("Selected %s statistic runs to remove", len(statistic_runs))
+    return statistic_runs_list
+
+
+def _select_short_term_statistics_to_purge(
+    session: Session, purge_before: datetime
+) -> list[int]:
+    """Return a list of short term statistics to purge."""
+    statistics = (
+        session.query(StatisticsShortTerm.id)
+        .filter(StatisticsShortTerm.start < purge_before)
+        .limit(MAX_ROWS_TO_PURGE)
+        .all()
+    )
+    _LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
+    return [statistic.id for statistic in statistics]
+
+
 def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None:
     """Disconnect states and delete by state id."""
 
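
Both new selectors cap their candidate set at MAX_ROWS_TO_PURGE so a single purge pass stays bounded, and _select_statistics_runs_to_purge additionally drops the newest run id so the latest statistics run always survives, even when it is older than purge_before. A sketch of that keep-the-newest guard reduced to plain Python (the literal values here are illustrative):

# Illustrative stand-ins for the two queries in _select_statistics_runs_to_purge:
candidate_run_ids = [1, 2, 3, 4, 5]  # runs with start < purge_before, LIMIT-capped
newest_run_id = 5  # session.query(func.max(StatisticsRuns.run_id)).scalar()

# The walrus-guarded removal: only drop the newest run when one exists
# and it actually landed in the candidate list.
if newest_run_id and newest_run_id in candidate_run_ids:
    candidate_run_ids.remove(newest_run_id)

assert candidate_run_ids == [1, 2, 3, 4]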
@@ -125,6 +175,28 @@ def _evict_purged_states_from_old_states_cache(
         old_states.pop(old_state_reversed[purged_state_id], None)
 
 
+def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
+    """Delete by run_id."""
+    deleted_rows = (
+        session.query(StatisticsRuns)
+        .filter(StatisticsRuns.run_id.in_(statistics_runs))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s statistic runs", deleted_rows)
+
+
+def _purge_short_term_statistics(
+    session: Session, short_term_statistics: list[int]
+) -> None:
+    """Delete by id."""
+    deleted_rows = (
+        session.query(StatisticsShortTerm)
+        .filter(StatisticsShortTerm.id.in_(short_term_statistics))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s short term statistics", deleted_rows)
+
+
 def _purge_event_ids(session: Session, event_ids: list[int]) -> None:
     """Delete by event id."""
     deleted_rows = (
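
Both delete helpers use SQLAlchemy's Query.delete(synchronize_session=False), which emits one bulk DELETE ... WHERE ... IN (...) statement and skips reconciling objects already loaded into the session; that is safe here because the purged rows are never read back through the same session. A generic sketch of the pattern, assuming a hypothetical mapped class passed as model with an id column:

from sqlalchemy.orm.session import Session

def _bulk_delete_by_ids(session: Session, model, ids: list[int]) -> int:
    # synchronize_session=False: issue a single DELETE and leave any
    # in-memory instances untouched until the next expire/commit.
    # Query.delete returns the number of rows matched.
    return (
        session.query(model)
        .filter(model.id.in_(ids))
        .delete(synchronize_session=False)
    )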
tests/components/recorder/test_purge.py

@@ -10,7 +10,13 @@ from sqlalchemy.orm.session import Session
 from homeassistant.components import recorder
 from homeassistant.components.recorder import PurgeTask
 from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE
-from homeassistant.components.recorder.models import Events, RecorderRuns, States
+from homeassistant.components.recorder.models import (
+    Events,
+    RecorderRuns,
+    States,
+    StatisticsRuns,
+    StatisticsShortTerm,
+)
 from homeassistant.components.recorder.purge import purge_old_data
 from homeassistant.components.recorder.util import session_scope
 from homeassistant.const import EVENT_STATE_CHANGED
@@ -227,6 +233,30 @@ async def test_purge_old_recorder_runs(
     assert recorder_runs.count() == 1
 
 
+async def test_purge_old_statistics_runs(
+    hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
+):
+    """Test deleting old statistics runs keeps the latest run."""
+    instance = await async_setup_recorder_instance(hass)
+
+    await _add_test_statistics_runs(hass, instance)
+
+    # make sure we start with 7 statistics runs
+    with session_scope(hass=hass) as session:
+        statistics_runs = session.query(StatisticsRuns)
+        assert statistics_runs.count() == 7
+
+        purge_before = dt_util.utcnow()
+
+        # run purge_old_data()
+        finished = purge_old_data(instance, purge_before, repack=False)
+        assert not finished
+
+        finished = purge_old_data(instance, purge_before, repack=False)
+        assert finished
+        assert statistics_runs.count() == 1
+
+
 async def test_purge_method(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
@@ -238,7 +268,9 @@ async def test_purge_method(
     service_data = {"keep_days": 4}
     await _add_test_events(hass, instance)
     await _add_test_states(hass, instance)
+    await _add_test_statistics(hass, instance)
     await _add_test_recorder_runs(hass, instance)
+    await _add_test_statistics_runs(hass, instance)
     await hass.async_block_till_done()
     await async_wait_recording_done(hass, instance)
 
@@ -250,10 +282,17 @@ async def test_purge_method(
         events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
         assert events.count() == 6
 
+        statistics = session.query(StatisticsShortTerm)
+        assert statistics.count() == 6
+
         recorder_runs = session.query(RecorderRuns)
         assert recorder_runs.count() == 7
         runs_before_purge = recorder_runs.all()
 
+        statistics_runs = session.query(StatisticsRuns)
+        assert statistics_runs.count() == 7
+        statistic_runs_before_purge = statistics_runs.all()
+
     await hass.async_block_till_done()
     await async_wait_purge_done(hass, instance)
 
@@ -264,9 +303,10 @@ async def test_purge_method(
     # Small wait for recorder thread
     await async_wait_purge_done(hass, instance)
 
-    # only purged old events
+    # only purged old states, events and statistics
     assert states.count() == 4
     assert events.count() == 4
+    assert statistics.count() == 4
 
     # run purge method - correct service data
    await hass.services.async_call("recorder", "purge", service_data=service_data)
@@ -275,11 +315,10 @@ async def test_purge_method(
     # Small wait for recorder thread
     await async_wait_purge_done(hass, instance)
 
-    # we should only have 2 states left after purging
+    # we should only have 2 states, events and statistics left after purging
     assert states.count() == 2
-
-    # now we should only have 2 events left
     assert events.count() == 2
+    assert statistics.count() == 2
 
     # now we should only have 3 recorder runs left
     runs = recorder_runs.all()
@@ -287,6 +326,12 @@ async def test_purge_method(
     assert runs[1] == runs_before_purge[5]
     assert runs[2] == runs_before_purge[6]
 
+    # now we should only have 3 statistics runs left
+    runs = statistics_runs.all()
+    assert runs[0] == statistic_runs_before_purge[0]
+    assert runs[1] == statistic_runs_before_purge[5]
+    assert runs[2] == statistic_runs_before_purge[6]
+
     assert "EVENT_TEST_PURGE" not in (event.event_type for event in events.all())
 
     # run purge method - correct service data, with repack
@@ -952,6 +997,35 @@ async def _add_test_events(hass: HomeAssistant, instance: recorder.Recorder):
     )
 
 
+async def _add_test_statistics(hass: HomeAssistant, instance: recorder.Recorder):
+    """Add multiple statistics to the db for testing."""
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
+
+    await hass.async_block_till_done()
+    await async_wait_recording_done(hass, instance)
+
+    with recorder.session_scope(hass=hass) as session:
+        for event_id in range(6):
+            if event_id < 2:
+                timestamp = eleven_days_ago
+                state = "-11"
+            elif event_id < 4:
+                timestamp = five_days_ago
+                state = "-5"
+            else:
+                timestamp = utcnow
+                state = "0"
+
+            session.add(
+                StatisticsShortTerm(
+                    start=timestamp,
+                    state=state,
+                )
+            )
+
+
 async def _add_test_recorder_runs(hass: HomeAssistant, instance: recorder.Recorder):
     """Add a few recorder_runs for testing."""
     utcnow = dt_util.utcnow()
@@ -979,6 +1053,31 @@ async def _add_test_recorder_runs(hass: HomeAssistant, instance: recorder.Recorder):
     )
 
 
+async def _add_test_statistics_runs(hass: HomeAssistant, instance: recorder.Recorder):
+    """Add a few statistics_runs for testing."""
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
+
+    await hass.async_block_till_done()
+    await async_wait_recording_done(hass, instance)
+
+    with recorder.session_scope(hass=hass) as session:
+        for rec_id in range(6):
+            if rec_id < 2:
+                timestamp = eleven_days_ago
+            elif rec_id < 4:
+                timestamp = five_days_ago
+            else:
+                timestamp = utcnow
+
+            session.add(
+                StatisticsRuns(
+                    start=timestamp,
+                )
+            )
+
+
 def _add_state_and_state_changed_event(
     session: Session,
     entity_id: str,