diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 431bc78ba80..0b8fe9243ba 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -50,7 +50,7 @@ from .tasks import ( PostSchemaMigrationTask, StatisticsTimestampMigrationCleanupTask, ) -from .util import session_scope +from .util import database_job_retry_wrapper, session_scope if TYPE_CHECKING: from . import Recorder @@ -158,7 +158,9 @@ def migrate_schema( hass.add_job(instance.async_set_db_ready) new_version = version + 1 _LOGGER.info("Upgrading recorder db schema to version %s", new_version) - _apply_update(hass, engine, session_maker, new_version, current_version) + _apply_update( + instance, hass, engine, session_maker, new_version, current_version + ) with session_scope(session=session_maker()) as session: session.add(SchemaChanges(schema_version=new_version)) @@ -508,7 +510,9 @@ def _drop_foreign_key_constraints( ) +@database_job_retry_wrapper("Apply migration update", 10) def _apply_update( # noqa: C901 + instance: Recorder, hass: HomeAssistant, engine: Engine, session_maker: Callable[[], Session], @@ -922,7 +926,7 @@ def _apply_update( # noqa: C901 # There may be duplicated statistics entries, delete duplicates # and try again with session_scope(session=session_maker()) as session: - delete_statistics_duplicates(hass, session) + delete_statistics_duplicates(instance, hass, session) _migrate_statistics_columns_to_timestamp(session_maker, engine) # Log at error level to ensure the user sees this message in the log # since we logged the error above. @@ -965,7 +969,7 @@ def post_schema_migration( # since they are no longer used and take up a significant amount of space. assert instance.event_session is not None assert instance.engine is not None - _wipe_old_string_time_columns(instance.engine, instance.event_session) + _wipe_old_string_time_columns(instance, instance.engine, instance.event_session) if old_version < 35 <= new_version: # In version 34 we migrated all the created, start, and last_reset # columns to be timestamps. In version 34 we need to wipe the old columns @@ -978,7 +982,10 @@ def _wipe_old_string_statistics_columns(instance: Recorder) -> None: instance.queue_task(StatisticsTimestampMigrationCleanupTask()) -def _wipe_old_string_time_columns(engine: Engine, session: Session) -> None: +@database_job_retry_wrapper("Wipe old string time columns", 3) +def _wipe_old_string_time_columns( + instance: Recorder, engine: Engine, session: Session +) -> None: """Wipe old string time columns to save space.""" # Wipe Events.time_fired since its been replaced by Events.time_fired_ts # Wipe States.last_updated since its been replaced by States.last_updated_ts @@ -1162,7 +1169,7 @@ def _migrate_statistics_columns_to_timestamp( "last_reset_ts=" "UNIX_TIMESTAMP(last_reset) " "where start_ts is NULL " - "LIMIT 250000;" + "LIMIT 100000;" ) ) elif engine.dialect.name == SupportedDialect.POSTGRESQL: @@ -1180,7 +1187,7 @@ def _migrate_statistics_columns_to_timestamp( "created_ts=EXTRACT(EPOCH FROM created), " "last_reset_ts=EXTRACT(EPOCH FROM last_reset) " "where id IN ( " - f"SELECT id FROM {table} where start_ts is NULL LIMIT 250000 " + f"SELECT id FROM {table} where start_ts is NULL LIMIT 100000 " " );" ) ) diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py index 294c5217623..c90447f1c99 100644 --- a/homeassistant/components/recorder/statistics.py +++ b/homeassistant/components/recorder/statistics.py @@ -75,6 +75,7 @@ from .models import ( datetime_to_timestamp_or_none, ) from .util import ( + database_job_retry_wrapper, execute, execute_stmt_lambda_element, get_instance, @@ -515,7 +516,10 @@ def _delete_duplicates_from_table( return (total_deleted_rows, all_non_identical_duplicates) -def delete_statistics_duplicates(hass: HomeAssistant, session: Session) -> None: +@database_job_retry_wrapper("delete statistics duplicates", 3) +def delete_statistics_duplicates( + instance: Recorder, hass: HomeAssistant, session: Session +) -> None: """Identify and delete duplicated statistics. A backup will be made of duplicated statistics before it is deleted. diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index 3ff6b62b21e..bfdd8ff5b14 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -568,6 +568,17 @@ def end_incomplete_runs(session: Session, start_time: datetime) -> None: session.add(run) +def _is_retryable_error(instance: Recorder, err: OperationalError) -> bool: + """Return True if the error is retryable.""" + assert instance.engine is not None + return bool( + instance.engine.dialect.name == SupportedDialect.MYSQL + and isinstance(err.orig, BaseException) + and err.orig.args + and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS + ) + + _FuncType = Callable[Concatenate[_RecorderT, _P], bool] @@ -585,12 +596,8 @@ def retryable_database_job( try: return job(instance, *args, **kwargs) except OperationalError as err: - assert instance.engine is not None - if ( - instance.engine.dialect.name == SupportedDialect.MYSQL - and err.orig - and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS - ): + if _is_retryable_error(instance, err): + assert isinstance(err.orig, BaseException) _LOGGER.info( "%s; %s not completed, retrying", err.orig.args[1], description ) @@ -608,6 +615,46 @@ def retryable_database_job( return decorator +_WrappedFuncType = Callable[Concatenate[_RecorderT, _P], None] + + +def database_job_retry_wrapper( + description: str, attempts: int = 5 +) -> Callable[[_WrappedFuncType[_RecorderT, _P]], _WrappedFuncType[_RecorderT, _P]]: + """Try to execute a database job multiple times. + + This wrapper handles InnoDB deadlocks and lock timeouts. + + This is different from retryable_database_job in that it will retry the job + attempts number of times instead of returning False if the job fails. + """ + + def decorator( + job: _WrappedFuncType[_RecorderT, _P] + ) -> _WrappedFuncType[_RecorderT, _P]: + @functools.wraps(job) + def wrapper(instance: _RecorderT, *args: _P.args, **kwargs: _P.kwargs) -> None: + for attempt in range(attempts): + try: + job(instance, *args, **kwargs) + return + except OperationalError as err: + if attempt == attempts - 1 or not _is_retryable_error( + instance, err + ): + raise + assert isinstance(err.orig, BaseException) + _LOGGER.info( + "%s; %s failed, retrying", err.orig.args[1], description + ) + time.sleep(instance.db_retry_wait) + # Failed with retryable error + + return wrapper + + return decorator + + def periodic_db_cleanups(instance: Recorder) -> None: """Run any database cleanups that need to happen periodically. diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 44c3ffac99e..19c7e6c6955 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -69,7 +69,7 @@ async def test_schema_update_calls(recorder_db_url: str, hass: HomeAssistant) -> session_maker = instance.get_session update.assert_has_calls( [ - call(hass, engine, session_maker, version + 1, 0) + call(instance, hass, engine, session_maker, version + 1, 0) for version in range(0, db_schema.SCHEMA_VERSION) ] ) @@ -304,6 +304,8 @@ async def test_schema_migrate( migration_version = None real_migrate_schema = recorder.migration.migrate_schema real_apply_update = recorder.migration._apply_update + real_create_index = recorder.migration._create_index + create_calls = 0 def _create_engine_test(*args, **kwargs): """Test version of create_engine that initializes with old schema. @@ -355,6 +357,17 @@ async def test_schema_migrate( migration_stall.wait() real_apply_update(*args) + def _sometimes_failing_create_index(*args): + """Make the first index create raise a retryable error to ensure we retry.""" + if recorder_db_url.startswith("mysql://"): + nonlocal create_calls + if create_calls < 1: + create_calls += 1 + mysql_exception = OperationalError("statement", {}, []) + mysql_exception.orig = Exception(1205, "retryable") + raise mysql_exception + real_create_index(*args) + with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch( "homeassistant.components.recorder.core.create_engine", new=_create_engine_test, @@ -368,6 +381,11 @@ async def test_schema_migrate( ), patch( "homeassistant.components.recorder.migration._apply_update", wraps=_instrument_apply_update, + ) as apply_update_mock, patch( + "homeassistant.components.recorder.util.time.sleep" + ), patch( + "homeassistant.components.recorder.migration._create_index", + wraps=_sometimes_failing_create_index, ), patch( "homeassistant.components.recorder.Recorder._schedule_compile_missing_statistics", ), patch( @@ -394,12 +412,13 @@ async def test_schema_migrate( assert migration_version == db_schema.SCHEMA_VERSION assert setup_run.called assert recorder.util.async_migration_in_progress(hass) is not True + assert apply_update_mock.called def test_invalid_update(hass: HomeAssistant) -> None: """Test that an invalid new version raises an exception.""" with pytest.raises(ValueError): - migration._apply_update(hass, Mock(), Mock(), -1, 0) + migration._apply_update(Mock(), hass, Mock(), Mock(), -1, 0) @pytest.mark.parametrize( diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index c5ce8d272c7..07c935129e9 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -2,7 +2,7 @@ from datetime import datetime, timedelta import json import sqlite3 -from unittest.mock import MagicMock, patch +from unittest.mock import patch import pytest from sqlalchemy.exc import DatabaseError, OperationalError @@ -192,7 +192,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error( await async_wait_recording_done(hass) mysql_exception = OperationalError("statement", {}, []) - mysql_exception.orig = MagicMock(args=(1205, "retryable")) + mysql_exception.orig = Exception(1205, "retryable") with patch( "homeassistant.components.recorder.util.time.sleep" diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py index 8685985def8..dd51946c86f 100644 --- a/tests/components/recorder/test_statistics.py +++ b/tests/components/recorder/test_statistics.py @@ -1231,8 +1231,9 @@ def test_delete_duplicates_no_duplicates( """Test removal of duplicated statistics.""" hass = hass_recorder() wait_recording_done(hass) + instance = recorder.get_instance(hass) with session_scope(hass=hass) as session: - delete_statistics_duplicates(hass, session) + delete_statistics_duplicates(instance, hass, session) assert "duplicated statistics rows" not in caplog.text assert "Found non identical" not in caplog.text assert "Found duplicated" not in caplog.text