diff --git a/tests/components/recorder/conftest.py b/tests/components/recorder/conftest.py index 1a3c25ec727..b0e648befcf 100644 --- a/tests/components/recorder/conftest.py +++ b/tests/components/recorder/conftest.py @@ -1,11 +1,15 @@ """Fixtures for the recorder component tests.""" -from collections.abc import Generator -from unittest.mock import patch +from dataclasses import dataclass +import threading +from unittest.mock import Mock, patch import pytest +from typing_extensions import AsyncGenerator, Generator from homeassistant.components import recorder +from homeassistant.components.recorder import db_schema +from homeassistant.components.recorder.util import session_scope from homeassistant.core import HomeAssistant @@ -46,3 +50,70 @@ def recorder_dialect_name(hass: HomeAssistant, db_engine: str) -> Generator[None "homeassistant.components.recorder.Recorder.dialect_name", db_engine ): yield + + +@dataclass(slots=True) +class InstrumentedMigration: + """Container to aid controlling migration progress.""" + + migration_done: threading.Event + migration_stall: threading.Event + migration_started: threading.Event + migration_version: int | None + apply_update_mock: Mock + + +@pytest.fixture +async def instrument_migration( + hass: HomeAssistant, +) -> AsyncGenerator[InstrumentedMigration]: + """Instrument recorder migration.""" + + real_migrate_schema = recorder.migration.migrate_schema + real_apply_update = recorder.migration._apply_update + + def _instrument_migrate_schema(*args): + """Control migration progress and check results.""" + instrumented_migration.migration_started.set() + + try: + real_migrate_schema(*args) + except Exception: + instrumented_migration.migration_done.set() + raise + + # Check and report the outcome of the migration; if migration fails + # the recorder will silently create a new database. + with session_scope(hass=hass, read_only=True) as session: + res = ( + session.query(db_schema.SchemaChanges) + .order_by(db_schema.SchemaChanges.change_id.desc()) + .first() + ) + instrumented_migration.migration_version = res.schema_version + instrumented_migration.migration_done.set() + + def _instrument_apply_update(*args): + """Control migration progress.""" + instrumented_migration.migration_stall.wait() + real_apply_update(*args) + + with ( + patch( + "homeassistant.components.recorder.migration.migrate_schema", + wraps=_instrument_migrate_schema, + ), + patch( + "homeassistant.components.recorder.migration._apply_update", + wraps=_instrument_apply_update, + ) as apply_update_mock, + ): + instrumented_migration = InstrumentedMigration( + migration_done=threading.Event(), + migration_stall=threading.Event(), + migration_started=threading.Event(), + migration_version=None, + apply_update_mock=apply_update_mock, + ) + + yield instrumented_migration diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 423462f333f..cd9650779b5 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -4,7 +4,6 @@ import datetime import importlib import sqlite3 import sys -import threading from unittest.mock import Mock, PropertyMock, call, patch import pytest @@ -33,6 +32,7 @@ from homeassistant.helpers import recorder as recorder_helper import homeassistant.util.dt as dt_util from .common import async_wait_recording_done, create_engine_test +from .conftest import InstrumentedMigration from tests.common import async_fire_time_changed from tests.typing import RecorderInstanceGenerator @@ -91,6 +91,7 @@ async def test_migration_in_progress( hass: HomeAssistant, recorder_db_url: str, async_setup_recorder_instance: RecorderInstanceGenerator, + instrument_migration: InstrumentedMigration, ) -> None: """Test that we can check for migration in progress.""" if recorder_db_url.startswith("mysql://"): @@ -110,8 +111,11 @@ async def test_migration_in_progress( ), ): await async_setup_recorder_instance(hass, wait_recorder=False) - await recorder.get_instance(hass).async_migration_event.wait() + await hass.async_add_executor_job(instrument_migration.migration_started.wait) assert recorder.util.async_migration_in_progress(hass) is True + + # Let migration finish + instrument_migration.migration_stall.set() await async_wait_recording_done(hass) assert recorder.util.async_migration_in_progress(hass) is False @@ -235,7 +239,9 @@ async def test_database_migration_encounters_corruption_not_sqlite( async def test_events_during_migration_are_queued( - hass: HomeAssistant, async_setup_recorder_instance: RecorderInstanceGenerator + hass: HomeAssistant, + async_setup_recorder_instance: RecorderInstanceGenerator, + instrument_migration: InstrumentedMigration, ) -> None: """Test that events during migration are queued.""" @@ -247,13 +253,20 @@ async def test_events_during_migration_are_queued( new=create_engine_test, ), ): - await async_setup_recorder_instance(hass, {"commit_interval": 0}) + await async_setup_recorder_instance( + hass, {"commit_interval": 0}, wait_recorder=False + ) + await hass.async_add_executor_job(instrument_migration.migration_started.wait) + assert recorder.util.async_migration_in_progress(hass) is True hass.states.async_set("my.entity", "on", {}) hass.states.async_set("my.entity", "off", {}) await hass.async_block_till_done() async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2)) await hass.async_block_till_done() async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4)) + + # Let migration finish + instrument_migration.migration_stall.set() await recorder.get_instance(hass).async_recorder_ready.wait() await async_wait_recording_done(hass) @@ -265,7 +278,9 @@ async def test_events_during_migration_are_queued( async def test_events_during_migration_queue_exhausted( - hass: HomeAssistant, async_setup_recorder_instance: RecorderInstanceGenerator + hass: HomeAssistant, + async_setup_recorder_instance: RecorderInstanceGenerator, + instrument_migration: InstrumentedMigration, ) -> None: """Test that events during migration takes so long the queue is exhausted.""" @@ -282,6 +297,8 @@ async def test_events_during_migration_queue_exhausted( await async_setup_recorder_instance( hass, {"commit_interval": 0}, wait_recorder=False ) + await hass.async_add_executor_job(instrument_migration.migration_started.wait) + assert recorder.util.async_migration_in_progress(hass) is True hass.states.async_set("my.entity", "on", {}) await hass.async_block_till_done() async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2)) @@ -289,6 +306,9 @@ async def test_events_during_migration_queue_exhausted( async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4)) await hass.async_block_till_done() hass.states.async_set("my.entity", "off", {}) + + # Let migration finish + instrument_migration.migration_stall.set() await recorder.get_instance(hass).async_recorder_ready.wait() await async_wait_recording_done(hass) @@ -313,6 +333,7 @@ async def test_schema_migrate( hass: HomeAssistant, recorder_db_url: str, async_setup_recorder_instance: RecorderInstanceGenerator, + instrument_migration: InstrumentedMigration, start_version, live, ) -> None: @@ -323,11 +344,6 @@ async def test_schema_migrate( inspection could quickly become quite cumbersome. """ - migration_done = threading.Event() - migration_stall = threading.Event() - migration_version = None - real_migrate_schema = recorder.migration.migrate_schema - real_apply_update = recorder.migration._apply_update real_create_index = recorder.migration._create_index create_calls = 0 @@ -354,33 +370,6 @@ async def test_schema_migrate( start=self.recorder_runs_manager.recording_start, created=dt_util.utcnow() ) - def _instrument_migrate_schema(*args): - """Control migration progress and check results.""" - nonlocal migration_done - nonlocal migration_version - try: - real_migrate_schema(*args) - except Exception: - migration_done.set() - raise - - # Check and report the outcome of the migration; if migration fails - # the recorder will silently create a new database. - with session_scope(hass=hass, read_only=True) as session: - res = ( - session.query(db_schema.SchemaChanges) - .order_by(db_schema.SchemaChanges.change_id.desc()) - .first() - ) - migration_version = res.schema_version - migration_done.set() - - def _instrument_apply_update(*args): - """Control migration progress.""" - nonlocal migration_stall - migration_stall.wait() - real_apply_update(*args) - def _sometimes_failing_create_index(*args): """Make the first index create raise a retryable error to ensure we retry.""" if recorder_db_url.startswith("mysql://"): @@ -402,14 +391,6 @@ async def test_schema_migrate( side_effect=_mock_setup_run, autospec=True, ) as setup_run, - patch( - "homeassistant.components.recorder.migration.migrate_schema", - wraps=_instrument_migrate_schema, - ), - patch( - "homeassistant.components.recorder.migration._apply_update", - wraps=_instrument_apply_update, - ) as apply_update_mock, patch("homeassistant.components.recorder.util.time.sleep"), patch( "homeassistant.components.recorder.migration._create_index", @@ -426,18 +407,20 @@ async def test_schema_migrate( ), ): await async_setup_recorder_instance(hass, wait_recorder=False) + await hass.async_add_executor_job(instrument_migration.migration_started.wait) + assert recorder.util.async_migration_in_progress(hass) is True await recorder_helper.async_wait_recorder(hass) assert recorder.util.async_migration_in_progress(hass) is True assert recorder.util.async_migration_is_live(hass) == live - migration_stall.set() + instrument_migration.migration_stall.set() await hass.async_block_till_done() - await hass.async_add_executor_job(migration_done.wait) + await hass.async_add_executor_job(instrument_migration.migration_done.wait) await async_wait_recording_done(hass) - assert migration_version == db_schema.SCHEMA_VERSION + assert instrument_migration.migration_version == db_schema.SCHEMA_VERSION assert setup_run.called assert recorder.util.async_migration_in_progress(hass) is not True - assert apply_update_mock.called + assert instrument_migration.apply_update_mock.called def test_invalid_update(hass: HomeAssistant) -> None: diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py index cc187a1e6ad..508848b9cc7 100644 --- a/tests/components/recorder/test_websocket_api.py +++ b/tests/components/recorder/test_websocket_api.py @@ -3,7 +3,6 @@ import datetime from datetime import timedelta from statistics import fmean -import threading from unittest.mock import ANY, patch from freezegun import freeze_time @@ -37,9 +36,18 @@ from .common import ( do_adhoc_statistics, statistics_during_period, ) +from .conftest import InstrumentedMigration from tests.common import async_fire_time_changed -from tests.typing import WebSocketGenerator +from tests.typing import RecorderInstanceGenerator, WebSocketGenerator + + +@pytest.fixture +async def mock_recorder_before_hass( + async_setup_recorder_instance: RecorderInstanceGenerator, +) -> None: + """Set up recorder.""" + DISTANCE_SENSOR_FT_ATTRIBUTES = { "device_class": "distance", @@ -2493,70 +2501,56 @@ async def test_recorder_info_no_instance( async def test_recorder_info_migration_queue_exhausted( - hass: HomeAssistant, hass_ws_client: WebSocketGenerator + hass: HomeAssistant, + hass_ws_client: WebSocketGenerator, + async_test_recorder: RecorderInstanceGenerator, + instrument_migration: InstrumentedMigration, ) -> None: """Test getting recorder status when recorder queue is exhausted.""" assert recorder.util.async_migration_in_progress(hass) is False - migration_done = threading.Event() - - real_migration = recorder.migration._apply_update - - def stalled_migration(*args): - """Make migration stall.""" - nonlocal migration_done - migration_done.wait() - return real_migration(*args) - with ( - patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), - patch("homeassistant.components.recorder.Recorder.async_periodic_statistics"), patch( "homeassistant.components.recorder.core.create_engine", new=create_engine_test, ), patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0), - patch( - "homeassistant.components.recorder.migration._apply_update", - wraps=stalled_migration, - ), ): - recorder_helper.async_initialize_recorder(hass) - hass.create_task( - async_setup_component( - hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + async with async_test_recorder(hass, wait_recorder=False): + await hass.async_add_executor_job( + instrument_migration.migration_started.wait ) - ) - await recorder_helper.async_wait_recorder(hass) - hass.states.async_set("my.entity", "on", {}) - await hass.async_block_till_done() + assert recorder.util.async_migration_in_progress(hass) is True + await recorder_helper.async_wait_recorder(hass) + hass.states.async_set("my.entity", "on", {}) + await hass.async_block_till_done() - # Detect queue full - async_fire_time_changed(hass, dt_util.utcnow() + timedelta(hours=2)) - await hass.async_block_till_done() + # Detect queue full + async_fire_time_changed(hass, dt_util.utcnow() + timedelta(hours=2)) + await hass.async_block_till_done() - client = await hass_ws_client() + client = await hass_ws_client() - # Check the status - await client.send_json_auto_id({"type": "recorder/info"}) - response = await client.receive_json() - assert response["success"] - assert response["result"]["migration_in_progress"] is True - assert response["result"]["recording"] is False - assert response["result"]["thread_running"] is True + # Check the status + await client.send_json_auto_id({"type": "recorder/info"}) + response = await client.receive_json() + assert response["success"] + assert response["result"]["migration_in_progress"] is True + assert response["result"]["recording"] is False + assert response["result"]["thread_running"] is True - # Let migration finish - migration_done.set() - await async_wait_recording_done(hass) + # Let migration finish + instrument_migration.migration_stall.set() + await async_wait_recording_done(hass) - # Check the status after migration finished - await client.send_json_auto_id({"type": "recorder/info"}) - response = await client.receive_json() - assert response["success"] - assert response["result"]["migration_in_progress"] is False - assert response["result"]["recording"] is True - assert response["result"]["thread_running"] is True + # Check the status after migration finished + await client.send_json_auto_id({"type": "recorder/info"}) + response = await client.receive_json() + assert response["success"] + assert response["result"]["migration_in_progress"] is False + assert response["result"]["recording"] is True + assert response["result"]["thread_running"] is True async def test_backup_start_no_recorder(