Fix cpu thrashing during purge after all legacy events were removed (#89923)
* Fix cpu thrashing during purge after all legacy events were removed We now remove the the index of of event ids on the states table when its all NULLs to save space. The purge path needs to avoid checking for legacy rows to purge if the index has been removed since it will result in a full table scan each purge cycle that will always find no legacy rows to purge * one more place * drop the key constraint as well * fixes * more sqlitepull/89941/head^2
parent
7f3e4cb3af
commit
aebe4c66a6
|
@ -221,6 +221,7 @@ class Recorder(threading.Thread):
|
|||
self.async_migration_event = asyncio.Event()
|
||||
self.migration_in_progress = False
|
||||
self.migration_is_live = False
|
||||
self.use_legacy_events_index = False
|
||||
self._database_lock_task: DatabaseLockTask | None = None
|
||||
self._db_executor: DBInterruptibleThreadPoolExecutor | None = None
|
||||
|
||||
|
@ -744,6 +745,7 @@ class Recorder(threading.Thread):
|
|||
session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
|
||||
):
|
||||
self.queue_task(EventIdMigrationTask())
|
||||
self.use_legacy_events_index = True
|
||||
|
||||
# We must only set the db ready after we have set the table managers
|
||||
# to active if there is no data to migrate.
|
||||
|
|
|
@ -513,11 +513,7 @@ def _drop_foreign_key_constraints(
|
|||
inspector = sqlalchemy.inspect(engine)
|
||||
drops = []
|
||||
for foreign_key in inspector.get_foreign_keys(table):
|
||||
if (
|
||||
foreign_key["name"]
|
||||
and foreign_key.get("options", {}).get("ondelete")
|
||||
and foreign_key["constrained_columns"] == columns
|
||||
):
|
||||
if foreign_key["name"] and foreign_key["constrained_columns"] == columns:
|
||||
drops.append(ForeignKeyConstraint((), (), name=foreign_key["name"]))
|
||||
|
||||
# Bind the ForeignKeyConstraints to the table
|
||||
|
@ -1547,7 +1543,16 @@ def cleanup_legacy_states_event_ids(instance: Recorder) -> bool:
|
|||
if all_gone:
|
||||
# Only drop the index if there are no more event_ids in the states table
|
||||
# ex all NULL
|
||||
_drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
|
||||
assert instance.engine is not None, "engine should never be None"
|
||||
if instance.dialect_name != SupportedDialect.SQLITE:
|
||||
# SQLite does not support dropping foreign key constraints
|
||||
# so we can't drop the index at this time but we can avoid
|
||||
# looking for legacy rows during purge
|
||||
_drop_foreign_key_constraints(
|
||||
session_maker, instance.engine, TABLE_STATES, ["event_id"]
|
||||
)
|
||||
_drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
|
||||
instance.use_legacy_events_index = False
|
||||
|
||||
return True
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@ import logging
|
|||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from sqlalchemy.engine.row import Row
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
@ -74,7 +73,7 @@ def purge_old_data(
|
|||
with session_scope(session=instance.get_session()) as session:
|
||||
# Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states or events record
|
||||
has_more_to_purge = False
|
||||
if _purging_legacy_format(session):
|
||||
if instance.use_legacy_events_index and _purging_legacy_format(session):
|
||||
_LOGGER.debug(
|
||||
"Purge running in legacy format as there are states with event_id"
|
||||
" remaining"
|
||||
|
@ -671,14 +670,18 @@ def _purge_filtered_events(
|
|||
_LOGGER.debug(
|
||||
"Selected %s event_ids to remove that should be filtered", len(event_ids_set)
|
||||
)
|
||||
states: list[Row[tuple[int]]] = (
|
||||
session.query(States.state_id).filter(States.event_id.in_(event_ids_set)).all()
|
||||
)
|
||||
if states:
|
||||
if (
|
||||
instance.use_legacy_events_index
|
||||
and (
|
||||
states := session.query(States.state_id)
|
||||
.filter(States.event_id.in_(event_ids_set))
|
||||
.all()
|
||||
)
|
||||
and (state_ids := {state.state_id for state in states})
|
||||
):
|
||||
# These are legacy states that are linked to an event that are no longer
|
||||
# created but since we did not remove them when we stopped adding new ones
|
||||
# we will need to purge them here.
|
||||
state_ids: set[int] = {state.state_id for state in states}
|
||||
_purge_state_ids(instance, session, state_ids)
|
||||
_purge_event_ids(session, event_ids_set)
|
||||
if unused_data_ids_set := _select_unused_event_data_ids(
|
||||
|
|
|
@ -10,7 +10,7 @@ from sqlalchemy.exc import DatabaseError, OperationalError
|
|||
from sqlalchemy.orm.session import Session
|
||||
|
||||
from homeassistant.components import recorder
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.recorder import Recorder, migration
|
||||
from homeassistant.components.recorder.const import (
|
||||
SQLITE_MAX_BIND_VARS,
|
||||
SupportedDialect,
|
||||
|
@ -1726,6 +1726,18 @@ async def test_purge_can_mix_legacy_and_new_format(
|
|||
) -> None:
|
||||
"""Test purging with legacy a new events."""
|
||||
instance = await async_setup_recorder_instance(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
# New databases are no longer created with the legacy events index
|
||||
assert instance.use_legacy_events_index is False
|
||||
|
||||
def _recreate_legacy_events_index():
|
||||
"""Recreate the legacy events index since its no longer created on new instances."""
|
||||
migration._create_index(instance.get_session, "states", "ix_states_event_id")
|
||||
instance.use_legacy_events_index = True
|
||||
|
||||
await instance.async_add_executor_job(_recreate_legacy_events_index)
|
||||
assert instance.use_legacy_events_index is True
|
||||
|
||||
utcnow = dt_util.utcnow()
|
||||
eleven_days_ago = utcnow - timedelta(days=11)
|
||||
with session_scope(hass=hass) as session:
|
||||
|
|
|
@ -142,6 +142,7 @@ async def test_migrate_times(
|
|||
_get_states_index_names
|
||||
)
|
||||
states_index_names = {index["name"] for index in states_indexes}
|
||||
assert recorder.get_instance(hass).use_legacy_events_index is True
|
||||
|
||||
await hass.async_stop()
|
||||
await hass.async_block_till_done()
|
||||
|
@ -212,7 +213,12 @@ async def test_migrate_times(
|
|||
)
|
||||
states_index_names = {index["name"] for index in states_indexes}
|
||||
|
||||
assert "ix_states_event_id" not in states_index_names
|
||||
# sqlite does not support dropping foreign keys so the
|
||||
# ix_states_event_id index is not dropped in this case
|
||||
# but use_legacy_events_index is still False
|
||||
assert "ix_states_event_id" in states_index_names
|
||||
|
||||
assert recorder.get_instance(hass).use_legacy_events_index is False
|
||||
|
||||
await hass.async_stop()
|
||||
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
|
||||
|
|
Loading…
Reference in New Issue