From 9368891b1baef0e38be2a8aa9436c1dc2623bde3 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 11 Apr 2021 20:43:54 -1000 Subject: [PATCH] Live db migrations and recovery (#49036) Co-authored-by: Paulus Schoutsen --- homeassistant/components/recorder/__init__.py | 409 +++++++++++------- .../components/recorder/migration.py | 31 +- homeassistant/components/recorder/purge.py | 5 +- homeassistant/components/recorder/util.py | 41 +- tests/components/recorder/test_init.py | 213 +++++++-- tests/components/recorder/test_migrate.py | 164 ++++++- tests/components/recorder/test_purge.py | 36 ++ tests/components/recorder/test_util.py | 97 +---- 8 files changed, 684 insertions(+), 312 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index f93d965a4b9..10b987b04f7 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio import concurrent.futures -from datetime import datetime +from datetime import datetime, timedelta import logging import queue import sqlite3 @@ -12,6 +12,7 @@ import time from typing import Any, Callable, NamedTuple from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import StaticPool import voluptuous as vol @@ -20,7 +21,7 @@ from homeassistant.components import persistent_notification from homeassistant.const import ( ATTR_ENTITY_ID, CONF_EXCLUDE, - EVENT_HOMEASSISTANT_START, + EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, @@ -33,6 +34,7 @@ from homeassistant.helpers.entityfilter import ( INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER, convert_include_exclude_filter, ) +from homeassistant.helpers.event import async_track_time_interval, track_time_change from homeassistant.helpers.typing import ConfigType import homeassistant.util.dt as dt_util @@ -56,6 +58,8 @@ ATTR_KEEP_DAYS = "keep_days" ATTR_REPACK = "repack" ATTR_APPLY_FILTER = "apply_filter" +MAX_QUEUE_BACKLOG = 30000 + SERVICE_PURGE_SCHEMA = vol.Schema( { vol.Optional(ATTR_KEEP_DAYS): cv.positive_int, @@ -99,6 +103,7 @@ CONFIG_SCHEMA = vol.Schema( { vol.Optional(DOMAIN, default=dict): vol.All( cv.deprecated(CONF_PURGE_INTERVAL), + cv.deprecated(CONF_DB_INTEGRITY_CHECK), FILTER_SCHEMA.extend( { vol.Optional(CONF_AUTO_PURGE, default=True): cv.boolean, @@ -176,11 +181,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: commit_interval = conf[CONF_COMMIT_INTERVAL] db_max_retries = conf[CONF_DB_MAX_RETRIES] db_retry_wait = conf[CONF_DB_RETRY_WAIT] - db_integrity_check = conf[CONF_DB_INTEGRITY_CHECK] - - db_url = conf.get(CONF_DB_URL) - if not db_url: - db_url = DEFAULT_URL.format(hass_config_path=hass.config.path(DEFAULT_DB_FILE)) + db_url = conf.get(CONF_DB_URL) or DEFAULT_URL.format( + hass_config_path=hass.config.path(DEFAULT_DB_FILE) + ) exclude = conf[CONF_EXCLUDE] exclude_t = exclude.get(CONF_EVENT_TYPES, []) instance = hass.data[DATA_INSTANCE] = Recorder( @@ -193,10 +196,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: db_retry_wait=db_retry_wait, entity_filter=entity_filter, exclude_t=exclude_t, - db_integrity_check=db_integrity_check, ) instance.async_initialize() instance.start() + _async_register_services(hass, instance) + + return await instance.async_db_ready + + +@callback +def _async_register_services(hass, instance): + """Register recorder services.""" async def async_handle_purge_service(service): """Handle calls to the purge service.""" @@ -223,8 +233,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: schema=SERVICE_DISABLE_SCHEMA, ) - return await instance.async_db_ready - class PurgeTask(NamedTuple): """Object to store information about purge task.""" @@ -252,7 +260,6 @@ class Recorder(threading.Thread): db_retry_wait: int, entity_filter: Callable[[str], bool], exclude_t: list[str], - db_integrity_check: bool, ) -> None: """Initialize the recorder.""" threading.Thread.__init__(self, name="Recorder") @@ -266,8 +273,8 @@ class Recorder(threading.Thread): self.db_url = uri self.db_max_retries = db_max_retries self.db_retry_wait = db_retry_wait - self.db_integrity_check = db_integrity_check self.async_db_ready = asyncio.Future() + self.async_recorder_ready = asyncio.Event() self._queue_watch = threading.Event() self.engine: Any = None self.run_info: Any = None @@ -283,6 +290,9 @@ class Recorder(threading.Thread): self.event_session = None self.get_session = None self._completed_database_setup = None + self._event_listener = None + + self._queue_watcher = None self.enabled = True @@ -293,9 +303,37 @@ class Recorder(threading.Thread): @callback def async_initialize(self): """Initialize the recorder.""" - self.hass.bus.async_listen( + self._event_listener = self.hass.bus.async_listen( MATCH_ALL, self.event_listener, event_filter=self._async_event_filter ) + self._queue_watcher = async_track_time_interval( + self.hass, self._async_check_queue, timedelta(minutes=10) + ) + + @callback + def _async_check_queue(self, *_): + """Periodic check of the queue size to ensure we do not exaust memory. + + The queue grows during migraton or if something really goes wrong. + """ + size = self.queue.qsize() + _LOGGER.debug("Recorder queue size is: %s", size) + if self.queue.qsize() <= MAX_QUEUE_BACKLOG: + return + _LOGGER.error( + "The recorder queue reached the maximum size of %s; Events are no longer being recorded", + MAX_QUEUE_BACKLOG, + ) + self._stop_queue_watcher_and_event_listener() + + def _stop_queue_watcher_and_event_listener(self): + """Stop watching the queue.""" + if self._queue_watcher: + self._queue_watcher() + self._queue_watcher = None + if self._event_listener: + self._event_listener() + self._event_listener = None @callback def _async_event_filter(self, event): @@ -314,89 +352,152 @@ class Recorder(threading.Thread): self.queue.put(PurgeTask(keep_days, repack, apply_filter)) - def run(self): - """Start processing events to save.""" + @callback + def async_register(self, shutdown_task, hass_started): + """Post connection initialize.""" - if not self._setup_recorder(): + def shutdown(event): + """Shut down the Recorder.""" + if not hass_started.done(): + hass_started.set_result(shutdown_task) + self.queue.put(None) + self.join() + + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + + if self.hass.state == CoreState.running: + hass_started.set_result(None) return + @callback + def async_hass_started(event): + """Notify that hass has started.""" + hass_started.set_result(None) + + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, async_hass_started) + + @callback + def async_connection_failed(self): + """Connect failed tasks.""" + self.async_db_ready.set_result(False) + persistent_notification.async_create( + self.hass, + "The recorder could not start, check [the logs](/config/logs)", + "Recorder", + ) + self._stop_queue_watcher_and_event_listener() + + @callback + def async_connection_success(self): + """Connect success tasks.""" + self.async_db_ready.set_result(True) + + @callback + def _async_recorder_ready(self): + """Mark recorder ready.""" + self.async_recorder_ready.set() + + @callback + def async_purge(self, now): + """Trigger the purge.""" + self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False)) + + def run(self): + """Start processing events to save.""" shutdown_task = object() hass_started = concurrent.futures.Future() - @callback - def register(): - """Post connection initialize.""" - self.async_db_ready.set_result(True) + self.hass.add_job(self.async_register, shutdown_task, hass_started) - def shutdown(event): - """Shut down the Recorder.""" - if not hass_started.done(): - hass_started.set_result(shutdown_task) - self.queue.put(None) - self.join() + current_version = self._setup_recorder() - self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + if current_version is None: + self.hass.add_job(self.async_connection_failed) + return - if self.hass.state == CoreState.running: - hass_started.set_result(None) - else: + schema_is_current = migration.schema_is_current(current_version) + if schema_is_current: + self._setup_run() - @callback - def notify_hass_started(event): - """Notify that hass has started.""" - hass_started.set_result(None) - - self.hass.bus.async_listen_once( - EVENT_HOMEASSISTANT_START, notify_hass_started - ) - - self.hass.add_job(register) - result = hass_started.result() + self.hass.add_job(self.async_connection_success) # If shutdown happened before Home Assistant finished starting - if result is shutdown_task: + if hass_started.result() is shutdown_task: # Make sure we cleanly close the run if # we restart before startup finishes self._shutdown() return - # Start periodic purge - if self.auto_purge: - - @callback - def async_purge(now): - """Trigger the purge.""" - self.queue.put( - PurgeTask(self.keep_days, repack=False, apply_filter=False) + # We wait to start the migration until startup has finished + # since it can be cpu intensive and we do not want it to compete + # with startup which is also cpu intensive + if not schema_is_current: + if self._migrate_schema_and_setup_run(current_version): + if not self._event_listener: + # If the schema migration takes so longer that the end + # queue watcher safety kicks in because MAX_QUEUE_BACKLOG + # is reached, we need to reinitialize the listener. + self.hass.add_job(self.async_initialize) + else: + persistent_notification.create( + self.hass, + "The database migration failed, check [the logs](/config/logs)." + "Database Migration Failed", + "recorder_database_migration", ) - - # Purge every night at 4:12am - self.hass.helpers.event.track_time_change( - async_purge, hour=4, minute=12, second=0 - ) - - _LOGGER.debug("Recorder processing the queue") - # Use a session for the event read loop - # with a commit every time the event time - # has changed. This reduces the disk io. - while True: - event = self.queue.get() - - if event is None: self._shutdown() return - self._process_one_event(event) + # Start periodic purge + if self.auto_purge: + # Purge every night at 4:12am + track_time_change(self.hass, self.async_purge, hour=4, minute=12, second=0) - def _setup_recorder(self) -> bool: - """Create schema and connect to the database.""" + _LOGGER.debug("Recorder processing the queue") + self.hass.add_job(self._async_recorder_ready) + self._run_event_loop() + + def _run_event_loop(self): + """Run the event loop for the recorder.""" + # Use a session for the event read loop + # with a commit every time the event time + # has changed. This reduces the disk io. + while event := self.queue.get(): + try: + self._process_one_event_or_recover(event) + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception("Error while processing event %s: %s", event, err) + + self._shutdown() + + def _process_one_event_or_recover(self, event): + """Process an event, reconnect, or recover a malformed database.""" + try: + self._process_one_event(event) + return + except exc.DatabaseError as err: + if self._handle_database_error(err): + return + _LOGGER.exception( + "Unhandled database error while processing event %s: %s", event, err + ) + except SQLAlchemyError as err: + _LOGGER.exception( + "SQLAlchemyError error processing event %s: %s", event, err + ) + + # Reset the session if an SQLAlchemyError (including DatabaseError) + # happens to rollback and recover + self._reopen_event_session() + + def _setup_recorder(self) -> None | int: + """Create connect to the database and get the schema version.""" tries = 1 while tries <= self.db_max_retries: try: self._setup_connection() - migration.migrate_schema(self) - self._setup_run() + return migration.get_schema_version(self) except Exception as err: # pylint: disable=broad-except _LOGGER.exception( "Error during connection setup to %s: %s (retrying in %s seconds)", @@ -404,37 +505,47 @@ class Recorder(threading.Thread): err, self.db_retry_wait, ) - else: - _LOGGER.debug("Connected to recorder database") - self._open_event_session() - return True - tries += 1 time.sleep(self.db_retry_wait) - @callback - def connection_failed(): - """Connect failed tasks.""" - self.async_db_ready.set_result(False) - persistent_notification.async_create( - self.hass, - "The recorder could not start, please check the log", - "Recorder", - ) + return None - self.hass.add_job(connection_failed) - return False + def _migrate_schema_and_setup_run(self, current_version) -> bool: + """Migrate schema to the latest version.""" + persistent_notification.create( + self.hass, + "System performance will temporarily degrade during the database upgrade. Do not power down or restart the system until the upgrade completes. Integrations that read the database, such as logbook and history, may return inconsistent results until the upgrade completes.", + "Database upgrade in progress", + "recorder_database_migration", + ) + + try: + migration.migrate_schema(self, current_version) + except exc.DatabaseError as err: + if self._handle_database_error(err): + return True + _LOGGER.exception("Database error during schema migration") + return False + except Exception: # pylint: disable=broad-except + _LOGGER.exception("Error during schema migration") + return False + else: + self._setup_run() + return True + finally: + persistent_notification.dismiss(self.hass, "recorder_database_migration") + + def _run_purge(self, keep_days, repack, apply_filter): + """Purge the database.""" + if purge.purge_old_data(self, keep_days, repack, apply_filter): + return + # Schedule a new purge task if this one didn't finish + self.queue.put(PurgeTask(keep_days, repack, apply_filter)) def _process_one_event(self, event): """Process one event.""" if isinstance(event, PurgeTask): - # Schedule a new purge task if this one didn't finish - if not purge.purge_old_data( - self, event.keep_days, event.repack, event.apply_filter - ): - self.queue.put( - PurgeTask(event.keep_days, event.repack, event.apply_filter) - ) + self._run_purge(event.keep_days, event.repack, event.apply_filter) return if isinstance(event, WaitTask): self._queue_watch.set() @@ -448,7 +559,7 @@ class Recorder(threading.Thread): self._timechanges_seen += 1 if self._timechanges_seen >= self.commit_interval: self._timechanges_seen = 0 - self._commit_event_session_or_recover() + self._commit_event_session_or_retry() return if not self.enabled: @@ -464,10 +575,6 @@ class Recorder(threading.Thread): except (TypeError, ValueError): _LOGGER.warning("Event is not JSON serializable: %s", event) return - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing - _LOGGER.exception("Error adding event: %s", err) - return if event.event_type == EVENT_STATE_CHANGED: try: @@ -492,34 +599,21 @@ class Recorder(threading.Thread): "State is not JSON serializable: %s", event.data.get("new_state"), ) - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing - _LOGGER.exception("Error adding state change: %s", err) # If they do not have a commit interval # than we commit right away if not self.commit_interval: - self._commit_event_session_or_recover() - - def _commit_event_session_or_recover(self): - """Commit changes to the database and recover if the database fails when possible.""" - try: self._commit_event_session_or_retry() - return - except exc.DatabaseError as err: - if isinstance(err.__cause__, sqlite3.DatabaseError): - _LOGGER.exception( - "Unrecoverable sqlite3 database corruption detected: %s", err - ) - self._handle_sqlite_corruption() - return - _LOGGER.exception("Unexpected error saving events: %s", err) - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing - _LOGGER.exception("Unexpected error saving events: %s", err) - self._reopen_event_session() - return + def _handle_database_error(self, err): + """Handle a database error that may result in moving away the corrupt db.""" + if isinstance(err.__cause__, sqlite3.DatabaseError): + _LOGGER.exception( + "Unrecoverable sqlite3 database corruption detected: %s", err + ) + self._handle_sqlite_corruption() + return True + return False def _commit_event_session_or_retry(self): tries = 1 @@ -566,44 +660,41 @@ class Recorder(threading.Thread): def _handle_sqlite_corruption(self): """Handle the sqlite3 database being corrupt.""" + self._close_event_session() self._close_connection() move_away_broken_database(dburl_to_path(self.db_url)) self._setup_recorder() + self._setup_run() - def _reopen_event_session(self): - """Rollback the event session and reopen it after a failure.""" + def _close_event_session(self): + """Close the event session.""" self._old_states = {} + if not self.event_session: + return + try: self.event_session.rollback() self.event_session.close() - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing + except SQLAlchemyError as err: _LOGGER.exception( "Error while rolling back and closing the event session: %s", err ) + def _reopen_event_session(self): + """Rollback the event session and reopen it after a failure.""" + self._close_event_session() self._open_event_session() def _open_event_session(self): """Open the event session.""" - try: - self.event_session = self.get_session() - self.event_session.expire_on_commit = False - except Exception as err: # pylint: disable=broad-except - _LOGGER.exception("Error while creating new event session: %s", err) + self.event_session = self.get_session() + self.event_session.expire_on_commit = False def _send_keep_alive(self): - try: - _LOGGER.debug("Sending keepalive") - self.event_session.connection().scalar(select([1])) - return - except Exception as err: # pylint: disable=broad-except - _LOGGER.error( - "Error in database connectivity during keepalive: %s", - err, - ) - self._reopen_event_session() + """Send a keep alive to keep the db connection open.""" + _LOGGER.debug("Sending keepalive") + self.event_session.connection().scalar(select([1])) @callback def event_listener(self, event): @@ -663,20 +754,7 @@ class Recorder(threading.Thread): kwargs["echo"] = False if self._using_file_sqlite: - with self.hass.timeout.freeze(DOMAIN): - # - # Here we run an sqlite3 quick_check. In the majority - # of cases, the quick_check takes under 10 seconds. - # - # On systems with very large databases and - # very slow disk or cpus, this can take a while. - # - validate_or_move_away_sqlite_database( - self.db_url, self.db_integrity_check - ) - - if self.engine is not None: - self.engine.dispose() + validate_or_move_away_sqlite_database(self.db_url) self.engine = create_engine(self.db_url, **kwargs) @@ -684,6 +762,7 @@ class Recorder(threading.Thread): Base.metadata.create_all(self.engine) self.get_session = scoped_session(sessionmaker(bind=self.engine)) + _LOGGER.debug("Connected to recorder database") @property def _using_file_sqlite(self): @@ -716,18 +795,24 @@ class Recorder(threading.Thread): session.flush() session.expunge(self.run_info) - def _shutdown(self): - """Save end time for current run.""" - if self.event_session is not None: + self._open_event_session() + + def _end_session(self): + """End the recorder session.""" + if self.event_session is None: + return + try: self.run_info.end = dt_util.utcnow() self.event_session.add(self.run_info) - try: - self._commit_event_session_or_retry() - self.event_session.close() - except Exception as err: # pylint: disable=broad-except - _LOGGER.exception( - "Error saving the event session during shutdown: %s", err - ) + self._commit_event_session_or_retry() + self.event_session.close() + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception("Error saving the event session during shutdown: %s", err) self.run_info = None + + def _shutdown(self): + """Save end time for current run.""" + self._stop_queue_watcher_and_event_listener() + self._end_session() self._close_connection() diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index fa93f615561..5f138d01f17 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -11,15 +11,14 @@ from sqlalchemy.exc import ( ) from sqlalchemy.schema import AddConstraint, DropConstraint -from .const import DOMAIN from .models import SCHEMA_VERSION, TABLE_STATES, Base, SchemaChanges from .util import session_scope _LOGGER = logging.getLogger(__name__) -def migrate_schema(instance): - """Check if the schema needs to be upgraded.""" +def get_schema_version(instance): + """Get the schema version.""" with session_scope(session=instance.get_session()) as session: res = ( session.query(SchemaChanges) @@ -34,21 +33,27 @@ def migrate_schema(instance): "No schema version found. Inspected version: %s", current_version ) - if current_version == SCHEMA_VERSION: - return + return current_version + +def schema_is_current(current_version): + """Check if the schema is current.""" + return current_version == SCHEMA_VERSION + + +def migrate_schema(instance, current_version): + """Check if the schema needs to be upgraded.""" + with session_scope(session=instance.get_session()) as session: _LOGGER.warning( "Database is about to upgrade. Schema version: %s", current_version ) + for version in range(current_version, SCHEMA_VERSION): + new_version = version + 1 + _LOGGER.info("Upgrading recorder db schema to version %s", new_version) + _apply_update(instance.engine, new_version, current_version) + session.add(SchemaChanges(schema_version=new_version)) - with instance.hass.timeout.freeze(DOMAIN): - for version in range(current_version, SCHEMA_VERSION): - new_version = version + 1 - _LOGGER.info("Upgrading recorder db schema to version %s", new_version) - _apply_update(instance.engine, new_version, current_version) - session.add(SchemaChanges(schema_version=new_version)) - - _LOGGER.info("Upgrade to version %s done", new_version) + _LOGGER.info("Upgrade to version %s done", new_version) def _create_index(engine, table_name, index_name): diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index ef626a744c4..424070156b0 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -6,7 +6,7 @@ import logging import time from typing import TYPE_CHECKING -from sqlalchemy.exc import OperationalError, SQLAlchemyError +from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import distinct @@ -69,8 +69,7 @@ def purge_old_data( return False _LOGGER.warning("Error purging history: %s", err) - except SQLAlchemyError as err: - _LOGGER.warning("Error purging history: %s", err) + return True diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index c17fb33d365..89f74c44f4e 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -14,8 +14,13 @@ from sqlalchemy.orm.session import Session from homeassistant.helpers.typing import HomeAssistantType import homeassistant.util.dt as dt_util -from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX -from .models import ALL_TABLES, process_timestamp +from .const import DATA_INSTANCE, SQLITE_URL_PREFIX +from .models import ( + ALL_TABLES, + TABLE_RECORDER_RUNS, + TABLE_SCHEMA_CHANGES, + process_timestamp, +) _LOGGER = logging.getLogger(__name__) @@ -117,7 +122,7 @@ def execute(qry, to_native=False, validate_entity_ids=True): time.sleep(QUERY_RETRY_WAIT) -def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool) -> bool: +def validate_or_move_away_sqlite_database(dburl: str) -> bool: """Ensure that the database is valid or move it away.""" dbpath = dburl_to_path(dburl) @@ -125,7 +130,7 @@ def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool) # Database does not exist yet, this is OK return True - if not validate_sqlite_database(dbpath, db_integrity_check): + if not validate_sqlite_database(dbpath): move_away_broken_database(dbpath) return False @@ -161,18 +166,21 @@ def basic_sanity_check(cursor): """Check tables to make sure select does not fail.""" for table in ALL_TABLES: - cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection + if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES): + cursor.execute(f"SELECT * FROM {table};") # nosec # not injection + else: + cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection return True -def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool: +def validate_sqlite_database(dbpath: str) -> bool: """Run a quick check on an sqlite database to see if it is corrupt.""" import sqlite3 # pylint: disable=import-outside-toplevel try: conn = sqlite3.connect(dbpath) - run_checks_on_open_db(dbpath, conn.cursor(), db_integrity_check) + run_checks_on_open_db(dbpath, conn.cursor()) conn.close() except sqlite3.DatabaseError: _LOGGER.exception("The database at %s is corrupt or malformed", dbpath) @@ -181,24 +189,14 @@ def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool: return True -def run_checks_on_open_db(dbpath, cursor, db_integrity_check): +def run_checks_on_open_db(dbpath, cursor): """Run checks that will generate a sqlite3 exception if there is corruption.""" sanity_check_passed = basic_sanity_check(cursor) last_run_was_clean = last_run_was_recently_clean(cursor) if sanity_check_passed and last_run_was_clean: _LOGGER.debug( - "The quick_check will be skipped as the system was restarted cleanly and passed the basic sanity check" - ) - return - - if not db_integrity_check: - # Always warn so when it does fail they remember it has - # been manually disabled - _LOGGER.warning( - "The quick_check on the sqlite3 database at %s was skipped because %s was disabled", - dbpath, - CONF_DB_INTEGRITY_CHECK, + "The system was restarted cleanly and passed the basic sanity check" ) return @@ -214,11 +212,6 @@ def run_checks_on_open_db(dbpath, cursor, db_integrity_check): dbpath, ) - _LOGGER.info( - "A quick_check is being performed on the sqlite3 database at %s", dbpath - ) - cursor.execute("PRAGMA QUICK_CHECK") - def move_away_broken_database(dbfile: str) -> None: """Move away a broken sqlite3 database.""" diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index b3c58995b37..67032e9f077 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -3,13 +3,14 @@ from datetime import datetime, timedelta from unittest.mock import patch -from sqlalchemy.exc import OperationalError +from sqlalchemy.exc import OperationalError, SQLAlchemyError +from homeassistant.components import recorder from homeassistant.components.recorder import ( CONF_DB_URL, CONFIG_SCHEMA, - DATA_INSTANCE, DOMAIN, + KEEPALIVE_TIME, SERVICE_DISABLE, SERVICE_ENABLE, SERVICE_PURGE, @@ -19,15 +20,17 @@ from homeassistant.components.recorder import ( run_information_from_instance, run_information_with_session, ) +from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.components.recorder.models import Events, RecorderRuns, States from homeassistant.components.recorder.util import session_scope from homeassistant.const import ( + EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, MATCH_ALL, STATE_LOCKED, STATE_UNLOCKED, ) -from homeassistant.core import Context, CoreState, callback +from homeassistant.core import Context, CoreState, HomeAssistant, callback from homeassistant.helpers.typing import HomeAssistantType from homeassistant.setup import async_setup_component, setup_component from homeassistant.util import dt as dt_util @@ -41,18 +44,35 @@ from .common import ( from .conftest import SetupRecorderInstanceT from tests.common import ( + async_fire_time_changed, async_init_recorder_component, fire_time_changed, get_test_home_assistant, ) +def _default_recorder(hass): + """Return a recorder with reasonable defaults.""" + return Recorder( + hass, + auto_purge=True, + keep_days=7, + commit_interval=1, + uri="sqlite://", + db_max_retries=10, + db_retry_wait=3, + entity_filter=CONFIG_SCHEMA({DOMAIN: {}}), + exclude_t=[], + ) + + async def test_shutdown_before_startup_finishes(hass): """Test shutdown before recorder starts is clean.""" hass.state = CoreState.not_running await async_init_recorder_component(hass) + await hass.data[DATA_INSTANCE].async_db_ready await hass.async_block_till_done() session = await hass.async_add_executor_job(hass.data[DATA_INSTANCE].get_session) @@ -69,6 +89,31 @@ async def test_shutdown_before_startup_finishes(hass): assert run_info.end is not None +async def test_state_gets_saved_when_set_before_start_event( + hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test we can record an event when starting with not running.""" + + hass.state = CoreState.not_running + + await async_init_recorder_component(hass) + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + hass.states.async_set(entity_id, state, attributes) + + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + + await async_wait_recording_done_without_instance(hass) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 1 + assert db_states[0].event_id > 0 + + async def test_saving_state( hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT ): @@ -92,6 +137,58 @@ async def test_saving_state( assert state == _state_empty_context(hass, entity_id) +async def test_saving_many_states( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test we expire after many commits.""" + instance = await async_setup_recorder_instance(hass) + + entity_id = "test.recorder" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + with patch.object( + hass.data[DATA_INSTANCE].event_session, "expire_all" + ) as expire_all, patch.object(recorder, "EXPIRE_AFTER_COMMITS", 2): + for _ in range(3): + hass.states.async_set(entity_id, "on", attributes) + await async_wait_recording_done(hass, instance) + hass.states.async_set(entity_id, "off", attributes) + await async_wait_recording_done(hass, instance) + + assert expire_all.called + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 6 + assert db_states[0].event_id > 0 + + +async def test_saving_state_with_intermixed_time_changes( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test saving states with intermixed time changes.""" + instance = await async_setup_recorder_instance(hass) + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + attributes2 = {"test_attr": 10, "test_attr_10": "mean"} + + for _ in range(KEEPALIVE_TIME + 1): + async_fire_time_changed(hass, dt_util.utcnow()) + hass.states.async_set(entity_id, state, attributes) + for _ in range(KEEPALIVE_TIME + 1): + async_fire_time_changed(hass, dt_util.utcnow()) + hass.states.async_set(entity_id, state, attributes2) + + await async_wait_recording_done(hass, instance) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 2 + assert db_states[0].event_id > 0 + + def test_saving_state_with_exception(hass, hass_recorder, caplog): """Test saving and restoring a state.""" hass = hass_recorder() @@ -130,6 +227,44 @@ def test_saving_state_with_exception(hass, hass_recorder, caplog): assert "Error saving events" not in caplog.text +def test_saving_state_with_sqlalchemy_exception(hass, hass_recorder, caplog): + """Test saving state when there is an SQLAlchemyError.""" + hass = hass_recorder() + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + def _throw_if_state_in_session(*args, **kwargs): + for obj in hass.data[DATA_INSTANCE].event_session: + if isinstance(obj, States): + raise SQLAlchemyError( + "insert the state", "fake params", "forced to fail" + ) + + with patch("time.sleep"), patch.object( + hass.data[DATA_INSTANCE].event_session, + "flush", + side_effect=_throw_if_state_in_session, + ): + hass.states.set(entity_id, "fail", attributes) + wait_recording_done(hass) + + assert "SQLAlchemyError error processing event" in caplog.text + + caplog.clear() + hass.states.set(entity_id, state, attributes) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) >= 1 + + assert "Error executing query" not in caplog.text + assert "Error saving events" not in caplog.text + assert "SQLAlchemyError error processing event" not in caplog.text + + def test_saving_event(hass, hass_recorder): """Test saving and restoring an event.""" hass = hass_recorder() @@ -171,6 +306,25 @@ def test_saving_event(hass, hass_recorder): ) +def test_saving_state_with_commit_interval_zero(hass_recorder): + """Test saving a state with a commit interval of zero.""" + hass = hass_recorder({"commit_interval": 0}) + assert hass.data[DATA_INSTANCE].commit_interval == 0 + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + hass.states.set(entity_id, state, attributes) + + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 1 + assert db_states[0].event_id > 0 + + def _add_entities(hass, entity_ids): """Add entities.""" attributes = {"test_attr": 5, "test_attr_10": "nice"} @@ -351,26 +505,27 @@ def test_saving_state_and_removing_entity(hass, hass_recorder): assert states[2].state is None -def test_recorder_setup_failure(): +def test_recorder_setup_failure(hass): """Test some exceptions.""" - hass = get_test_home_assistant() - with patch.object(Recorder, "_setup_connection") as setup, patch( "homeassistant.components.recorder.time.sleep" ): setup.side_effect = ImportError("driver not found") - rec = Recorder( - hass, - auto_purge=True, - keep_days=7, - commit_interval=1, - uri="sqlite://", - db_max_retries=10, - db_retry_wait=3, - entity_filter=CONFIG_SCHEMA({DOMAIN: {}}), - exclude_t=[], - db_integrity_check=False, - ) + rec = _default_recorder(hass) + rec.async_initialize() + rec.start() + rec.join() + + hass.stop() + + +def test_recorder_setup_failure_without_event_listener(hass): + """Test recorder setup failure when the event listener is not setup.""" + with patch.object(Recorder, "_setup_connection") as setup, patch( + "homeassistant.components.recorder.time.sleep" + ): + setup.side_effect = ImportError("driver not found") + rec = _default_recorder(hass) rec.start() rec.join() @@ -481,6 +636,7 @@ def test_saving_state_with_serializable_data(hass_recorder, caplog): """Test saving data that cannot be serialized does not crash.""" hass = hass_recorder() + hass.bus.fire("bad_event", {"fail": CannotSerializeMe()}) hass.states.set("test.one", "on", {"fail": CannotSerializeMe()}) wait_recording_done(hass) hass.states.set("test.two", "on", {}) @@ -699,15 +855,20 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog): hass.states.async_set("test.lost", "on", {}) - await async_wait_recording_done_without_instance(hass) - await hass.async_add_executor_job(corrupt_db_file, test_db_file) - await async_wait_recording_done_without_instance(hass) + with patch.object( + hass.data[DATA_INSTANCE].event_session, + "close", + side_effect=OperationalError("statement", {}, []), + ): + await async_wait_recording_done_without_instance(hass) + await hass.async_add_executor_job(corrupt_db_file, test_db_file) + await async_wait_recording_done_without_instance(hass) - # This state will not be recorded because - # the database corruption will be discovered - # and we will have to rollback to recover - hass.states.async_set("test.one", "off", {}) - await async_wait_recording_done_without_instance(hass) + # This state will not be recorded because + # the database corruption will be discovered + # and we will have to rollback to recover + hass.states.async_set("test.one", "off", {}) + await async_wait_recording_done_without_instance(hass) assert "Unrecoverable sqlite3 database corruption detected" in caplog.text assert "The system will rename the corrupt database file" in caplog.text diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index c4e0d32adcf..113598ff6de 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -1,19 +1,41 @@ """The tests for the Recorder component.""" # pylint: disable=protected-access +import datetime +import sqlite3 from unittest.mock import Mock, PropertyMock, call, patch import pytest from sqlalchemy import create_engine -from sqlalchemy.exc import InternalError, OperationalError, ProgrammingError +from sqlalchemy.exc import ( + DatabaseError, + InternalError, + OperationalError, + ProgrammingError, +) from sqlalchemy.pool import StaticPool from homeassistant.bootstrap import async_setup_component -from homeassistant.components.recorder import RecorderRuns, const, migration, models +from homeassistant.components import recorder +from homeassistant.components.recorder import RecorderRuns, migration, models +from homeassistant.components.recorder.const import DATA_INSTANCE +from homeassistant.components.recorder.models import States +from homeassistant.components.recorder.util import session_scope import homeassistant.util.dt as dt_util +from .common import async_wait_recording_done_without_instance + +from tests.common import async_fire_time_changed, async_mock_service from tests.components.recorder import models_original +def _get_native_states(hass, entity_id): + with session_scope(hass=hass) as session: + return [ + state.to_native() + for state in session.query(States).filter(States.entity_id == entity_id) + ] + + def create_engine_test(*args, **kwargs): """Test version of create_engine that initializes with old schema. @@ -26,6 +48,7 @@ def create_engine_test(*args, **kwargs): async def test_schema_update_calls(hass): """Test that schema migrations occur in correct order.""" + await async_setup_component(hass, "persistent_notification", {}) with patch( "homeassistant.components.recorder.create_engine", new=create_engine_test ), patch( @@ -35,16 +58,147 @@ async def test_schema_update_calls(hass): await async_setup_component( hass, "recorder", {"recorder": {"db_url": "sqlite://"}} ) - await hass.async_block_till_done() + await async_wait_recording_done_without_instance(hass) update.assert_has_calls( [ - call(hass.data[const.DATA_INSTANCE].engine, version + 1, 0) + call(hass.data[DATA_INSTANCE].engine, version + 1, 0) for version in range(0, models.SCHEMA_VERSION) ] ) +async def test_database_migration_failed(hass): + """Test we notify if the migration fails.""" + await async_setup_component(hass, "persistent_notification", {}) + create_calls = async_mock_service(hass, "persistent_notification", "create") + dismiss_calls = async_mock_service(hass, "persistent_notification", "dismiss") + + with patch( + "homeassistant.components.recorder.create_engine", new=create_engine_test + ), patch( + "homeassistant.components.recorder.migration._apply_update", + side_effect=ValueError, + ): + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await hass.async_block_till_done() + await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join) + await hass.async_block_till_done() + + assert len(create_calls) == 2 + assert len(dismiss_calls) == 1 + + +async def test_database_migration_encounters_corruption(hass): + """Test we move away the database if its corrupt.""" + await async_setup_component(hass, "persistent_notification", {}) + + sqlite3_exception = DatabaseError("statement", {}, []) + sqlite3_exception.__cause__ = sqlite3.DatabaseError() + + with patch( + "homeassistant.components.recorder.migration.schema_is_current", + side_effect=[False, True], + ), patch( + "homeassistant.components.recorder.migration.migrate_schema", + side_effect=sqlite3_exception, + ), patch( + "homeassistant.components.recorder.move_away_broken_database" + ) as move_away: + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await async_wait_recording_done_without_instance(hass) + + assert move_away.called + + +async def test_database_migration_encounters_corruption_not_sqlite(hass): + """Test we fail on database error when we cannot recover.""" + await async_setup_component(hass, "persistent_notification", {}) + create_calls = async_mock_service(hass, "persistent_notification", "create") + dismiss_calls = async_mock_service(hass, "persistent_notification", "dismiss") + + with patch( + "homeassistant.components.recorder.migration.schema_is_current", + side_effect=[False, True], + ), patch( + "homeassistant.components.recorder.migration.migrate_schema", + side_effect=DatabaseError("statement", {}, []), + ), patch( + "homeassistant.components.recorder.move_away_broken_database" + ) as move_away: + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await hass.async_block_till_done() + await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join) + await hass.async_block_till_done() + + assert not move_away.called + assert len(create_calls) == 2 + assert len(dismiss_calls) == 1 + + +async def test_events_during_migration_are_queued(hass): + """Test that events during migration are queued.""" + + await async_setup_component(hass, "persistent_notification", {}) + with patch( + "homeassistant.components.recorder.create_engine", new=create_engine_test + ): + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2)) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4)) + await hass.data[DATA_INSTANCE].async_recorder_ready.wait() + await async_wait_recording_done_without_instance(hass) + + db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity") + assert len(db_states) == 2 + + +async def test_events_during_migration_queue_exhausted(hass): + """Test that events during migration takes so long the queue is exhausted.""" + await async_setup_component(hass, "persistent_notification", {}) + + with patch( + "homeassistant.components.recorder.create_engine", new=create_engine_test + ), patch.object(recorder, "MAX_QUEUE_BACKLOG", 1): + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2)) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4)) + await hass.async_block_till_done() + hass.states.async_set("my.entity", "off", {}) + await hass.data[DATA_INSTANCE].async_recorder_ready.wait() + await async_wait_recording_done_without_instance(hass) + + db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity") + assert len(db_states) == 1 + hass.states.async_set("my.entity", "on", {}) + await async_wait_recording_done_without_instance(hass) + db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity") + assert len(db_states) == 2 + + async def test_schema_migrate(hass): """Test the full schema migration logic. @@ -53,6 +207,8 @@ async def test_schema_migrate(hass): inspection could quickly become quite cumbersome. """ + await async_setup_component(hass, "persistent_notification", {}) + def _mock_setup_run(self): self.run_info = RecorderRuns( start=self.recording_start, created=dt_util.utcnow() diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index f2fa9bf6400..b97873df62e 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -1,7 +1,10 @@ """Test data purging.""" from datetime import datetime, timedelta import json +import sqlite3 +from unittest.mock import patch +from sqlalchemy.exc import DatabaseError from sqlalchemy.orm.session import Session from homeassistant.components import recorder @@ -16,6 +19,7 @@ from .common import ( async_recorder_block_till_done, async_wait_purge_done, async_wait_recording_done, + async_wait_recording_done_without_instance, ) from .conftest import SetupRecorderInstanceT @@ -52,6 +56,38 @@ async def test_purge_old_states( assert states.count() == 2 +async def test_purge_old_states_encouters_database_corruption( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test database image image is malformed while deleting old states.""" + instance = await async_setup_recorder_instance(hass) + + await _add_test_states(hass, instance) + await async_wait_recording_done_without_instance(hass) + + sqlite3_exception = DatabaseError("statement", {}, []) + sqlite3_exception.__cause__ = sqlite3.DatabaseError() + + with patch( + "homeassistant.components.recorder.move_away_broken_database" + ) as move_away, patch( + "homeassistant.components.recorder.purge.purge_old_data", + side_effect=sqlite3_exception, + ): + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, {"keep_days": 0} + ) + await hass.async_block_till_done() + await async_wait_recording_done_without_instance(hass) + + assert move_away.called + + # Ensure the whole database was reset due to the database error + with session_scope(hass=hass) as session: + states_after_purge = session.query(States) + assert states_after_purge.count() == 0 + + async def test_purge_old_events( hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT ): diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index c814570416c..4da635209b3 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -74,75 +74,28 @@ def test_recorder_bad_execute(hass_recorder): assert e_mock.call_count == 2 -def test_validate_or_move_away_sqlite_database_with_integrity_check( - hass, tmpdir, caplog -): - """Ensure a malformed sqlite database is moved away. - - A quick_check is run here - """ - - db_integrity_check = True +def test_validate_or_move_away_sqlite_database(hass, tmpdir, caplog): + """Ensure a malformed sqlite database is moved away.""" test_dir = tmpdir.mkdir("test_validate_or_move_away_sqlite_database") test_db_file = f"{test_dir}/broken.db" dburl = f"{SQLITE_URL_PREFIX}{test_db_file}" - assert util.validate_sqlite_database(test_db_file, db_integrity_check) is False + assert util.validate_sqlite_database(test_db_file) is False assert os.path.exists(test_db_file) is True - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) + assert util.validate_or_move_away_sqlite_database(dburl) is False corrupt_db_file(test_db_file) - assert util.validate_sqlite_database(dburl, db_integrity_check) is False + assert util.validate_sqlite_database(dburl) is False - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) + assert util.validate_or_move_away_sqlite_database(dburl) is False assert "corrupt or malformed" in caplog.text - assert util.validate_sqlite_database(dburl, db_integrity_check) is False + assert util.validate_sqlite_database(dburl) is False - assert util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is True - - -def test_validate_or_move_away_sqlite_database_without_integrity_check( - hass, tmpdir, caplog -): - """Ensure a malformed sqlite database is moved away. - - The quick_check is skipped, but we can still find - corruption if the whole database is unreadable - """ - - db_integrity_check = False - - test_dir = tmpdir.mkdir("test_validate_or_move_away_sqlite_database") - test_db_file = f"{test_dir}/broken.db" - dburl = f"{SQLITE_URL_PREFIX}{test_db_file}" - - assert util.validate_sqlite_database(test_db_file, db_integrity_check) is False - assert os.path.exists(test_db_file) is True - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) - - corrupt_db_file(test_db_file) - - assert util.validate_sqlite_database(dburl, db_integrity_check) is False - - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) - - assert "corrupt or malformed" in caplog.text - - assert util.validate_sqlite_database(dburl, db_integrity_check) is False - - assert util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is True + assert util.validate_or_move_away_sqlite_database(dburl) is True async def test_last_run_was_recently_clean(hass): @@ -197,12 +150,10 @@ def test_combined_checks(hass_recorder, caplog): cursor = hass.data[DATA_INSTANCE].engine.raw_connection().cursor() - assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None - assert "skipped because db_integrity_check was disabled" in caplog.text + assert util.run_checks_on_open_db("fake_db_path", cursor) is None + assert "could not validate that the sqlite3 database" in caplog.text caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None - assert "could not validate that the sqlite3 database" in caplog.text # We are patching recorder.util here in order # to avoid creating the full database on disk @@ -210,50 +161,36 @@ def test_combined_checks(hass_recorder, caplog): "homeassistant.components.recorder.util.basic_sanity_check", return_value=False ): caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None - assert "skipped because db_integrity_check was disabled" in caplog.text - - caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None + assert util.run_checks_on_open_db("fake_db_path", cursor) is None assert "could not validate that the sqlite3 database" in caplog.text # We are patching recorder.util here in order # to avoid creating the full database on disk with patch("homeassistant.components.recorder.util.last_run_was_recently_clean"): caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None - assert ( - "system was restarted cleanly and passed the basic sanity check" - in caplog.text - ) - - caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None - assert ( - "system was restarted cleanly and passed the basic sanity check" - in caplog.text - ) + assert util.run_checks_on_open_db("fake_db_path", cursor) is None + assert "restarted cleanly and passed the basic sanity check" in caplog.text caplog.clear() with patch( "homeassistant.components.recorder.util.last_run_was_recently_clean", side_effect=sqlite3.DatabaseError, ), pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, False) + util.run_checks_on_open_db("fake_db_path", cursor) caplog.clear() with patch( "homeassistant.components.recorder.util.last_run_was_recently_clean", side_effect=sqlite3.DatabaseError, ), pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, True) + util.run_checks_on_open_db("fake_db_path", cursor) cursor.execute("DROP TABLE events;") caplog.clear() with pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, False) + util.run_checks_on_open_db("fake_db_path", cursor) caplog.clear() with pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, True) + util.run_checks_on_open_db("fake_db_path", cursor)