Evict purged states from recorder's old_state cache (#56877)

Co-authored-by: J. Nick Koston <nick@koston.org>
pull/57065/head
Erik Montnemery 2021-10-04 17:21:40 +02:00 committed by GitHub
parent 12c32ac806
commit 8567aa9e13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 50 deletions

View File

@ -38,7 +38,8 @@ def purge_old_data(
event_ids = _select_event_ids_to_purge(session, purge_before)
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
if state_ids:
_purge_state_ids(session, state_ids)
_purge_state_ids(instance, session, state_ids)
if event_ids:
_purge_event_ids(session, event_ids)
# If states or events purging isn't processing the purge_before yet,
@ -68,10 +69,10 @@ def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list
def _select_state_ids_to_purge(
session: Session, purge_before: datetime, event_ids: list[int]
) -> list[int]:
) -> set[int]:
"""Return a list of state ids to purge."""
if not event_ids:
return []
return set()
states = (
session.query(States.state_id)
.filter(States.last_updated < purge_before)
@ -79,10 +80,10 @@ def _select_state_ids_to_purge(
.all()
)
_LOGGER.debug("Selected %s state ids to remove", len(states))
return [state.state_id for state in states]
return {state.state_id for state in states}
def _purge_state_ids(session: Session, state_ids: list[int]) -> None:
def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None:
"""Disconnect states and delete by state id."""
# Update old_state_id to NULL before deleting to ensure
@ -103,6 +104,26 @@ def _purge_state_ids(session: Session, state_ids: list[int]) -> None:
)
_LOGGER.debug("Deleted %s states", deleted_rows)
# Evict eny entries in the old_states cache referring to a purged state
_evict_purged_states_from_old_states_cache(instance, state_ids)
def _evict_purged_states_from_old_states_cache(
instance: Recorder, purged_state_ids: set[int]
) -> None:
"""Evict purged states from the old states cache."""
# Make a map from old_state_id to entity_id
old_states = instance._old_states # pylint: disable=protected-access
old_state_reversed = {
old_state.state_id: entity_id
for entity_id, old_state in old_states.items()
if old_state.state_id
}
# Evict any purged state from the old states cache
for purged_state_id in purged_state_ids.intersection(old_state_reversed):
old_states.pop(old_state_reversed[purged_state_id], None)
def _purge_event_ids(session: Session, event_ids: list[int]) -> None:
"""Delete by event id."""
@ -139,7 +160,7 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
if not instance.entity_filter(entity_id)
]
if len(excluded_entity_ids) > 0:
_purge_filtered_states(session, excluded_entity_ids)
_purge_filtered_states(instance, session, excluded_entity_ids)
return False
# Check if excluded event_types are in database
@ -149,13 +170,15 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
if event_type in instance.exclude_t
]
if len(excluded_event_types) > 0:
_purge_filtered_events(session, excluded_event_types)
_purge_filtered_events(instance, session, excluded_event_types)
return False
return True
def _purge_filtered_states(session: Session, excluded_entity_ids: list[str]) -> None:
def _purge_filtered_states(
instance: Recorder, session: Session, excluded_entity_ids: list[str]
) -> None:
"""Remove filtered states and linked events."""
state_ids: list[int]
event_ids: list[int | None]
@ -171,11 +194,13 @@ def _purge_filtered_states(session: Session, excluded_entity_ids: list[str]) ->
_LOGGER.debug(
"Selected %s state_ids to remove that should be filtered", len(state_ids)
)
_purge_state_ids(session, state_ids)
_purge_state_ids(instance, session, set(state_ids))
_purge_event_ids(session, event_ids) # type: ignore # type of event_ids already narrowed to 'list[int]'
def _purge_filtered_events(session: Session, excluded_event_types: list[str]) -> None:
def _purge_filtered_events(
instance: Recorder, session: Session, excluded_event_types: list[str]
) -> None:
"""Remove filtered events and linked states."""
events: list[Events] = (
session.query(Events.event_id)
@ -190,8 +215,8 @@ def _purge_filtered_events(session: Session, excluded_event_types: list[str]) ->
states: list[States] = (
session.query(States.state_id).filter(States.event_id.in_(event_ids)).all()
)
state_ids: list[int] = [state.state_id for state in states]
_purge_state_ids(session, state_ids)
state_ids: set[int] = {state.state_id for state in states}
_purge_state_ids(instance, session, state_ids)
_purge_event_ids(session, event_ids)
@ -207,7 +232,7 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool])
_LOGGER.debug("Purging entity data for %s", selected_entity_ids)
if len(selected_entity_ids) > 0:
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
_purge_filtered_states(session, selected_entity_ids)
_purge_filtered_states(instance, session, selected_entity_ids)
_LOGGER.debug("Purging entity data hasn't fully completed yet")
return False

View File

@ -44,6 +44,7 @@ async def test_purge_old_states(
events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 6
assert "test.recorder2" in instance._old_states
purge_before = dt_util.utcnow() - timedelta(days=4)
@ -51,6 +52,7 @@ async def test_purge_old_states(
finished = purge_old_data(instance, purge_before, repack=False)
assert not finished
assert states.count() == 2
assert "test.recorder2" in instance._old_states
states_after_purge = session.query(States)
assert states_after_purge[1].old_state_id == states_after_purge[0].state_id
@ -59,6 +61,28 @@ async def test_purge_old_states(
finished = purge_old_data(instance, purge_before, repack=False)
assert finished
assert states.count() == 2
assert "test.recorder2" in instance._old_states
# run purge_old_data again
purge_before = dt_util.utcnow()
finished = purge_old_data(instance, purge_before, repack=False)
assert not finished
assert states.count() == 0
assert "test.recorder2" not in instance._old_states
# Add some more states
await _add_test_states(hass, instance)
# make sure we start with 6 states
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 6
assert states[0].old_state_id is None
assert states[-1].old_state_id == states[-2].state_id
events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 6
assert "test.recorder2" in instance._old_states
async def test_purge_old_states_encouters_database_corruption(
@ -872,45 +896,27 @@ async def _add_test_states(hass: HomeAssistant, instance: recorder.Recorder):
eleven_days_ago = utcnow - timedelta(days=11)
attributes = {"test_attr": 5, "test_attr_10": "nice"}
await hass.async_block_till_done()
await async_wait_recording_done(hass, instance)
async def set_state(entity_id, state, **kwargs):
"""Set the state."""
hass.states.async_set(entity_id, state, **kwargs)
await hass.async_block_till_done()
await async_wait_recording_done(hass, instance)
with recorder.session_scope(hass=hass) as session:
old_state_id = None
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
state = "autopurgeme"
elif event_id < 4:
timestamp = five_days_ago
state = "purgeme"
else:
timestamp = utcnow
state = "dontpurgeme"
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
state = f"autopurgeme_{event_id}"
elif event_id < 4:
timestamp = five_days_ago
state = f"purgeme_{event_id}"
else:
timestamp = utcnow
state = f"dontpurgeme_{event_id}"
event = Events(
event_type="state_changed",
event_data="{}",
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
session.add(event)
session.flush()
state = States(
entity_id="test.recorder2",
domain="sensor",
state=state,
attributes=json.dumps(attributes),
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
event_id=event.event_id,
old_state_id=old_state_id,
)
session.add(state)
session.flush()
old_state_id = state.state_id
with patch(
"homeassistant.components.recorder.dt_util.utcnow", return_value=timestamp
):
await set_state("test.recorder2", state, attributes=attributes)
async def _add_test_events(hass: HomeAssistant, instance: recorder.Recorder):