From 6ce768465f7762df87b611d046e27c7a0b3160f6 Mon Sep 17 00:00:00 2001
From: "J. Nick Koston"
Date: Tue, 26 Apr 2022 10:11:57 -1000
Subject: [PATCH] Improve purge performance for non-sqlite databases (#70576)

---
 homeassistant/components/recorder/__init__.py |   9 +-
 .../components/recorder/migration.py          |   3 +-
 homeassistant/components/recorder/models.py   |   4 +-
 homeassistant/components/recorder/purge.py    | 125 +++++++++++-------
 tests/components/recorder/test_init.py        |   2 +-
 tests/components/recorder/test_purge.py       |  23 +++-
 6 files changed, 112 insertions(+), 54 deletions(-)

diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index 9aeed3b6d12..a0d2f1c8702 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -921,7 +921,8 @@ class Recorder(threading.Thread):
         self.queue.put(ExternalStatisticsTask(metadata, stats))
 
     @callback
-    def _using_sqlite(self) -> bool:
+    def using_sqlite(self) -> bool:
+        """Return if recorder uses sqlite as the engine."""
         return bool(self.engine and self.engine.dialect.name == "sqlite")
 
     @callback
@@ -933,7 +934,7 @@ class Recorder(threading.Thread):
 
         # If the db is using a socket connection, we need to keep alive
         # to prevent errors from unexpected disconnects
-        if not self._using_sqlite():
+        if not self.using_sqlite():
             self._keep_alive_listener = async_track_time_interval(
                 self.hass, self._async_keep_alive, timedelta(seconds=KEEPALIVE_TIME)
             )
@@ -1355,7 +1356,7 @@ class Recorder(threading.Thread):
 
     async def lock_database(self) -> bool:
         """Lock database so it can be backed up safely."""
-        if not self._using_sqlite():
+        if not self.using_sqlite():
             _LOGGER.debug(
                 "Not a SQLite database or not connected, locking not necessary"
             )
@@ -1384,7 +1385,7 @@ class Recorder(threading.Thread):
 
         Returns true if database lock has been held throughout the process.
""" - if not self._using_sqlite(): + if not self.using_sqlite(): _LOGGER.debug( "Not a SQLite database or not connected, unlocking not necessary" ) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 6af03575619..94614fe8cff 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -646,7 +646,8 @@ def _apply_update(instance, new_version, old_version): # noqa: C901 big_int = "INTEGER(20)" if dialect == "mysql" else "INTEGER" _add_columns(instance, "states", [f"attributes_id {big_int}"]) _create_index(instance, "states", "ix_states_attributes_id") - + elif new_version == 26: + _create_index(instance, "statistics_runs", "ix_statistics_runs_start") else: raise ValueError(f"No schema migration defined for version {new_version}") diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index b5255ea3bf2..5b402dec7a3 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -44,7 +44,7 @@ from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP # pylint: disable=invalid-name Base = declarative_base() -SCHEMA_VERSION = 25 +SCHEMA_VERSION = 26 _LOGGER = logging.getLogger(__name__) @@ -484,7 +484,7 @@ class StatisticsRuns(Base): # type: ignore[misc,valid-type] __tablename__ = TABLE_STATISTICS_RUNS run_id = Column(Integer, Identity(), primary_key=True) - start = Column(DateTime(timezone=True)) + start = Column(DateTime(timezone=True), index=True) def __repr__(self) -> str: """Return string representation of instance for debugging.""" diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index 21f4bef0d3f..e6f30b1c62d 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -1,12 +1,12 @@ """Purge old data helper.""" from __future__ import annotations -from collections.abc import Callable +from collections.abc import Callable, Iterable from datetime import datetime import logging from typing import TYPE_CHECKING -from sqlalchemy import func +from sqlalchemy import column, func, select, union from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import distinct @@ -42,13 +42,15 @@ def purge_old_data( "Purging states and events before target %s", purge_before.isoformat(sep=" ", timespec="seconds"), ) + using_sqlite = instance.using_sqlite() with session_scope(session=instance.get_session()) as session: # type: ignore[misc] # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record - event_ids = _select_event_ids_to_purge(session, purge_before) - state_ids, attributes_ids = _select_state_and_attributes_ids_to_purge( - session, purge_before, event_ids - ) + ( + event_ids, + state_ids, + attributes_ids, + ) = _select_event_state_and_attributes_ids_to_purge(session, purge_before) statistics_runs = _select_statistics_runs_to_purge(session, purge_before) short_term_statistics = _select_short_term_statistics_to_purge( session, purge_before @@ -58,7 +60,7 @@ def purge_old_data( _purge_state_ids(instance, session, state_ids) if unused_attribute_ids_set := _select_unused_attributes_ids( - session, attributes_ids + session, attributes_ids, using_sqlite ): _purge_attributes_ids(instance, session, unused_attribute_ids_set) @@ -86,52 +88,80 @@ def purge_old_data( return True -def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list[int]: - 
"""Return a list of event ids to purge.""" +def _select_event_state_and_attributes_ids_to_purge( + session: Session, purge_before: datetime +) -> tuple[set[int], set[int], set[int]]: + """Return a list of event, state, and attribute ids to purge.""" events = ( - session.query(Events.event_id) + session.query(Events.event_id, States.state_id, States.attributes_id) + .outerjoin(States, Events.event_id == States.event_id) .filter(Events.time_fired < purge_before) .limit(MAX_ROWS_TO_PURGE) .all() ) _LOGGER.debug("Selected %s event ids to remove", len(events)) - return [event.event_id for event in events] - - -def _select_state_and_attributes_ids_to_purge( - session: Session, purge_before: datetime, event_ids: list[int] -) -> tuple[set[int], set[int]]: - """Return a list of state ids to purge.""" - if not event_ids: - return set(), set() - states = ( - session.query(States.state_id, States.attributes_id) - .filter(States.last_updated < purge_before) - .filter(States.event_id.in_(event_ids)) - .all() - ) - _LOGGER.debug("Selected %s state ids to remove", len(states)) + event_ids = set() state_ids = set() attributes_ids = set() - for state in states: - state_ids.add(state.state_id) - if state.attributes_id: - attributes_ids.add(state.attributes_id) - return state_ids, attributes_ids + for event in events: + event_ids.add(event.event_id) + if event.state_id: + state_ids.add(event.state_id) + if event.attributes_id: + attributes_ids.add(event.attributes_id) + return event_ids, state_ids, attributes_ids def _select_unused_attributes_ids( - session: Session, attributes_ids: set[int] + session: Session, attributes_ids: set[int], using_sqlite: bool ) -> set[int]: """Return a set of attributes ids that are not used by any states in the database.""" if not attributes_ids: return set() - to_remove = attributes_ids - { - state[0] - for state in session.query(distinct(States.attributes_id)) - .filter(States.attributes_id.in_(attributes_ids)) - .all() - } + + if using_sqlite: + # + # SQLite has a superior query optimizer for the distinct query below as it uses the + # covering index without having to examine the rows directly for both of the queries + # below. + # + # We use the distinct query for SQLite since the query in the other branch can + # generate more than 500 unions which SQLite does not support. + # + # How MariaDB's query optimizer handles this query: + # > explain select distinct attributes_id from states where attributes_id in (136723); + # ...Using index + # + id_query = session.query(distinct(States.attributes_id)).filter( + States.attributes_id.in_(attributes_ids) + ) + else: + # + # This branch is for DBMS that cannot optimize the distinct query well and has to examine + # all the rows that match. + # + # This branch uses a union of simple queries, as each query is optimized away as the answer + # to the query can be found in the index. + # + # The below query works for SQLite as long as there are no more than 500 attributes_id + # to be selected. We currently do not have MySQL or PostgreSQL servers running in the + # test suite; we test this path using SQLite when there are less than 500 attributes_id. 
+        #
+        # How MariaDB's query optimizer handles this query:
+        # > explain select min(attributes_id) from states where attributes_id = 136723;
+        # ...Select tables optimized away
+        #
+        id_query = session.query(column("id")).from_statement(
+            union(
+                *[
+                    select(func.min(States.attributes_id).label("id")).where(
+                        States.attributes_id == attributes_id
+                    )
+                    for attributes_id in attributes_ids
+                ]
+            )
+        )
+    to_remove = attributes_ids - {state[0] for state in id_query.all()}
     _LOGGER.debug(
         "Selected %s shared attributes to remove",
         len(to_remove),
@@ -276,7 +306,7 @@ def _purge_short_term_statistics(
     _LOGGER.debug("Deleted %s short term statistics", deleted_rows)
 
 
-def _purge_event_ids(session: Session, event_ids: list[int]) -> None:
+def _purge_event_ids(session: Session, event_ids: Iterable[int]) -> None:
     """Delete by event id."""
     deleted_rows = (
         session.query(Events)
@@ -303,6 +333,7 @@ def _purge_old_recorder_runs(
 def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
     """Remove filtered states and events that shouldn't be in the database."""
     _LOGGER.debug("Cleanup filtered data")
+    using_sqlite = instance.using_sqlite()
 
     # Check if excluded entity_ids are in database
     excluded_entity_ids: list[str] = [
@@ -311,7 +342,7 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
         if not instance.entity_filter(entity_id)
     ]
     if len(excluded_entity_ids) > 0:
-        _purge_filtered_states(instance, session, excluded_entity_ids)
+        _purge_filtered_states(instance, session, excluded_entity_ids, using_sqlite)
         return False
 
     # Check if excluded event_types are in database
@@ -328,12 +359,15 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
 
 
 def _purge_filtered_states(
-    instance: Recorder, session: Session, excluded_entity_ids: list[str]
+    instance: Recorder,
+    session: Session,
+    excluded_entity_ids: list[str],
+    using_sqlite: bool,
 ) -> None:
     """Remove filtered states and linked events."""
     state_ids: list[int]
     attributes_ids: list[int]
-    event_ids: list[int | None]
+    event_ids: list[int]
     state_ids, attributes_ids, event_ids = zip(
         *(
             session.query(States.state_id, States.attributes_id, States.event_id)
@@ -347,9 +381,9 @@ def _purge_filtered_states(
         "Selected %s state_ids to remove that should be filtered", len(state_ids)
     )
     _purge_state_ids(instance, session, set(state_ids))
-    _purge_event_ids(session, event_ids)  # type: ignore[arg-type]  # type of event_ids already narrowed to 'list[int]'
+    _purge_event_ids(session, event_ids)
     unused_attribute_ids_set = _select_unused_attributes_ids(
-        session, {id_ for id_ in attributes_ids if id_ is not None}
+        session, {id_ for id_ in attributes_ids if id_ is not None}, using_sqlite
    )
     _purge_attributes_ids(instance, session, unused_attribute_ids_set)
 
@@ -384,6 +418,7 @@ def _purge_filtered_events(
 @retryable_database_job("purge")
 def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool:
     """Purge states and events of specified entities."""
+    using_sqlite = instance.using_sqlite()
     with session_scope(session=instance.get_session()) as session:  # type: ignore[misc]
         selected_entity_ids: list[str] = [
             entity_id
@@ -393,7 +428,7 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool])
         _LOGGER.debug("Purging entity data for %s", selected_entity_ids)
         if len(selected_entity_ids) > 0:
             # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
-            _purge_filtered_states(instance, session, selected_entity_ids)
+            _purge_filtered_states(instance, session, selected_entity_ids, using_sqlite)
             _LOGGER.debug("Purging entity data hasn't fully completed yet")
             return False
 
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 78a3f808d03..77bd3992e72 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -1395,7 +1395,7 @@ async def test_database_connection_keep_alive(
 ):
     """Test we keep alive socket based dialects."""
     with patch(
-        "homeassistant.components.recorder.Recorder._using_sqlite", return_value=False
+        "homeassistant.components.recorder.Recorder.using_sqlite", return_value=False
     ):
         instance = await async_setup_recorder_instance(hass)
         # We have to mock this since we don't have a mock
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
index b2c6201b162..1f0f3c87ea5 100644
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -4,6 +4,7 @@
 import json
 import sqlite3
 from unittest.mock import MagicMock, patch
 
+import pytest
 from sqlalchemy.exc import DatabaseError, OperationalError
 from sqlalchemy.orm.session import Session
@@ -34,6 +35,16 @@ from .common import (
 from tests.common import SetupRecorderInstanceT
 
 
+@pytest.fixture(name="use_sqlite")
+def mock_use_sqlite(request):
+    """Pytest fixture to switch purge method."""
+    with patch(
+        "homeassistant.components.recorder.Recorder.using_sqlite",
+        return_value=request.param,
+    ):
+        yield
+
+
 async def test_purge_old_states(
     hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
 ):
@@ -270,10 +281,12 @@ async def test_purge_old_statistics_runs(
     assert statistics_runs.count() == 1
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_method(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
-    caplog,
+    caplog: pytest.LogCaptureFixture,
+    use_sqlite: bool,
 ):
     """Test purge method."""
 
@@ -384,9 +397,11 @@ async def test_purge_method(
     assert "Vacuuming SQL DB to free space" in caplog.text
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_edge_case(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test states and events are purged even if they occurred shortly before purge_before."""
 
@@ -599,9 +614,11 @@ async def test_purge_cutoff_date(
     assert state_attributes.count() == 0
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_filtered_states(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test filtered states are purged."""
     config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
@@ -792,9 +809,11 @@ async def test_purge_filtered_states(
     assert session.query(StateAttributes).count() == 1
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_filtered_states_to_empty(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test filtered states are purged all the way to an empty db."""
     config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
@@ -847,9 +866,11 @@ async def test_purge_filtered_states_to_empty(
     await async_wait_purge_done(hass)
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_without_state_attributes_filtered_states_to_empty(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test filtered legacy states without state attributes are purged all the way to an empty db."""
     config: ConfigType = {"exclude": {"entities": ["sensor.old_format"]}}
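
Note on the union technique in _select_unused_attributes_ids above: instead of a single
"SELECT DISTINCT attributes_id FROM states WHERE attributes_id IN (...)", which MySQL and
PostgreSQL answer by examining every matching row, the non-SQLite branch issues one
single-row "SELECT MIN(attributes_id) ... WHERE attributes_id = ?" per candidate id and
unions the results, so each lookup is satisfied from the index alone. The sketch below
demonstrates the idea standalone under SQLAlchemy 1.4; the in-memory engine, the cut-down
States model, and the sample data are illustrative stand-ins, not the recorder's actual
schema or API.

    from sqlalchemy import Column, Integer, column, create_engine, func, select, union
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()


    class States(Base):
        """Cut-down stand-in for the recorder's states table."""

        __tablename__ = "states"
        state_id = Column(Integer, primary_key=True)
        attributes_id = Column(Integer, index=True)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add_all([States(attributes_id=1), States(attributes_id=2)])
        session.commit()

        candidate_ids = {1, 2, 3}
        # One SELECT per candidate id; each is answered from the index alone
        # ("Select tables optimized away" in MariaDB's EXPLAIN output).
        id_query = session.query(column("id")).from_statement(
            union(
                *[
                    select(func.min(States.attributes_id).label("id")).where(
                        States.attributes_id == attributes_id
                    )
                    for attributes_id in candidate_ids
                ]
            )
        )
        # Ids that come back are still referenced; an absent id yields a NULL
        # row, which the set difference simply ignores.
        unused = candidate_ids - {row[0] for row in id_query.all()}
        print(unused)  # {3} -- only id 3 is safe to purge

With more than 500 candidate ids this form would exceed SQLite's default limit of 500 terms
in a compound SELECT, which is why the patch keeps the DISTINCT form for SQLite and, per the
comment above, only exercises the union path in tests with fewer than 500 attributes_id.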