Improve purge performance for non-sqlite databases (#70576)

J. Nick Koston 2022-04-26 10:11:57 -10:00 committed by GitHub
parent 1c4a785fb3
commit 6ce768465f
6 changed files with 112 additions and 54 deletions

View File

@@ -921,7 +921,8 @@ class Recorder(threading.Thread):
         self.queue.put(ExternalStatisticsTask(metadata, stats))
 
     @callback
-    def _using_sqlite(self) -> bool:
+    def using_sqlite(self) -> bool:
+        """Return if recorder uses sqlite as the engine."""
         return bool(self.engine and self.engine.dialect.name == "sqlite")
 
     @callback
@@ -933,7 +934,7 @@ class Recorder(threading.Thread):
         # If the db is using a socket connection, we need to keep alive
         # to prevent errors from unexpected disconnects
-        if not self._using_sqlite():
+        if not self.using_sqlite():
             self._keep_alive_listener = async_track_time_interval(
                 self.hass, self._async_keep_alive, timedelta(seconds=KEEPALIVE_TIME)
             )
@@ -1355,7 +1356,7 @@ class Recorder(threading.Thread):
     async def lock_database(self) -> bool:
         """Lock database so it can be backed up safely."""
-        if not self._using_sqlite():
+        if not self.using_sqlite():
             _LOGGER.debug(
                 "Not a SQLite database or not connected, locking not necessary"
             )
@@ -1384,7 +1385,7 @@ class Recorder(threading.Thread):
         Returns true if database lock has been held throughout the process.
         """
-        if not self._using_sqlite():
+        if not self.using_sqlite():
             _LOGGER.debug(
                 "Not a SQLite database or not connected, unlocking not necessary"
             )
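
Aside: renaming the private `_using_sqlite` to the public `using_sqlite` is what lets the purge helpers below ask the recorder which engine is in use. As a minimal sketch (not part of this PR; connection URLs are illustrative and the relevant DB drivers must be installed), the underlying `engine.dialect.name` check distinguishes backends like so:

    from sqlalchemy import create_engine

    # create_engine() does not open a connection, so no server is contacted here
    create_engine("sqlite://").dialect.name                        # "sqlite"
    create_engine("mysql+pymysql://user:pw@host/db").dialect.name  # "mysql"
    create_engine("postgresql://user:pw@host/db").dialect.name     # "postgresql"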

View File

@@ -646,7 +646,8 @@ def _apply_update(instance, new_version, old_version):  # noqa: C901
         big_int = "INTEGER(20)" if dialect == "mysql" else "INTEGER"
         _add_columns(instance, "states", [f"attributes_id {big_int}"])
         _create_index(instance, "states", "ix_states_attributes_id")
+    elif new_version == 26:
+        _create_index(instance, "statistics_runs", "ix_statistics_runs_start")
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
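
Existing databases pick up the new index through this version-26 migration step. A hedged sketch of what `_create_index(instance, "statistics_runs", "ix_statistics_runs_start")` is expected to boil down to (the exact DDL is rendered by SQLAlchemy per dialect; the helper name below is hypothetical):

    from sqlalchemy import text

    def sketch_apply_v26(connection):
        """Assumed equivalent of the new_version == 26 branch."""
        connection.execute(
            text("CREATE INDEX ix_statistics_runs_start ON statistics_runs (start)")
        )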

View File

@@ -44,7 +44,7 @@ from .const import ALL_DOMAIN_EXCLUDE_ATTRS, JSON_DUMP
 # pylint: disable=invalid-name
 Base = declarative_base()
 
-SCHEMA_VERSION = 25
+SCHEMA_VERSION = 26
 
 _LOGGER = logging.getLogger(__name__)
@@ -484,7 +484,7 @@ class StatisticsRuns(Base):  # type: ignore[misc,valid-type]
     __tablename__ = TABLE_STATISTICS_RUNS
 
     run_id = Column(Integer, Identity(), primary_key=True)
-    start = Column(DateTime(timezone=True))
+    start = Column(DateTime(timezone=True), index=True)
 
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""

View File

@@ -1,12 +1,12 @@
 """Purge old data helper."""
 from __future__ import annotations
 
-from collections.abc import Callable
+from collections.abc import Callable, Iterable
 from datetime import datetime
 import logging
 from typing import TYPE_CHECKING
 
-from sqlalchemy import func
+from sqlalchemy import column, func, select, union
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct
@@ -42,13 +42,15 @@ def purge_old_data(
         "Purging states and events before target %s",
         purge_before.isoformat(sep=" ", timespec="seconds"),
     )
+    using_sqlite = instance.using_sqlite()
     with session_scope(session=instance.get_session()) as session:  # type: ignore[misc]
         # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
-        event_ids = _select_event_ids_to_purge(session, purge_before)
-        state_ids, attributes_ids = _select_state_and_attributes_ids_to_purge(
-            session, purge_before, event_ids
-        )
+        (
+            event_ids,
+            state_ids,
+            attributes_ids,
+        ) = _select_event_state_and_attributes_ids_to_purge(session, purge_before)
         statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
         short_term_statistics = _select_short_term_statistics_to_purge(
             session, purge_before
@@ -58,7 +60,7 @@ def purge_old_data(
             _purge_state_ids(instance, session, state_ids)
             if unused_attribute_ids_set := _select_unused_attributes_ids(
-                session, attributes_ids
+                session, attributes_ids, using_sqlite
             ):
                 _purge_attributes_ids(instance, session, unused_attribute_ids_set)
@@ -86,52 +88,80 @@ def purge_old_data(
     return True
 
 
-def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list[int]:
-    """Return a list of event ids to purge."""
+def _select_event_state_and_attributes_ids_to_purge(
+    session: Session, purge_before: datetime
+) -> tuple[set[int], set[int], set[int]]:
+    """Return a list of event, state, and attribute ids to purge."""
     events = (
-        session.query(Events.event_id)
+        session.query(Events.event_id, States.state_id, States.attributes_id)
+        .outerjoin(States, Events.event_id == States.event_id)
         .filter(Events.time_fired < purge_before)
         .limit(MAX_ROWS_TO_PURGE)
         .all()
     )
     _LOGGER.debug("Selected %s event ids to remove", len(events))
-    return [event.event_id for event in events]
-
-
-def _select_state_and_attributes_ids_to_purge(
-    session: Session, purge_before: datetime, event_ids: list[int]
-) -> tuple[set[int], set[int]]:
-    """Return a list of state ids to purge."""
-    if not event_ids:
-        return set(), set()
-    states = (
-        session.query(States.state_id, States.attributes_id)
-        .filter(States.last_updated < purge_before)
-        .filter(States.event_id.in_(event_ids))
-        .all()
-    )
-    _LOGGER.debug("Selected %s state ids to remove", len(states))
+    event_ids = set()
     state_ids = set()
     attributes_ids = set()
-    for state in states:
-        state_ids.add(state.state_id)
-        if state.attributes_id:
-            attributes_ids.add(state.attributes_id)
-    return state_ids, attributes_ids
+    for event in events:
+        event_ids.add(event.event_id)
+        if event.state_id:
+            state_ids.add(event.state_id)
+        if event.attributes_id:
+            attributes_ids.add(event.attributes_id)
+    return event_ids, state_ids, attributes_ids
 
 
 def _select_unused_attributes_ids(
-    session: Session, attributes_ids: set[int]
+    session: Session, attributes_ids: set[int], using_sqlite: bool
 ) -> set[int]:
     """Return a set of attributes ids that are not used by any states in the database."""
     if not attributes_ids:
         return set()
-    to_remove = attributes_ids - {
-        state[0]
-        for state in session.query(distinct(States.attributes_id))
-        .filter(States.attributes_id.in_(attributes_ids))
-        .all()
-    }
+    if using_sqlite:
+        #
+        # SQLite has a superior query optimizer for the distinct query below as it uses the
+        # covering index without having to examine the rows directly for both of the queries
+        # below.
+        #
+        # We use the distinct query for SQLite since the query in the other branch can
+        # generate more than 500 unions which SQLite does not support.
+        #
+        # How MariaDB's query optimizer handles this query:
+        # > explain select distinct attributes_id from states where attributes_id in (136723);
+        # ...Using index
+        #
+        id_query = session.query(distinct(States.attributes_id)).filter(
+            States.attributes_id.in_(attributes_ids)
+        )
+    else:
+        #
+        # This branch is for DBMS that cannot optimize the distinct query well and has to examine
+        # all the rows that match.
+        #
+        # This branch uses a union of simple queries, as each query is optimized away as the answer
+        # to the query can be found in the index.
+        #
+        # The below query works for SQLite as long as there are no more than 500 attributes_id
+        # to be selected. We currently do not have MySQL or PostgreSQL servers running in the
+        # test suite; we test this path using SQLite when there are less than 500 attributes_id.
+        #
+        # How MariaDB's query optimizer handles this query:
+        # > explain select min(attributes_id) from states where attributes_id = 136723;
+        # ...Select tables optimized away
+        #
+        id_query = session.query(column("id")).from_statement(
+            union(
+                *[
+                    select(func.min(States.attributes_id).label("id")).where(
+                        States.attributes_id == attributes_id
+                    )
+                    for attributes_id in attributes_ids
+                ]
+            )
+        )
+    to_remove = attributes_ids - {state[0] for state in id_query.all()}
    _LOGGER.debug(
         "Selected %s shared attributes to remove",
         len(to_remove),
     )
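
To make the union trick concrete: both branches answer "which of these attributes_ids are still referenced by some state?", but the non-SQLite branch does it with one index-only MIN() probe per id instead of a row scan. A self-contained sketch (simplified stand-in model, not the recorder's real `States`) that prints the SQL shape MariaDB reports as "Select tables optimized away":

    from sqlalchemy import Column, Integer, func, select, union
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class States(Base):
        """Toy stand-in for the recorder's states table."""
        __tablename__ = "states"
        state_id = Column(Integer, primary_key=True)
        attributes_id = Column(Integer, index=True)

    # one trivially indexed SELECT per id, glued together with UNION
    query = union(
        *[
            select(func.min(States.attributes_id).label("id")).where(
                States.attributes_id == attributes_id
            )
            for attributes_id in (136723, 136724)
        ]
    )
    print(query)
    # SELECT min(states.attributes_id) AS id FROM states
    # WHERE states.attributes_id = :attributes_id_1
    # UNION
    # SELECT min(states.attributes_id) AS id FROM states
    # WHERE states.attributes_id = :attributes_id_2

SQLite caps compound SELECTs at 500 terms by default, which is why the SQLite path keeps the single DISTINCT query instead.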
@@ -276,7 +306,7 @@ def _purge_short_term_statistics(
     _LOGGER.debug("Deleted %s short term statistics", deleted_rows)
 
 
-def _purge_event_ids(session: Session, event_ids: list[int]) -> None:
+def _purge_event_ids(session: Session, event_ids: Iterable[int]) -> None:
     """Delete by event id."""
     deleted_rows = (
         session.query(Events)
@@ -303,6 +333,7 @@ def _purge_old_recorder_runs(
 def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
     """Remove filtered states and events that shouldn't be in the database."""
     _LOGGER.debug("Cleanup filtered data")
+    using_sqlite = instance.using_sqlite()
 
     # Check if excluded entity_ids are in database
     excluded_entity_ids: list[str] = [
@@ -311,7 +342,7 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
         if not instance.entity_filter(entity_id)
     ]
     if len(excluded_entity_ids) > 0:
-        _purge_filtered_states(instance, session, excluded_entity_ids)
+        _purge_filtered_states(instance, session, excluded_entity_ids, using_sqlite)
         return False
 
     # Check if excluded event_types are in database
@@ -328,12 +359,15 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
 def _purge_filtered_states(
-    instance: Recorder, session: Session, excluded_entity_ids: list[str]
+    instance: Recorder,
+    session: Session,
+    excluded_entity_ids: list[str],
+    using_sqlite: bool,
 ) -> None:
     """Remove filtered states and linked events."""
     state_ids: list[int]
     attributes_ids: list[int]
-    event_ids: list[int | None]
+    event_ids: list[int]
     state_ids, attributes_ids, event_ids = zip(
         *(
             session.query(States.state_id, States.attributes_id, States.event_id)
@@ -347,9 +381,9 @@ def _purge_filtered_states(
         "Selected %s state_ids to remove that should be filtered", len(state_ids)
     )
     _purge_state_ids(instance, session, set(state_ids))
-    _purge_event_ids(session, event_ids)  # type: ignore[arg-type]  # type of event_ids already narrowed to 'list[int]'
+    _purge_event_ids(session, event_ids)
     unused_attribute_ids_set = _select_unused_attributes_ids(
-        session, {id_ for id_ in attributes_ids if id_ is not None}
+        session, {id_ for id_ in attributes_ids if id_ is not None}, using_sqlite
     )
     _purge_attributes_ids(instance, session, unused_attribute_ids_set)
@@ -384,6 +418,7 @@ def _purge_filtered_events(
 @retryable_database_job("purge")
 def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool:
     """Purge states and events of specified entities."""
+    using_sqlite = instance.using_sqlite()
     with session_scope(session=instance.get_session()) as session:  # type: ignore[misc]
         selected_entity_ids: list[str] = [
             entity_id
@@ -393,7 +428,7 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool])
         _LOGGER.debug("Purging entity data for %s", selected_entity_ids)
         if len(selected_entity_ids) > 0:
             # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
-            _purge_filtered_states(instance, session, selected_entity_ids)
+            _purge_filtered_states(instance, session, selected_entity_ids, using_sqlite)
             _LOGGER.debug("Purging entity data hasn't fully completed yet")
             return False

View File

@@ -1395,7 +1395,7 @@ async def test_database_connection_keep_alive(
 ):
     """Test we keep alive socket based dialects."""
     with patch(
-        "homeassistant.components.recorder.Recorder._using_sqlite", return_value=False
+        "homeassistant.components.recorder.Recorder.using_sqlite", return_value=False
     ):
         instance = await async_setup_recorder_instance(hass)
         # We have to mock this since we don't have a mock

View File

@@ -4,6 +4,7 @@ import json
 import sqlite3
 from unittest.mock import MagicMock, patch
 
+import pytest
 from sqlalchemy.exc import DatabaseError, OperationalError
 from sqlalchemy.orm.session import Session
@@ -34,6 +35,16 @@ from .common import (
 from tests.common import SetupRecorderInstanceT
 
 
+@pytest.fixture(name="use_sqlite")
+def mock_use_sqlite(request):
+    """Pytest fixture to switch purge method."""
+    with patch(
+        "homeassistant.components.recorder.Recorder.using_sqlite",
+        return_value=request.param,
+    ):
+        yield
+
+
 async def test_purge_old_states(
     hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
 ):
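
The fixture relies on pytest's indirect parametrization: each `@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)` below hands its value to the fixture as `request.param`, so every decorated test runs twice, once with `Recorder.using_sqlite` patched to return True (exercising the DISTINCT branch) and once returning False (exercising the union branch).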
@@ -270,10 +281,12 @@ async def test_purge_old_statistics_runs(
         assert statistics_runs.count() == 1
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_method(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
-    caplog,
+    caplog: pytest.LogCaptureFixture,
+    use_sqlite: bool,
 ):
     """Test purge method."""
@@ -384,9 +397,11 @@ async def test_purge_method(
     assert "Vacuuming SQL DB to free space" in caplog.text
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_edge_case(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test states and events are purged even if they occurred shortly before purge_before."""
@@ -599,9 +614,11 @@ async def test_purge_cutoff_date(
     assert state_attributes.count() == 0
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_filtered_states(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test filtered states are purged."""
     config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
@@ -792,9 +809,11 @@ async def test_purge_filtered_states(
         assert session.query(StateAttributes).count() == 1
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_filtered_states_to_empty(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test filtered states are purged all the way to an empty db."""
     config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}}
@@ -847,9 +866,11 @@ async def test_purge_filtered_states_to_empty(
     await async_wait_purge_done(hass)
 
 
+@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_without_state_attributes_filtered_states_to_empty(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,
+    use_sqlite: bool,
 ):
     """Test filtered legacy states without state attributes are purged all the way to an empty db."""
     config: ConfigType = {"exclude": {"entities": ["sensor.old_format"]}}