Handle InnoDB deadlocks during migration (#89073)
* Handle slow InnoDB rollback when encountering duplicates during migration (fixes #89069)
* adjust
* fix mock
* tests
* return on success
parent d4c28a1f4a
commit c27a69ef85
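The fix wraps the long-running migration jobs in a retry helper: when MySQL/MariaDB reports a retryable InnoDB error (for example a lock wait timeout or a deadlock), the job is retried a bounded number of times instead of failing the whole migration. Below is a minimal sketch of that pattern, assuming error codes 1205/1213 and a fixed wait between attempts; the actual helper added in this commit is `database_job_retry_wrapper` in the recorder `util` module (see the diff below).

```python
# Simplified sketch of the retry-on-retryable-InnoDB-error pattern; the names,
# error codes, and wait time here are illustrative, not the recorder's exact code.
import functools
import logging
import time

from sqlalchemy.exc import OperationalError

_LOGGER = logging.getLogger(__name__)
RETRYABLE_MYSQL_ERRORS = (1205, 1213)  # lock wait timeout, deadlock (assumed values)


def retry_database_job(description: str, attempts: int = 5, wait: float = 3.0):
    """Retry a database job when MySQL reports a retryable InnoDB error."""

    def decorator(job):
        @functools.wraps(job)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return job(*args, **kwargs)  # return on success
                except OperationalError as err:
                    retryable = bool(
                        err.orig
                        and err.orig.args
                        and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
                    )
                    if attempt == attempts - 1 or not retryable:
                        raise
                    _LOGGER.info("%s failed with a retryable error, retrying", description)
                    # Let InnoDB finish its rollback before the next attempt.
                    time.sleep(wait)

        return wrapper

    return decorator
```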
@@ -50,7 +50,7 @@ from .tasks import (
     PostSchemaMigrationTask,
     StatisticsTimestampMigrationCleanupTask,
 )
-from .util import session_scope
+from .util import database_job_retry_wrapper, session_scope
 
 if TYPE_CHECKING:
     from . import Recorder
@@ -158,7 +158,9 @@ def migrate_schema(
             hass.add_job(instance.async_set_db_ready)
         new_version = version + 1
         _LOGGER.info("Upgrading recorder db schema to version %s", new_version)
-        _apply_update(hass, engine, session_maker, new_version, current_version)
+        _apply_update(
+            instance, hass, engine, session_maker, new_version, current_version
+        )
         with session_scope(session=session_maker()) as session:
             session.add(SchemaChanges(schema_version=new_version))
 
@@ -508,7 +510,9 @@ def _drop_foreign_key_constraints(
             )
 
 
+@database_job_retry_wrapper("Apply migration update", 10)
 def _apply_update(  # noqa: C901
+    instance: Recorder,
     hass: HomeAssistant,
     engine: Engine,
     session_maker: Callable[[], Session],
@@ -922,7 +926,7 @@ def _apply_update(  # noqa: C901
             # There may be duplicated statistics entries, delete duplicates
             # and try again
             with session_scope(session=session_maker()) as session:
-                delete_statistics_duplicates(hass, session)
+                delete_statistics_duplicates(instance, hass, session)
             _migrate_statistics_columns_to_timestamp(session_maker, engine)
             # Log at error level to ensure the user sees this message in the log
             # since we logged the error above.
@@ -965,7 +969,7 @@ def post_schema_migration(
         # since they are no longer used and take up a significant amount of space.
         assert instance.event_session is not None
         assert instance.engine is not None
-        _wipe_old_string_time_columns(instance.engine, instance.event_session)
+        _wipe_old_string_time_columns(instance, instance.engine, instance.event_session)
     if old_version < 35 <= new_version:
         # In version 34 we migrated all the created, start, and last_reset
         # columns to be timestamps. In version 34 we need to wipe the old columns
@@ -978,7 +982,10 @@ def _wipe_old_string_statistics_columns(instance: Recorder) -> None:
     instance.queue_task(StatisticsTimestampMigrationCleanupTask())
 
 
-def _wipe_old_string_time_columns(engine: Engine, session: Session) -> None:
+@database_job_retry_wrapper("Wipe old string time columns", 3)
+def _wipe_old_string_time_columns(
+    instance: Recorder, engine: Engine, session: Session
+) -> None:
     """Wipe old string time columns to save space."""
     # Wipe Events.time_fired since its been replaced by Events.time_fired_ts
     # Wipe States.last_updated since its been replaced by States.last_updated_ts
@@ -1162,7 +1169,7 @@ def _migrate_statistics_columns_to_timestamp(
                         "last_reset_ts="
                         "UNIX_TIMESTAMP(last_reset) "
                         "where start_ts is NULL "
-                        "LIMIT 250000;"
+                        "LIMIT 100000;"
                     )
                 )
     elif engine.dialect.name == SupportedDialect.POSTGRESQL:
@@ -1180,7 +1187,7 @@ def _migrate_statistics_columns_to_timestamp(
                         "created_ts=EXTRACT(EPOCH FROM created), "
                         "last_reset_ts=EXTRACT(EPOCH FROM last_reset) "
                         "where id IN ( "
-                        f"SELECT id FROM {table} where start_ts is NULL LIMIT 250000 "
+                        f"SELECT id FROM {table} where start_ts is NULL LIMIT 100000 "
                         " );"
                     )
                )

@@ -75,6 +75,7 @@ from .models import (
     datetime_to_timestamp_or_none,
 )
 from .util import (
+    database_job_retry_wrapper,
     execute,
     execute_stmt_lambda_element,
     get_instance,
@@ -515,7 +516,10 @@ def _delete_duplicates_from_table(
     return (total_deleted_rows, all_non_identical_duplicates)
 
 
-def delete_statistics_duplicates(hass: HomeAssistant, session: Session) -> None:
+@database_job_retry_wrapper("delete statistics duplicates", 3)
+def delete_statistics_duplicates(
+    instance: Recorder, hass: HomeAssistant, session: Session
+) -> None:
     """Identify and delete duplicated statistics.
 
     A backup will be made of duplicated statistics before it is deleted.

@@ -568,6 +568,17 @@ def end_incomplete_runs(session: Session, start_time: datetime) -> None:
         session.add(run)
 
 
+def _is_retryable_error(instance: Recorder, err: OperationalError) -> bool:
+    """Return True if the error is retryable."""
+    assert instance.engine is not None
+    return bool(
+        instance.engine.dialect.name == SupportedDialect.MYSQL
+        and isinstance(err.orig, BaseException)
+        and err.orig.args
+        and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
+    )
+
+
 _FuncType = Callable[Concatenate[_RecorderT, _P], bool]
 
 
@@ -585,12 +596,8 @@ def retryable_database_job(
             try:
                 return job(instance, *args, **kwargs)
             except OperationalError as err:
-                assert instance.engine is not None
-                if (
-                    instance.engine.dialect.name == SupportedDialect.MYSQL
-                    and err.orig
-                    and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
-                ):
+                if _is_retryable_error(instance, err):
+                    assert isinstance(err.orig, BaseException)
                     _LOGGER.info(
                         "%s; %s not completed, retrying", err.orig.args[1], description
                     )
@@ -608,6 +615,46 @@ def retryable_database_job(
     return decorator
 
 
+_WrappedFuncType = Callable[Concatenate[_RecorderT, _P], None]
+
+
+def database_job_retry_wrapper(
+    description: str, attempts: int = 5
+) -> Callable[[_WrappedFuncType[_RecorderT, _P]], _WrappedFuncType[_RecorderT, _P]]:
+    """Try to execute a database job multiple times.
+
+    This wrapper handles InnoDB deadlocks and lock timeouts.
+
+    This is different from retryable_database_job in that it will retry the job
+    attempts number of times instead of returning False if the job fails.
+    """
+
+    def decorator(
+        job: _WrappedFuncType[_RecorderT, _P]
+    ) -> _WrappedFuncType[_RecorderT, _P]:
+        @functools.wraps(job)
+        def wrapper(instance: _RecorderT, *args: _P.args, **kwargs: _P.kwargs) -> None:
+            for attempt in range(attempts):
+                try:
+                    job(instance, *args, **kwargs)
+                    return
+                except OperationalError as err:
+                    if attempt == attempts - 1 or not _is_retryable_error(
+                        instance, err
+                    ):
+                        raise
+                    assert isinstance(err.orig, BaseException)
+                    _LOGGER.info(
+                        "%s; %s failed, retrying", err.orig.args[1], description
+                    )
+                    time.sleep(instance.db_retry_wait)
+                    # Failed with retryable error
+
+        return wrapper
+
+    return decorator
+
+
 def periodic_db_cleanups(instance: Recorder) -> None:
     """Run any database cleanups that need to happen periodically.
 
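Unlike `retryable_database_job`, which returns False so the work can be rescheduled, a job wrapped with `database_job_retry_wrapper` is retried up to `attempts` times and the exception is re-raised when it is not retryable or the attempts are exhausted. A hypothetical usage sketch follows; only `database_job_retry_wrapper`, `session_scope`, and `Recorder.get_session` are real, the job and its SQL are illustrative.

```python
# Hypothetical job showing how the new wrapper is applied; this function does
# not exist in the recorder and the SQL statement is illustrative only.
from sqlalchemy import text

from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.util import (
    database_job_retry_wrapper,
    session_scope,
)


@database_job_retry_wrapper("Drop example index", 3)
def _drop_example_index(instance: Recorder) -> None:
    """Drop an index, retrying if InnoDB reports a deadlock or lock wait timeout."""
    with session_scope(session=instance.get_session()) as session:
        session.connection().execute(text("DROP INDEX ix_example ON states"))
```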
@@ -69,7 +69,7 @@ async def test_schema_update_calls(recorder_db_url: str, hass: HomeAssistant) ->
     session_maker = instance.get_session
     update.assert_has_calls(
         [
-            call(hass, engine, session_maker, version + 1, 0)
+            call(instance, hass, engine, session_maker, version + 1, 0)
             for version in range(0, db_schema.SCHEMA_VERSION)
         ]
     )
@@ -304,6 +304,8 @@ async def test_schema_migrate(
     migration_version = None
     real_migrate_schema = recorder.migration.migrate_schema
     real_apply_update = recorder.migration._apply_update
+    real_create_index = recorder.migration._create_index
+    create_calls = 0
 
     def _create_engine_test(*args, **kwargs):
         """Test version of create_engine that initializes with old schema.
@@ -355,6 +357,17 @@ async def test_schema_migrate(
         migration_stall.wait()
         real_apply_update(*args)
 
+    def _sometimes_failing_create_index(*args):
+        """Make the first index create raise a retryable error to ensure we retry."""
+        if recorder_db_url.startswith("mysql://"):
+            nonlocal create_calls
+            if create_calls < 1:
+                create_calls += 1
+                mysql_exception = OperationalError("statement", {}, [])
+                mysql_exception.orig = Exception(1205, "retryable")
+                raise mysql_exception
+        real_create_index(*args)
+
     with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
         "homeassistant.components.recorder.core.create_engine",
         new=_create_engine_test,
@@ -368,6 +381,11 @@ async def test_schema_migrate(
     ), patch(
         "homeassistant.components.recorder.migration._apply_update",
         wraps=_instrument_apply_update,
     ) as apply_update_mock, patch(
+        "homeassistant.components.recorder.util.time.sleep"
+    ), patch(
+        "homeassistant.components.recorder.migration._create_index",
+        wraps=_sometimes_failing_create_index,
+    ), patch(
         "homeassistant.components.recorder.Recorder._schedule_compile_missing_statistics",
     ), patch(
@@ -394,12 +412,13 @@ async def test_schema_migrate(
     assert migration_version == db_schema.SCHEMA_VERSION
     assert setup_run.called
     assert recorder.util.async_migration_in_progress(hass) is not True
+    assert apply_update_mock.called
 
 
 def test_invalid_update(hass: HomeAssistant) -> None:
     """Test that an invalid new version raises an exception."""
     with pytest.raises(ValueError):
-        migration._apply_update(hass, Mock(), Mock(), -1, 0)
+        migration._apply_update(Mock(), hass, Mock(), Mock(), -1, 0)
 
 
 @pytest.mark.parametrize(

@@ -2,7 +2,7 @@
 from datetime import datetime, timedelta
 import json
 import sqlite3
-from unittest.mock import MagicMock, patch
+from unittest.mock import patch
 
 import pytest
 from sqlalchemy.exc import DatabaseError, OperationalError
@@ -192,7 +192,7 @@ async def test_purge_old_states_encounters_temporary_mysql_error(
     await async_wait_recording_done(hass)
 
     mysql_exception = OperationalError("statement", {}, [])
-    mysql_exception.orig = MagicMock(args=(1205, "retryable"))
+    mysql_exception.orig = Exception(1205, "retryable")
 
     with patch(
         "homeassistant.components.recorder.util.time.sleep"

@@ -1231,8 +1231,9 @@ def test_delete_duplicates_no_duplicates(
     """Test removal of duplicated statistics."""
     hass = hass_recorder()
     wait_recording_done(hass)
+    instance = recorder.get_instance(hass)
     with session_scope(hass=hass) as session:
-        delete_statistics_duplicates(hass, session)
+        delete_statistics_duplicates(instance, hass, session)
     assert "duplicated statistics rows" not in caplog.text
     assert "Found non identical" not in caplog.text
     assert "Found duplicated" not in caplog.text