From 92852b9c10e9454f713f1ffc68fe6c87bf686b27 Mon Sep 17 00:00:00 2001
From: Marc Mueller <30130371+cdce8p@users.noreply.github.com>
Date: Fri, 12 Mar 2021 04:03:30 +0100
Subject: [PATCH] Add apply_filter attribute to recorder.purge service (#45826)

---
 homeassistant/components/recorder/__init__.py |  18 +-
 homeassistant/components/recorder/purge.py    |  75 ++-
 .../components/recorder/services.yaml         |   8 +
 tests/components/recorder/test_purge.py       | 429 +++++++++++++++++-
 4 files changed, 522 insertions(+), 8 deletions(-)

diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index 9b84518b6d3..f8f95fd7ccc 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -52,11 +52,13 @@ SERVICE_DISABLE = "disable"
 
 ATTR_KEEP_DAYS = "keep_days"
 ATTR_REPACK = "repack"
+ATTR_APPLY_FILTER = "apply_filter"
 
 SERVICE_PURGE_SCHEMA = vol.Schema(
     {
         vol.Optional(ATTR_KEEP_DAYS): cv.positive_int,
         vol.Optional(ATTR_REPACK, default=False): cv.boolean,
+        vol.Optional(ATTR_APPLY_FILTER, default=False): cv.boolean,
     }
 )
 SERVICE_ENABLE_SCHEMA = vol.Schema({})
@@ -227,6 +229,7 @@ class PurgeTask(NamedTuple):
 
     keep_days: int
     repack: bool
+    apply_filter: bool
 
 
 class WaitTask:
@@ -309,8 +312,9 @@ class Recorder(threading.Thread):
         """Trigger an adhoc purge retaining keep_days worth of data."""
         keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
         repack = kwargs.get(ATTR_REPACK)
+        apply_filter = kwargs.get(ATTR_APPLY_FILTER)
 
-        self.queue.put(PurgeTask(keep_days, repack))
+        self.queue.put(PurgeTask(keep_days, repack, apply_filter))
 
     def run(self):
         """Start processing events to save."""
@@ -364,7 +368,9 @@ class Recorder(threading.Thread):
         @callback
         def async_purge(now):
             """Trigger the purge."""
-            self.queue.put(PurgeTask(self.keep_days, repack=False))
+            self.queue.put(
+                PurgeTask(self.keep_days, repack=False, apply_filter=False)
+            )
 
         # Purge every night at 4:12am
         self.hass.helpers.event.track_time_change(
@@ -425,8 +431,12 @@ class Recorder(threading.Thread):
         """Process one event."""
         if isinstance(event, PurgeTask):
             # Schedule a new purge task if this one didn't finish
-            if not purge.purge_old_data(self, event.keep_days, event.repack):
-                self.queue.put(PurgeTask(event.keep_days, event.repack))
+            if not purge.purge_old_data(
+                self, event.keep_days, event.repack, event.apply_filter
+            ):
+                self.queue.put(
+                    PurgeTask(event.keep_days, event.repack, event.apply_filter)
+                )
             return
         if isinstance(event, WaitTask):
             self._queue_watch.set()
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index 3717ed49f30..ef626a744c4 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING
 
 from sqlalchemy.exc import OperationalError, SQLAlchemyError
 from sqlalchemy.orm.session import Session
+from sqlalchemy.sql.expression import distinct
 
 import homeassistant.util.dt as dt_util
 
@@ -22,7 +23,9 @@ if TYPE_CHECKING:
 _LOGGER = logging.getLogger(__name__)
 
 
-def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
+def purge_old_data(
+    instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
+) -> bool:
     """Purge events and states older than purge_days ago.
 
     Cleans up a timeframe of an hour, based on the oldest record.
@@ -45,6 +48,9 @@
                 # return false, as we are not done yet.
                 _LOGGER.debug("Purging hasn't fully completed yet")
                 return False
+            if apply_filter and _purge_filtered_data(instance, session) is False:
+                _LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
+                return False
             _purge_old_recorder_runs(instance, session, purge_before)
         if repack:
             repack_database(instance)
@@ -140,3 +146,70 @@ def _purge_old_recorder_runs(
         .delete(synchronize_session=False)
     )
     _LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
+
+
+def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
+    """Remove filtered states and events that shouldn't be in the database."""
+    _LOGGER.debug("Cleanup filtered data")
+
+    # Check if excluded entity_ids are in database
+    excluded_entity_ids: list[str] = [
+        entity_id
+        for (entity_id,) in session.query(distinct(States.entity_id)).all()
+        if not instance.entity_filter(entity_id)
+    ]
+    if len(excluded_entity_ids) > 0:
+        _purge_filtered_states(session, excluded_entity_ids)
+        return False
+
+    # Check if excluded event_types are in database
+    excluded_event_types: list[str] = [
+        event_type
+        for (event_type,) in session.query(distinct(Events.event_type)).all()
+        if event_type in instance.exclude_t
+    ]
+    if len(excluded_event_types) > 0:
+        _purge_filtered_events(session, excluded_event_types)
+        return False
+
+    return True
+
+
+def _purge_filtered_states(session: Session, excluded_entity_ids: list[str]) -> None:
+    """Remove filtered states and linked events."""
+    state_ids: list[int]
+    event_ids: list[int | None]
+    state_ids, event_ids = zip(
+        *(
+            session.query(States.state_id, States.event_id)
+            .filter(States.entity_id.in_(excluded_entity_ids))
+            .limit(MAX_ROWS_TO_PURGE)
+            .all()
+        )
+    )
+    event_ids = [id_ for id_ in event_ids if id_ is not None]
+    _LOGGER.debug(
+        "Selected %s state_ids to remove that should be filtered", len(state_ids)
+    )
+    _purge_state_ids(session, state_ids)
+    _purge_event_ids(session, event_ids)  # type: ignore  # type of event_ids already narrowed to 'list[int]'
+
+
+def _purge_filtered_events(session: Session, excluded_event_types: list[str]) -> None:
+    """Remove filtered events and linked states."""
+    events: list[Events] = (
+        session.query(Events.event_id)
+        .filter(Events.event_type.in_(excluded_event_types))
+        .limit(MAX_ROWS_TO_PURGE)
+        .all()
+    )
+    event_ids: list[int] = [event.event_id for event in events]
+    _LOGGER.debug(
+        "Selected %s event_ids to remove that should be filtered", len(event_ids)
+    )
+    states: list[States] = (
+        session.query(States.state_id).filter(States.event_id.in_(event_ids)).all()
+    )
+    state_ids: list[int] = [state.state_id for state in states]
+    _purge_state_ids(session, state_ids)
+    _purge_event_ids(session, event_ids)
diff --git a/homeassistant/components/recorder/services.yaml b/homeassistant/components/recorder/services.yaml
index 2be5b0e095e..2c4f35b5e7a 100644
--- a/homeassistant/components/recorder/services.yaml
+++ b/homeassistant/components/recorder/services.yaml
@@ -25,6 +25,14 @@ purge:
       selector:
         boolean:
 
+    apply_filter:
+      name: Apply filter
+      description: Apply entity_id and event_type filters in addition to the time-based purge.
+      example: true
+      default: false
+      selector:
+        boolean:
+
 disable:
   description: Stop the recording of events and state changes
 
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
index 3535a58d33d..db3906595db 100644
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -1,15 +1,18 @@
 """Test data purging."""
-from datetime import timedelta
+from datetime import datetime, timedelta
 import json
 
+from sqlalchemy.orm.session import Session
+
 from homeassistant.components import recorder
 from homeassistant.components.recorder.models import Events, RecorderRuns, States
 from homeassistant.components.recorder.purge import purge_old_data
 from homeassistant.components.recorder.util import session_scope
-from homeassistant.helpers.typing import HomeAssistantType
+from homeassistant.const import EVENT_STATE_CHANGED
+from homeassistant.helpers.typing import ConfigType, HomeAssistantType
 from homeassistant.util import dt as dt_util
 
-from .common import async_wait_recording_done
+from .common import async_recorder_block_till_done, async_wait_recording_done
 from .conftest import SetupRecorderInstanceT
 
 
@@ -154,6 +157,394 @@
     assert "Vacuuming SQL DB to free space" in caplog.text
 
 
+async def test_purge_edge_case(
+    hass: HomeAssistantType,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+):
+    """Test states and events are purged even if they occurred shortly before purge_before."""
+
+    async def _add_db_entries(hass: HomeAssistantType, timestamp: datetime) -> None:
+        with recorder.session_scope(hass=hass) as session:
+            session.add(
+                Events(
+                    event_id=1001,
+                    event_type="EVENT_TEST_PURGE",
+                    event_data="{}",
+                    origin="LOCAL",
+                    created=timestamp,
+                    time_fired=timestamp,
+                )
+            )
+            session.add(
+                States(
+                    entity_id="test.recorder2",
+                    domain="sensor",
+                    state="purgeme",
+                    attributes="{}",
+                    last_changed=timestamp,
+                    last_updated=timestamp,
+                    created=timestamp,
+                    event_id=1001,
+                )
+            )
+
+    instance = await async_setup_recorder_instance(hass, None)
+    await async_wait_recording_done(hass, instance)
+
+    service_data = {"keep_days": 2}
+    timestamp = dt_util.utcnow() - timedelta(days=2, minutes=1)
+
+    await _add_db_entries(hass, timestamp)
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        assert states.count() == 1
+
+        events = session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE")
+        assert events.count() == 1
+
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await hass.async_block_till_done()
+
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_recording_done(hass, instance)
+
+        assert states.count() == 0
+        assert events.count() == 0
+
+
+async def test_purge_filtered_states(
+    hass: HomeAssistantType,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+):
+    """Test filtered states are purged."""
+    config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
+    instance = await async_setup_recorder_instance(hass, config)
+    assert instance.entity_filter("sensor.excluded") is False
+
+    def _add_db_entries(hass: HomeAssistantType) -> None:
+        with recorder.session_scope(hass=hass) as session:
+            # Add states and state_changed events that should be purged
+            for days in range(1, 4):
+                timestamp = dt_util.utcnow() - timedelta(days=days)
+                for event_id in range(1000, 1020):
+                    _add_state_and_state_changed_event(
+                        session,
+                        "sensor.excluded",
+                        "purgeme",
+                        timestamp,
+                        event_id * days,
+                    )
+            # Add state **without** state_changed event that should be purged
+            timestamp = dt_util.utcnow() - timedelta(days=1)
+            session.add(
+                States(
+                    entity_id="sensor.excluded",
+                    domain="sensor",
+                    state="purgeme",
+                    attributes="{}",
+                    last_changed=timestamp,
+                    last_updated=timestamp,
+                    created=timestamp,
+                )
+            )
+            # Add states and state_changed events that should be kept
+            timestamp = dt_util.utcnow() - timedelta(days=2)
+            for event_id in range(200, 210):
+                _add_state_and_state_changed_event(
+                    session,
+                    "sensor.keep",
+                    "keep",
+                    timestamp,
+                    event_id,
+                )
+            # Add states with linked old_state_ids that need to be handled
+            timestamp = dt_util.utcnow() - timedelta(days=0)
+            state_1 = States(
+                entity_id="sensor.linked_old_state_id",
+                domain="sensor",
+                state="keep",
+                attributes="{}",
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                old_state_id=1,
+            )
+            timestamp = dt_util.utcnow() - timedelta(days=4)
+            state_2 = States(
+                entity_id="sensor.linked_old_state_id",
+                domain="sensor",
+                state="keep",
+                attributes="{}",
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                old_state_id=2,
+            )
+            state_3 = States(
+                entity_id="sensor.linked_old_state_id",
+                domain="sensor",
+                state="keep",
+                attributes="{}",
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                old_state_id=62,  # keep
+            )
+            session.add_all((state_1, state_2, state_3))
+            # Add event that should be kept
+            session.add(
+                Events(
+                    event_id=100,
+                    event_type="EVENT_KEEP",
+                    event_data="{}",
+                    origin="LOCAL",
+                    created=timestamp,
+                    time_fired=timestamp,
+                )
+            )
+
+    service_data = {"keep_days": 10}
+    _add_db_entries(hass)
+
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        assert states.count() == 74
+
+        events_state_changed = session.query(Events).filter(
+            Events.event_type == EVENT_STATE_CHANGED
+        )
+        events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
+        assert events_state_changed.count() == 70
+        assert events_keep.count() == 1
+
+        # Normal purge doesn't remove excluded entities
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await hass.async_block_till_done()
+
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_recording_done(hass, instance)
+
+        assert states.count() == 74
+        assert events_state_changed.count() == 70
+        assert events_keep.count() == 1
+
+        # Test with 'apply_filter' = True
+        service_data["apply_filter"] = True
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await hass.async_block_till_done()
+
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_recording_done(hass, instance)
+
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_recording_done(hass, instance)
+
+        assert states.count() == 13
+        assert events_state_changed.count() == 10
+        assert events_keep.count() == 1
+
+        states_sensor_excluded = session.query(States).filter(
+            States.entity_id == "sensor.excluded"
+        )
+        assert states_sensor_excluded.count() == 0
+
+        assert session.query(States).get(72).old_state_id is None
+        assert session.query(States).get(73).old_state_id is None
+        assert session.query(States).get(74).old_state_id == 62  # should have been kept
+
+
+async def test_purge_filtered_events(
+    hass: HomeAssistantType,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+):
+    """Test filtered events are purged."""
+    config: ConfigType = {"exclude": {"event_types": ["EVENT_PURGE"]}}
["EVENT_PURGE"]}} + instance = await async_setup_recorder_instance(hass, config) + + def _add_db_entries(hass: HomeAssistantType) -> None: + with recorder.session_scope(hass=hass) as session: + # Add events that should be purged + for days in range(1, 4): + timestamp = dt_util.utcnow() - timedelta(days=days) + for event_id in range(1000, 1020): + session.add( + Events( + event_id=event_id * days, + event_type="EVENT_PURGE", + event_data="{}", + origin="LOCAL", + created=timestamp, + time_fired=timestamp, + ) + ) + + # Add states and state_changed events that should be keeped + timestamp = dt_util.utcnow() - timedelta(days=1) + for event_id in range(200, 210): + _add_state_and_state_changed_event( + session, + "sensor.keep", + "keep", + timestamp, + event_id, + ) + + service_data = {"keep_days": 10} + _add_db_entries(hass) + + with session_scope(hass=hass) as session: + events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE") + events_keep = session.query(Events).filter( + Events.event_type == EVENT_STATE_CHANGED + ) + states = session.query(States) + + assert events_purge.count() == 60 + assert events_keep.count() == 10 + assert states.count() == 10 + + # Normal purge doesn't remove excluded events + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await hass.async_block_till_done() + + await async_recorder_block_till_done(hass, instance) + await async_wait_recording_done(hass, instance) + + assert events_purge.count() == 60 + assert events_keep.count() == 10 + assert states.count() == 10 + + # Test with 'apply_filter' = True + service_data["apply_filter"] = True + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await hass.async_block_till_done() + + await async_recorder_block_till_done(hass, instance) + await async_wait_recording_done(hass, instance) + + await async_recorder_block_till_done(hass, instance) + await async_wait_recording_done(hass, instance) + + assert events_purge.count() == 0 + assert events_keep.count() == 10 + assert states.count() == 10 + + +async def test_purge_filtered_events_state_changed( + hass: HomeAssistantType, + async_setup_recorder_instance: SetupRecorderInstanceT, +): + """Test filtered state_changed events are purged. 
+
+    This should also remove all states.
+    """
+    config: ConfigType = {"exclude": {"event_types": [EVENT_STATE_CHANGED]}}
+    instance = await async_setup_recorder_instance(hass, config)
+    # Assert entity_id is NOT excluded
+    assert instance.entity_filter("sensor.excluded") is True
+
+    def _add_db_entries(hass: HomeAssistantType) -> None:
+        with recorder.session_scope(hass=hass) as session:
+            # Add states and state_changed events that should be purged
+            for days in range(1, 4):
+                timestamp = dt_util.utcnow() - timedelta(days=days)
+                for event_id in range(1000, 1020):
+                    _add_state_and_state_changed_event(
+                        session,
+                        "sensor.excluded",
+                        "purgeme",
+                        timestamp,
+                        event_id * days,
+                    )
+            # Add events that should be kept
+            timestamp = dt_util.utcnow() - timedelta(days=1)
+            for event_id in range(200, 210):
+                session.add(
+                    Events(
+                        event_id=event_id,
+                        event_type="EVENT_KEEP",
+                        event_data="{}",
+                        origin="LOCAL",
+                        created=timestamp,
+                        time_fired=timestamp,
+                    )
+                )
+            # Add states with linked old_state_ids that need to be handled
+            timestamp = dt_util.utcnow() - timedelta(days=0)
+            state_1 = States(
+                entity_id="sensor.linked_old_state_id",
+                domain="sensor",
+                state="keep",
+                attributes="{}",
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                old_state_id=1,
+            )
+            timestamp = dt_util.utcnow() - timedelta(days=4)
+            state_2 = States(
+                entity_id="sensor.linked_old_state_id",
+                domain="sensor",
+                state="keep",
+                attributes="{}",
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                old_state_id=2,
+            )
+            state_3 = States(
+                entity_id="sensor.linked_old_state_id",
+                domain="sensor",
+                state="keep",
+                attributes="{}",
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                old_state_id=62,  # keep
+            )
+            session.add_all((state_1, state_2, state_3))
+
+    service_data = {"keep_days": 10, "apply_filter": True}
+    _add_db_entries(hass)
+
+    with session_scope(hass=hass) as session:
+        events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
+        events_purge = session.query(Events).filter(
+            Events.event_type == EVENT_STATE_CHANGED
+        )
+        states = session.query(States)
+
+        assert events_keep.count() == 10
+        assert events_purge.count() == 60
+        assert states.count() == 63
+
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
+        )
+        await hass.async_block_till_done()
+
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_recording_done(hass, instance)
+
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_recording_done(hass, instance)
+
+        assert events_keep.count() == 10
+        assert events_purge.count() == 0
+        assert states.count() == 3
+
+        assert session.query(States).get(61).old_state_id is None
+        assert session.query(States).get(62).old_state_id is None
+        assert session.query(States).get(63).old_state_id == 62  # should have been kept
+
+
 async def _add_test_states(hass: HomeAssistantType, instance: recorder.Recorder):
     """Add multiple states to the db for testing."""
     utcnow = dt_util.utcnow()
@@ -260,3 +651,35 @@ async def _add_test_recorder_runs(hass: HomeAssistantType, instance: recorder.Re
             end=timestamp + timedelta(days=1),
         )
     )
+
+
+def _add_state_and_state_changed_event(
+    session: Session,
+    entity_id: str,
+    state: str,
+    timestamp: datetime,
+    event_id: int,
+) -> None:
+    """Add state and state_changed event to database for testing."""
+    session.add(
+        States(
+            entity_id=entity_id,
+            domain="sensor",
+            state=state,
+            attributes="{}",
+            last_changed=timestamp,
+            last_updated=timestamp,
+            created=timestamp,
+            event_id=event_id,
+        )
+    )
+    session.add(
+        Events(
+            event_id=event_id,
+            event_type=EVENT_STATE_CHANGED,
+            event_data="{}",
+            origin="LOCAL",
+            created=timestamp,
+            time_fired=timestamp,
+        )
+    )
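
For reference, a minimal usage sketch (not part of the patch itself): it assumes a
running Home Assistant instance `hass` whose recorder is configured with exclude
filters, mirroring the service_data used in the tests above.

    # Sketch only: keep_days trims rows by age; apply_filter=True additionally
    # removes already-recorded rows that match the recorder's exclude filters,
    # at most MAX_ROWS_TO_PURGE rows per pass.
    await hass.services.async_call(
        "recorder",  # recorder.DOMAIN
        "purge",     # recorder.SERVICE_PURGE
        {"keep_days": 10, "apply_filter": True},
    )
    await hass.async_block_till_done()

Because purge_old_data() returns False while filtered rows remain, the recorder
re-queues the PurgeTask until cleanup is complete, which is why the tests above
repeat the async_recorder_block_till_done() / async_wait_recording_done() pair.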