Add apply_filter attribute to recorder.purge service (#45826)

Marc Mueller 2021-03-12 04:03:30 +01:00 committed by GitHub
parent 66605b5994
commit 92852b9c10
4 changed files with 522 additions and 8 deletions

homeassistant/components/recorder/__init__.py

@@ -52,11 +52,13 @@ SERVICE_DISABLE = "disable"
ATTR_KEEP_DAYS = "keep_days"
ATTR_REPACK = "repack"
ATTR_APPLY_FILTER = "apply_filter"
SERVICE_PURGE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_KEEP_DAYS): cv.positive_int,
vol.Optional(ATTR_REPACK, default=False): cv.boolean,
vol.Optional(ATTR_APPLY_FILTER, default=False): cv.boolean,
}
)
SERVICE_ENABLE_SCHEMA = vol.Schema({})
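
A quick illustration of why existing callers are unaffected (a sketch only, assuming the schema defined above is in scope): voluptuous fills the new key with its default when it is omitted.

validated = SERVICE_PURGE_SCHEMA({"keep_days": 5})
# -> {"keep_days": 5, "repack": False, "apply_filter": False}
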
@@ -227,6 +229,7 @@ class PurgeTask(NamedTuple):
keep_days: int
repack: bool
apply_filter: bool
class WaitTask:
@@ -309,8 +312,9 @@ class Recorder(threading.Thread):
"""Trigger an adhoc purge retaining keep_days worth of data."""
keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
repack = kwargs.get(ATTR_REPACK)
apply_filter = kwargs.get(ATTR_APPLY_FILTER)
-self.queue.put(PurgeTask(keep_days, repack))
+self.queue.put(PurgeTask(keep_days, repack, apply_filter))
def run(self):
"""Start processing events to save."""
@@ -364,7 +368,9 @@ class Recorder(threading.Thread):
@callback
def async_purge(now):
"""Trigger the purge."""
-self.queue.put(PurgeTask(self.keep_days, repack=False))
+self.queue.put(
+    PurgeTask(self.keep_days, repack=False, apply_filter=False)
+)
# Purge every night at 4:12am
self.hass.helpers.event.track_time_change(
@@ -425,8 +431,12 @@ class Recorder(threading.Thread):
"""Process one event."""
if isinstance(event, PurgeTask):
# Schedule a new purge task if this one didn't finish
-if not purge.purge_old_data(self, event.keep_days, event.repack):
-    self.queue.put(PurgeTask(event.keep_days, event.repack))
+if not purge.purge_old_data(
+    self, event.keep_days, event.repack, event.apply_filter
+):
+    self.queue.put(
+        PurgeTask(event.keep_days, event.repack, event.apply_filter)
+    )
return
if isinstance(event, WaitTask):
self._queue_watch.set()

homeassistant/components/recorder/purge.py

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct
import homeassistant.util.dt as dt_util
@@ -22,7 +23,9 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
-def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
+def purge_old_data(
+    instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
+) -> bool:
"""Purge events and states older than purge_days ago.
Cleans up an timeframe of an hour, based on the oldest record.
@@ -45,6 +48,9 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
# return false, as we are not done yet.
_LOGGER.debug("Purging hasn't fully completed yet")
return False
if apply_filter and _purge_filtered_data(instance, session) is False:
_LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
return False
_purge_old_recorder_runs(instance, session, purge_before)
if repack:
repack_database(instance)
@@ -140,3 +146,70 @@ def _purge_old_recorder_runs(
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
"""Remove filtered states and events that shouldn't be in the database."""
_LOGGER.debug("Cleanup filtered data")
# Check if excluded entity_ids are in database
excluded_entity_ids: list[str] = [
entity_id
for (entity_id,) in session.query(distinct(States.entity_id)).all()
if not instance.entity_filter(entity_id)
]
if len(excluded_entity_ids) > 0:
_purge_filtered_states(session, excluded_entity_ids)
return False
# Check if excluded event_types are in database
excluded_event_types: list[str] = [
event_type
for (event_type,) in session.query(distinct(Events.event_type)).all()
if event_type in instance.exclude_t
]
if len(excluded_event_types) > 0:
_purge_filtered_events(session, excluded_event_types)
return False
return True
def _purge_filtered_states(session: Session, excluded_entity_ids: list[str]) -> None:
"""Remove filtered states and linked events."""
state_ids: list[int]
event_ids: list[int | None]
state_ids, event_ids = zip(
*(
session.query(States.state_id, States.event_id)
.filter(States.entity_id.in_(excluded_entity_ids))
.limit(MAX_ROWS_TO_PURGE)
.all()
)
)
event_ids = [id_ for id_ in event_ids if id_ is not None]
_LOGGER.debug(
"Selected %s state_ids to remove that should be filtered", len(state_ids)
)
_purge_state_ids(session, state_ids)
_purge_event_ids(session, event_ids) # type: ignore # type of event_ids already narrowed to 'list[int]'
def _purge_filtered_events(session: Session, excluded_event_types: list[str]) -> None:
"""Remove filtered events and linked states."""
events: list[Events] = (
session.query(Events.event_id)
.filter(Events.event_type.in_(excluded_event_types))
.limit(MAX_ROWS_TO_PURGE)
.all()
)
event_ids: list[int] = [event.event_id for event in events]
_LOGGER.debug(
"Selected %s event_ids to remove that should be filtered", len(event_ids)
)
states: list[States] = (
session.query(States.state_id).filter(States.event_id.in_(event_ids)).all()
)
state_ids: list[int] = [state.state_id for state in states]
_purge_state_ids(session, state_ids)
_purge_event_ids(session, event_ids)
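
Note the contract used throughout this module: each purge pass is bounded (at most MAX_ROWS_TO_PURGE rows per query) and purge_old_data returns False while work remains, which is why the recorder re-queues a PurgeTask in __init__.py above. A minimal sketch of that loop, assuming a configured Recorder instance; the helper below is hypothetical and only illustrates the contract, it is not part of this change.

from homeassistant.components.recorder import purge

def _purge_until_done(instance, keep_days: int, repack: bool, apply_filter: bool) -> None:
    # Hypothetical helper: keep purging until a pass reports it is finished.
    # The real recorder does this asynchronously by re-queuing PurgeTask.
    while not purge.purge_old_data(instance, keep_days, repack, apply_filter):
        pass
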

homeassistant/components/recorder/services.yaml

@@ -25,6 +25,14 @@ purge:
selector:
boolean:
apply_filter:
name: Apply filter
description: Apply entity_id and event_type filter in addition to time based purge.
example: true
default: false
selector:
boolean:
disable:
description: Stop the recording of events and state changes
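
For reference, a minimal sketch of invoking the service with the new field from Python, mirroring the service calls used in the tests below; the running hass instance and recorder setup are assumed, not shown.

from homeassistant.components import recorder
from homeassistant.core import HomeAssistant

async def purge_with_filter(hass: HomeAssistant) -> None:
    # Purge data older than 10 days and additionally drop rows that the
    # recorder's entity_id/event_type exclude filter would not record today.
    await hass.services.async_call(
        recorder.DOMAIN,
        recorder.SERVICE_PURGE,
        {"keep_days": 10, "apply_filter": True},
    )
    await hass.async_block_till_done()
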

tests/components/recorder/test_purge.py

@@ -1,15 +1,18 @@
"""Test data purging."""
-from datetime import timedelta
+from datetime import datetime, timedelta
import json
from sqlalchemy.orm.session import Session
from homeassistant.components import recorder
from homeassistant.components.recorder.models import Events, RecorderRuns, States
from homeassistant.components.recorder.purge import purge_old_data
from homeassistant.components.recorder.util import session_scope
-from homeassistant.helpers.typing import HomeAssistantType
+from homeassistant.const import EVENT_STATE_CHANGED
+from homeassistant.helpers.typing import ConfigType, HomeAssistantType
from homeassistant.util import dt as dt_util
-from .common import async_wait_recording_done
+from .common import async_recorder_block_till_done, async_wait_recording_done
from .conftest import SetupRecorderInstanceT
@@ -154,6 +157,394 @@ async def test_purge_method(
assert "Vacuuming SQL DB to free space" in caplog.text
async def test_purge_edge_case(
hass: HomeAssistantType,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test states and events are purged even if they occurred shortly before purge_before."""
async def _add_db_entries(hass: HomeAssistantType, timestamp: datetime) -> None:
with recorder.session_scope(hass=hass) as session:
session.add(
Events(
event_id=1001,
event_type="EVENT_TEST_PURGE",
event_data="{}",
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
)
session.add(
States(
entity_id="test.recorder2",
domain="sensor",
state="purgeme",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
event_id=1001,
)
)
instance = await async_setup_recorder_instance(hass, None)
await async_wait_recording_done(hass, instance)
service_data = {"keep_days": 2}
timestamp = dt_util.utcnow() - timedelta(days=2, minutes=1)
await _add_db_entries(hass, timestamp)
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 1
events = session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE")
assert events.count() == 1
await hass.services.async_call(
recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
assert states.count() == 0
assert events.count() == 0
async def test_purge_filtered_states(
hass: HomeAssistantType,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test filtered states are purged."""
config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
instance = await async_setup_recorder_instance(hass, config)
assert instance.entity_filter("sensor.excluded") is False
def _add_db_entries(hass: HomeAssistantType) -> None:
with recorder.session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_and_state_changed_event(
session,
"sensor.excluded",
"purgeme",
timestamp,
event_id * days,
)
# Add state **without** state_changed event that should be purged
timestamp = dt_util.utcnow() - timedelta(days=1)
session.add(
States(
entity_id="sensor.excluded",
domain="sensor",
state="purgeme",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
)
)
# Add states and state_changed events that should be keeped
timestamp = dt_util.utcnow() - timedelta(days=2)
for event_id in range(200, 210):
_add_state_and_state_changed_event(
session,
"sensor.keep",
"keep",
timestamp,
event_id,
)
# Add states with linked old_state_ids that need to be handled
timestamp = dt_util.utcnow() - timedelta(days=0)
state_1 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
old_state_id=1,
)
timestamp = dt_util.utcnow() - timedelta(days=4)
state_2 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
old_state_id=2,
)
state_3 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
old_state_id=62, # keep
)
session.add_all((state_1, state_2, state_3))
# Add event that should be keeped
session.add(
Events(
event_id=100,
event_type="EVENT_KEEP",
event_data="{}",
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
)
service_data = {"keep_days": 10}
_add_db_entries(hass)
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 74
events_state_changed = session.query(Events).filter(
Events.event_type == EVENT_STATE_CHANGED
)
events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
assert events_state_changed.count() == 70
assert events_keep.count() == 1
# Normal purge doesn't remove excluded entities
await hass.services.async_call(
recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
assert states.count() == 74
assert events_state_changed.count() == 70
assert events_keep.count() == 1
# Test with 'apply_filter' = True
service_data["apply_filter"] = True
await hass.services.async_call(
recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
assert states.count() == 13
assert events_state_changed.count() == 10
assert events_keep.count() == 1
states_sensor_excluded = session.query(States).filter(
States.entity_id == "sensor.excluded"
)
assert states_sensor_excluded.count() == 0
assert session.query(States).get(71).old_state_id is None
assert session.query(States).get(72).old_state_id is None
assert session.query(States).get(73).old_state_id == 62  # should have been keeped
async def test_purge_filtered_events(
hass: HomeAssistantType,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test filtered events are purged."""
config: ConfigType = {"exclude": {"event_types": ["EVENT_PURGE"]}}
instance = await async_setup_recorder_instance(hass, config)
def _add_db_entries(hass: HomeAssistantType) -> None:
with recorder.session_scope(hass=hass) as session:
# Add events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
session.add(
Events(
event_id=event_id * days,
event_type="EVENT_PURGE",
event_data="{}",
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
)
# Add states and state_changed events that should be keeped
timestamp = dt_util.utcnow() - timedelta(days=1)
for event_id in range(200, 210):
_add_state_and_state_changed_event(
session,
"sensor.keep",
"keep",
timestamp,
event_id,
)
service_data = {"keep_days": 10}
_add_db_entries(hass)
with session_scope(hass=hass) as session:
events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE")
events_keep = session.query(Events).filter(
Events.event_type == EVENT_STATE_CHANGED
)
states = session.query(States)
assert events_purge.count() == 60
assert events_keep.count() == 10
assert states.count() == 10
# Normal purge doesn't remove excluded events
await hass.services.async_call(
recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
assert events_purge.count() == 60
assert events_keep.count() == 10
assert states.count() == 10
# Test with 'apply_filter' = True
service_data["apply_filter"] = True
await hass.services.async_call(
recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
assert events_purge.count() == 0
assert events_keep.count() == 10
assert states.count() == 10
async def test_purge_filtered_events_state_changed(
hass: HomeAssistantType,
async_setup_recorder_instance: SetupRecorderInstanceT,
):
"""Test filtered state_changed events are purged. This should also remove all states."""
config: ConfigType = {"exclude": {"event_types": [EVENT_STATE_CHANGED]}}
instance = await async_setup_recorder_instance(hass, config)
# Assert entity_id is NOT excluded
assert instance.entity_filter("sensor.excluded") is True
def _add_db_entries(hass: HomeAssistantType) -> None:
with recorder.session_scope(hass=hass) as session:
# Add states and state_changed events that should be purged
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_and_state_changed_event(
session,
"sensor.excluded",
"purgeme",
timestamp,
event_id * days,
)
# Add events that should be keeped
timestamp = dt_util.utcnow() - timedelta(days=1)
for event_id in range(200, 210):
session.add(
Events(
event_id=event_id,
event_type="EVENT_KEEP",
event_data="{}",
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
)
# Add states with linked old_state_ids that need to be handled
timestamp = dt_util.utcnow() - timedelta(days=0)
state_1 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
old_state_id=1,
)
timestamp = dt_util.utcnow() - timedelta(days=4)
state_2 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
old_state_id=2,
)
state_3 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
old_state_id=62, # keep
)
session.add_all((state_1, state_2, state_3))
service_data = {"keep_days": 10, "apply_filter": True}
_add_db_entries(hass)
with session_scope(hass=hass) as session:
events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
events_purge = session.query(Events).filter(
Events.event_type == EVENT_STATE_CHANGED
)
states = session.query(States)
assert events_keep.count() == 10
assert events_purge.count() == 60
assert states.count() == 63
await hass.services.async_call(
recorder.DOMAIN, recorder.SERVICE_PURGE, service_data
)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
await async_recorder_block_till_done(hass, instance)
await async_wait_recording_done(hass, instance)
assert events_keep.count() == 10
assert events_purge.count() == 0
assert states.count() == 3
assert session.query(States).get(61).old_state_id is None
assert session.query(States).get(62).old_state_id is None
assert session.query(States).get(63).old_state_id == 62  # should have been keeped
async def _add_test_states(hass: HomeAssistantType, instance: recorder.Recorder):
"""Add multiple states to the db for testing."""
utcnow = dt_util.utcnow()
@@ -260,3 +651,35 @@ async def _add_test_recorder_runs(hass: HomeAssistantType, instance: recorder.Re
end=timestamp + timedelta(days=1),
)
)
def _add_state_and_state_changed_event(
session: Session,
entity_id: str,
state: str,
timestamp: datetime,
event_id: int,
) -> None:
"""Add state and state_changed event to database for testing."""
session.add(
States(
entity_id=entity_id,
domain="sensor",
state=state,
attributes="{}",
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
event_id=event_id,
)
)
session.add(
Events(
event_id=event_id,
event_type=EVENT_STATE_CHANGED,
event_data="{}",
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
)