Live db migrations and recovery (#49036)

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
J. Nick Koston 2021-04-11 20:43:54 -10:00 committed by GitHub
parent 2d5edeb1ef
commit 9368891b1b
8 changed files with 684 additions and 312 deletions

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import asyncio
import concurrent.futures
from datetime import datetime
from datetime import datetime, timedelta
import logging
import queue
import sqlite3
@ -12,6 +12,7 @@ import time
from typing import Any, Callable, NamedTuple
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import StaticPool
import voluptuous as vol
@ -20,7 +21,7 @@ from homeassistant.components import persistent_notification
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_EXCLUDE,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STARTED,
EVENT_HOMEASSISTANT_STOP,
EVENT_STATE_CHANGED,
EVENT_TIME_CHANGED,
@ -33,6 +34,7 @@ from homeassistant.helpers.entityfilter import (
INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER,
convert_include_exclude_filter,
)
from homeassistant.helpers.event import async_track_time_interval, track_time_change
from homeassistant.helpers.typing import ConfigType
import homeassistant.util.dt as dt_util
@ -56,6 +58,8 @@ ATTR_KEEP_DAYS = "keep_days"
ATTR_REPACK = "repack"
ATTR_APPLY_FILTER = "apply_filter"
MAX_QUEUE_BACKLOG = 30000
SERVICE_PURGE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_KEEP_DAYS): cv.positive_int,
@ -99,6 +103,7 @@ CONFIG_SCHEMA = vol.Schema(
{
vol.Optional(DOMAIN, default=dict): vol.All(
cv.deprecated(CONF_PURGE_INTERVAL),
cv.deprecated(CONF_DB_INTEGRITY_CHECK),
FILTER_SCHEMA.extend(
{
vol.Optional(CONF_AUTO_PURGE, default=True): cv.boolean,
@ -176,11 +181,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
commit_interval = conf[CONF_COMMIT_INTERVAL]
db_max_retries = conf[CONF_DB_MAX_RETRIES]
db_retry_wait = conf[CONF_DB_RETRY_WAIT]
db_integrity_check = conf[CONF_DB_INTEGRITY_CHECK]
db_url = conf.get(CONF_DB_URL)
if not db_url:
db_url = DEFAULT_URL.format(hass_config_path=hass.config.path(DEFAULT_DB_FILE))
db_url = conf.get(CONF_DB_URL) or DEFAULT_URL.format(
hass_config_path=hass.config.path(DEFAULT_DB_FILE)
)
exclude = conf[CONF_EXCLUDE]
exclude_t = exclude.get(CONF_EVENT_TYPES, [])
instance = hass.data[DATA_INSTANCE] = Recorder(
@ -193,10 +196,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
db_retry_wait=db_retry_wait,
entity_filter=entity_filter,
exclude_t=exclude_t,
db_integrity_check=db_integrity_check,
)
instance.async_initialize()
instance.start()
_async_register_services(hass, instance)
return await instance.async_db_ready
@callback
def _async_register_services(hass, instance):
"""Register recorder services."""
async def async_handle_purge_service(service):
"""Handle calls to the purge service."""
@ -223,8 +233,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
schema=SERVICE_DISABLE_SCHEMA,
)
return await instance.async_db_ready
class PurgeTask(NamedTuple):
"""Object to store information about purge task."""
@ -252,7 +260,6 @@ class Recorder(threading.Thread):
db_retry_wait: int,
entity_filter: Callable[[str], bool],
exclude_t: list[str],
db_integrity_check: bool,
) -> None:
"""Initialize the recorder."""
threading.Thread.__init__(self, name="Recorder")
@ -266,8 +273,8 @@ class Recorder(threading.Thread):
self.db_url = uri
self.db_max_retries = db_max_retries
self.db_retry_wait = db_retry_wait
self.db_integrity_check = db_integrity_check
self.async_db_ready = asyncio.Future()
self.async_recorder_ready = asyncio.Event()
self._queue_watch = threading.Event()
self.engine: Any = None
self.run_info: Any = None
@ -283,6 +290,9 @@ class Recorder(threading.Thread):
self.event_session = None
self.get_session = None
self._completed_database_setup = None
self._event_listener = None
self._queue_watcher = None
self.enabled = True
@ -293,9 +303,37 @@ class Recorder(threading.Thread):
@callback
def async_initialize(self):
"""Initialize the recorder."""
self.hass.bus.async_listen(
self._event_listener = self.hass.bus.async_listen(
MATCH_ALL, self.event_listener, event_filter=self._async_event_filter
)
self._queue_watcher = async_track_time_interval(
self.hass, self._async_check_queue, timedelta(minutes=10)
)
@callback
def _async_check_queue(self, *_):
"""Periodic check of the queue size to ensure we do not exaust memory.
The queue grows during migraton or if something really goes wrong.
"""
size = self.queue.qsize()
_LOGGER.debug("Recorder queue size is: %s", size)
if self.queue.qsize() <= MAX_QUEUE_BACKLOG:
return
_LOGGER.error(
"The recorder queue reached the maximum size of %s; Events are no longer being recorded",
MAX_QUEUE_BACKLOG,
)
self._stop_queue_watcher_and_event_listener()
def _stop_queue_watcher_and_event_listener(self):
"""Stop watching the queue."""
if self._queue_watcher:
self._queue_watcher()
self._queue_watcher = None
if self._event_listener:
self._event_listener()
self._event_listener = None
@callback
def _async_event_filter(self, event):
@ -314,89 +352,152 @@ class Recorder(threading.Thread):
self.queue.put(PurgeTask(keep_days, repack, apply_filter))
def run(self):
"""Start processing events to save."""
@callback
def async_register(self, shutdown_task, hass_started):
"""Post connection initialize."""
if not self._setup_recorder():
def shutdown(event):
"""Shut down the Recorder."""
if not hass_started.done():
hass_started.set_result(shutdown_task)
self.queue.put(None)
self.join()
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
if self.hass.state == CoreState.running:
hass_started.set_result(None)
return
@callback
def async_hass_started(event):
"""Notify that hass has started."""
hass_started.set_result(None)
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, async_hass_started)
@callback
def async_connection_failed(self):
"""Connect failed tasks."""
self.async_db_ready.set_result(False)
persistent_notification.async_create(
self.hass,
"The recorder could not start, check [the logs](/config/logs)",
"Recorder",
)
self._stop_queue_watcher_and_event_listener()
@callback
def async_connection_success(self):
"""Connect success tasks."""
self.async_db_ready.set_result(True)
@callback
def _async_recorder_ready(self):
"""Mark recorder ready."""
self.async_recorder_ready.set()
@callback
def async_purge(self, now):
"""Trigger the purge."""
self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False))
def run(self):
"""Start processing events to save."""
shutdown_task = object()
hass_started = concurrent.futures.Future()
@callback
def register():
"""Post connection initialize."""
self.async_db_ready.set_result(True)
self.hass.add_job(self.async_register, shutdown_task, hass_started)
def shutdown(event):
"""Shut down the Recorder."""
if not hass_started.done():
hass_started.set_result(shutdown_task)
self.queue.put(None)
self.join()
current_version = self._setup_recorder()
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
if current_version is None:
self.hass.add_job(self.async_connection_failed)
return
if self.hass.state == CoreState.running:
hass_started.set_result(None)
else:
schema_is_current = migration.schema_is_current(current_version)
if schema_is_current:
self._setup_run()
@callback
def notify_hass_started(event):
"""Notify that hass has started."""
hass_started.set_result(None)
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_START, notify_hass_started
)
self.hass.add_job(register)
result = hass_started.result()
self.hass.add_job(self.async_connection_success)
# If shutdown happened before Home Assistant finished starting
if result is shutdown_task:
if hass_started.result() is shutdown_task:
# Make sure we cleanly close the run if
# we restart before startup finishes
self._shutdown()
return
# Start periodic purge
if self.auto_purge:
@callback
def async_purge(now):
"""Trigger the purge."""
self.queue.put(
PurgeTask(self.keep_days, repack=False, apply_filter=False)
# We wait to start the migration until startup has finished
# since it can be cpu intensive and we do not want it to compete
# with startup which is also cpu intensive
if not schema_is_current:
if self._migrate_schema_and_setup_run(current_version):
if not self._event_listener:
# If the schema migration takes so long that the
# queue watcher safety kicks in because MAX_QUEUE_BACKLOG
# is reached, we need to reinitialize the listener.
self.hass.add_job(self.async_initialize)
else:
persistent_notification.create(
self.hass,
"The database migration failed, check [the logs](/config/logs)."
"Database Migration Failed",
"recorder_database_migration",
)
# Purge every night at 4:12am
self.hass.helpers.event.track_time_change(
async_purge, hour=4, minute=12, second=0
)
_LOGGER.debug("Recorder processing the queue")
# Use a session for the event read loop
# with a commit every time the event time
# has changed. This reduces the disk io.
while True:
event = self.queue.get()
if event is None:
self._shutdown()
return
self._process_one_event(event)
# Start periodic purge
if self.auto_purge:
# Purge every night at 4:12am
track_time_change(self.hass, self.async_purge, hour=4, minute=12, second=0)
def _setup_recorder(self) -> bool:
"""Create schema and connect to the database."""
_LOGGER.debug("Recorder processing the queue")
self.hass.add_job(self._async_recorder_ready)
self._run_event_loop()
def _run_event_loop(self):
"""Run the event loop for the recorder."""
# Use a session for the event read loop
# with a commit every time the event time
# has changed. This reduces the disk io.
while event := self.queue.get():
try:
self._process_one_event_or_recover(event)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error while processing event %s: %s", event, err)
self._shutdown()
def _process_one_event_or_recover(self, event):
"""Process an event, reconnect, or recover a malformed database."""
try:
self._process_one_event(event)
return
except exc.DatabaseError as err:
if self._handle_database_error(err):
return
_LOGGER.exception(
"Unhandled database error while processing event %s: %s", event, err
)
except SQLAlchemyError as err:
_LOGGER.exception(
"SQLAlchemyError error processing event %s: %s", event, err
)
# Reset the session if an SQLAlchemyError (including DatabaseError)
# happens to rollback and recover
self._reopen_event_session()
def _setup_recorder(self) -> None | int:
"""Create connect to the database and get the schema version."""
tries = 1
while tries <= self.db_max_retries:
try:
self._setup_connection()
migration.migrate_schema(self)
self._setup_run()
return migration.get_schema_version(self)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception(
"Error during connection setup to %s: %s (retrying in %s seconds)",
@ -404,37 +505,47 @@ class Recorder(threading.Thread):
err,
self.db_retry_wait,
)
else:
_LOGGER.debug("Connected to recorder database")
self._open_event_session()
return True
tries += 1
time.sleep(self.db_retry_wait)
@callback
def connection_failed():
"""Connect failed tasks."""
self.async_db_ready.set_result(False)
persistent_notification.async_create(
self.hass,
"The recorder could not start, please check the log",
"Recorder",
)
return None
self.hass.add_job(connection_failed)
return False
def _migrate_schema_and_setup_run(self, current_version) -> bool:
"""Migrate schema to the latest version."""
persistent_notification.create(
self.hass,
"System performance will temporarily degrade during the database upgrade. Do not power down or restart the system until the upgrade completes. Integrations that read the database, such as logbook and history, may return inconsistent results until the upgrade completes.",
"Database upgrade in progress",
"recorder_database_migration",
)
try:
migration.migrate_schema(self, current_version)
except exc.DatabaseError as err:
if self._handle_database_error(err):
return True
_LOGGER.exception("Database error during schema migration")
return False
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error during schema migration")
return False
else:
self._setup_run()
return True
finally:
persistent_notification.dismiss(self.hass, "recorder_database_migration")
def _run_purge(self, keep_days, repack, apply_filter):
"""Purge the database."""
if purge.purge_old_data(self, keep_days, repack, apply_filter):
return
# Schedule a new purge task if this one didn't finish
self.queue.put(PurgeTask(keep_days, repack, apply_filter))
def _process_one_event(self, event):
"""Process one event."""
if isinstance(event, PurgeTask):
# Schedule a new purge task if this one didn't finish
if not purge.purge_old_data(
self, event.keep_days, event.repack, event.apply_filter
):
self.queue.put(
PurgeTask(event.keep_days, event.repack, event.apply_filter)
)
self._run_purge(event.keep_days, event.repack, event.apply_filter)
return
if isinstance(event, WaitTask):
self._queue_watch.set()
@ -448,7 +559,7 @@ class Recorder(threading.Thread):
self._timechanges_seen += 1
if self._timechanges_seen >= self.commit_interval:
self._timechanges_seen = 0
self._commit_event_session_or_recover()
self._commit_event_session_or_retry()
return
if not self.enabled:
@ -464,10 +575,6 @@ class Recorder(threading.Thread):
except (TypeError, ValueError):
_LOGGER.warning("Event is not JSON serializable: %s", event)
return
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error adding event: %s", err)
return
if event.event_type == EVENT_STATE_CHANGED:
try:
@ -492,34 +599,21 @@ class Recorder(threading.Thread):
"State is not JSON serializable: %s",
event.data.get("new_state"),
)
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error adding state change: %s", err)
# If there is no commit interval
# then we commit right away
if not self.commit_interval:
self._commit_event_session_or_recover()
def _commit_event_session_or_recover(self):
"""Commit changes to the database and recover if the database fails when possible."""
try:
self._commit_event_session_or_retry()
return
except exc.DatabaseError as err:
if isinstance(err.__cause__, sqlite3.DatabaseError):
_LOGGER.exception(
"Unrecoverable sqlite3 database corruption detected: %s", err
)
self._handle_sqlite_corruption()
return
_LOGGER.exception("Unexpected error saving events: %s", err)
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Unexpected error saving events: %s", err)
self._reopen_event_session()
return
def _handle_database_error(self, err):
"""Handle a database error that may result in moving away the corrupt db."""
if isinstance(err.__cause__, sqlite3.DatabaseError):
_LOGGER.exception(
"Unrecoverable sqlite3 database corruption detected: %s", err
)
self._handle_sqlite_corruption()
return True
return False
def _commit_event_session_or_retry(self):
tries = 1
@ -566,44 +660,41 @@ class Recorder(threading.Thread):
def _handle_sqlite_corruption(self):
"""Handle the sqlite3 database being corrupt."""
self._close_event_session()
self._close_connection()
move_away_broken_database(dburl_to_path(self.db_url))
self._setup_recorder()
self._setup_run()
def _reopen_event_session(self):
"""Rollback the event session and reopen it after a failure."""
def _close_event_session(self):
"""Close the event session."""
self._old_states = {}
if not self.event_session:
return
try:
self.event_session.rollback()
self.event_session.close()
except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing
except SQLAlchemyError as err:
_LOGGER.exception(
"Error while rolling back and closing the event session: %s", err
)
def _reopen_event_session(self):
"""Rollback the event session and reopen it after a failure."""
self._close_event_session()
self._open_event_session()
def _open_event_session(self):
"""Open the event session."""
try:
self.event_session = self.get_session()
self.event_session.expire_on_commit = False
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error while creating new event session: %s", err)
self.event_session = self.get_session()
self.event_session.expire_on_commit = False
def _send_keep_alive(self):
try:
_LOGGER.debug("Sending keepalive")
self.event_session.connection().scalar(select([1]))
return
except Exception as err: # pylint: disable=broad-except
_LOGGER.error(
"Error in database connectivity during keepalive: %s",
err,
)
self._reopen_event_session()
"""Send a keep alive to keep the db connection open."""
_LOGGER.debug("Sending keepalive")
self.event_session.connection().scalar(select([1]))
@callback
def event_listener(self, event):
@ -663,20 +754,7 @@ class Recorder(threading.Thread):
kwargs["echo"] = False
if self._using_file_sqlite:
with self.hass.timeout.freeze(DOMAIN):
#
# Here we run an sqlite3 quick_check. In the majority
# of cases, the quick_check takes under 10 seconds.
#
# On systems with very large databases and
# very slow disk or cpus, this can take a while.
#
validate_or_move_away_sqlite_database(
self.db_url, self.db_integrity_check
)
if self.engine is not None:
self.engine.dispose()
validate_or_move_away_sqlite_database(self.db_url)
self.engine = create_engine(self.db_url, **kwargs)
@ -684,6 +762,7 @@ class Recorder(threading.Thread):
Base.metadata.create_all(self.engine)
self.get_session = scoped_session(sessionmaker(bind=self.engine))
_LOGGER.debug("Connected to recorder database")
@property
def _using_file_sqlite(self):
@ -716,18 +795,24 @@ class Recorder(threading.Thread):
session.flush()
session.expunge(self.run_info)
def _shutdown(self):
"""Save end time for current run."""
if self.event_session is not None:
self._open_event_session()
def _end_session(self):
"""End the recorder session."""
if self.event_session is None:
return
try:
self.run_info.end = dt_util.utcnow()
self.event_session.add(self.run_info)
try:
self._commit_event_session_or_retry()
self.event_session.close()
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception(
"Error saving the event session during shutdown: %s", err
)
self._commit_event_session_or_retry()
self.event_session.close()
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error saving the event session during shutdown: %s", err)
self.run_info = None
def _shutdown(self):
"""Save end time for current run."""
self._stop_queue_watcher_and_event_listener()
self._end_session()
self._close_connection()
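
As an aside on the recovery path above: SQLAlchemy chains the original DBAPI exception as __cause__ on the DatabaseError it raises, which is why _handle_database_error checks isinstance(err.__cause__, sqlite3.DatabaseError) before moving the database away. A minimal standalone sketch of that check (the helper name is made up; the fake exception is constructed the same way the tests in this PR construct it):

    import sqlite3

    from sqlalchemy import exc

    def is_sqlite_corruption(err: exc.DatabaseError) -> bool:
        # SQLAlchemy chains the original DBAPI error as __cause__, so sqlite3
        # corruption can be told apart from errors raised by other backends.
        return isinstance(err.__cause__, sqlite3.DatabaseError)

    # Simulate what the recorder sees, mirroring the tests in this change.
    fake = exc.DatabaseError("statement", {}, [])
    fake.__cause__ = sqlite3.DatabaseError()
    assert is_sqlite_corruption(fake)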

View File

@ -11,15 +11,14 @@ from sqlalchemy.exc import (
)
from sqlalchemy.schema import AddConstraint, DropConstraint
from .const import DOMAIN
from .models import SCHEMA_VERSION, TABLE_STATES, Base, SchemaChanges
from .util import session_scope
_LOGGER = logging.getLogger(__name__)
def migrate_schema(instance):
"""Check if the schema needs to be upgraded."""
def get_schema_version(instance):
"""Get the schema version."""
with session_scope(session=instance.get_session()) as session:
res = (
session.query(SchemaChanges)
@ -34,21 +33,27 @@ def migrate_schema(instance):
"No schema version found. Inspected version: %s", current_version
)
if current_version == SCHEMA_VERSION:
return
return current_version
def schema_is_current(current_version):
"""Check if the schema is current."""
return current_version == SCHEMA_VERSION
def migrate_schema(instance, current_version):
"""Check if the schema needs to be upgraded."""
with session_scope(session=instance.get_session()) as session:
_LOGGER.warning(
"Database is about to upgrade. Schema version: %s", current_version
)
for version in range(current_version, SCHEMA_VERSION):
new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s", new_version)
_apply_update(instance.engine, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version))
with instance.hass.timeout.freeze(DOMAIN):
for version in range(current_version, SCHEMA_VERSION):
new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s", new_version)
_apply_update(instance.engine, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version))
_LOGGER.info("Upgrade to version %s done", new_version)
_LOGGER.info("Upgrade to version %s done", new_version)
def _create_index(engine, table_name, index_name):
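
For orientation, a rough sketch of how the recorder thread drives the split migration API after this change (the function name setup_and_maybe_migrate is hypothetical; the real sequencing lives in Recorder.run, which defers the migration until Home Assistant has finished starting because it is cpu intensive):

    from homeassistant.components.recorder import migration

    def setup_and_maybe_migrate(instance) -> None:
        # Assumes the database connection has already been set up on the
        # Recorder instance (instance.get_session / instance.engine exist).
        # 1. Read the stored schema version.
        current_version = migration.get_schema_version(instance)
        # 2. Nothing to do when the schema is already current.
        if migration.schema_is_current(current_version):
            return
        # 3. Otherwise upgrade one version at a time up to SCHEMA_VERSION.
        migration.migrate_schema(instance, current_version)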

View File

@ -6,7 +6,7 @@ import logging
import time
from typing import TYPE_CHECKING
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct
@ -69,8 +69,7 @@ def purge_old_data(
return False
_LOGGER.warning("Error purging history: %s", err)
except SQLAlchemyError as err:
_LOGGER.warning("Error purging history: %s", err)
return True

View File

@ -14,8 +14,13 @@ from sqlalchemy.orm.session import Session
from homeassistant.helpers.typing import HomeAssistantType
import homeassistant.util.dt as dt_util
from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX
from .models import ALL_TABLES, process_timestamp
from .const import DATA_INSTANCE, SQLITE_URL_PREFIX
from .models import (
ALL_TABLES,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
process_timestamp,
)
_LOGGER = logging.getLogger(__name__)
@ -117,7 +122,7 @@ def execute(qry, to_native=False, validate_entity_ids=True):
time.sleep(QUERY_RETRY_WAIT)
def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool) -> bool:
def validate_or_move_away_sqlite_database(dburl: str) -> bool:
"""Ensure that the database is valid or move it away."""
dbpath = dburl_to_path(dburl)
@ -125,7 +130,7 @@ def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool)
# Database does not exist yet, this is OK
return True
if not validate_sqlite_database(dbpath, db_integrity_check):
if not validate_sqlite_database(dbpath):
move_away_broken_database(dbpath)
return False
@ -161,18 +166,21 @@ def basic_sanity_check(cursor):
"""Check tables to make sure select does not fail."""
for table in ALL_TABLES:
cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection
if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES):
cursor.execute(f"SELECT * FROM {table};") # nosec # not injection
else:
cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection
return True
def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool:
def validate_sqlite_database(dbpath: str) -> bool:
"""Run a quick check on an sqlite database to see if it is corrupt."""
import sqlite3 # pylint: disable=import-outside-toplevel
try:
conn = sqlite3.connect(dbpath)
run_checks_on_open_db(dbpath, conn.cursor(), db_integrity_check)
run_checks_on_open_db(dbpath, conn.cursor())
conn.close()
except sqlite3.DatabaseError:
_LOGGER.exception("The database at %s is corrupt or malformed", dbpath)
@ -181,24 +189,14 @@ def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool:
return True
def run_checks_on_open_db(dbpath, cursor, db_integrity_check):
def run_checks_on_open_db(dbpath, cursor):
"""Run checks that will generate a sqlite3 exception if there is corruption."""
sanity_check_passed = basic_sanity_check(cursor)
last_run_was_clean = last_run_was_recently_clean(cursor)
if sanity_check_passed and last_run_was_clean:
_LOGGER.debug(
"The quick_check will be skipped as the system was restarted cleanly and passed the basic sanity check"
)
return
if not db_integrity_check:
# Always warn so when it does fail they remember it has
# been manually disabled
_LOGGER.warning(
"The quick_check on the sqlite3 database at %s was skipped because %s was disabled",
dbpath,
CONF_DB_INTEGRITY_CHECK,
"The system was restarted cleanly and passed the basic sanity check"
)
return
@ -214,11 +212,6 @@ def run_checks_on_open_db(dbpath, cursor, db_integrity_check):
dbpath,
)
_LOGGER.info(
"A quick_check is being performed on the sqlite3 database at %s", dbpath
)
cursor.execute("PRAGMA QUICK_CHECK")
def move_away_broken_database(dbfile: str) -> None:
"""Move away a broken sqlite3 database."""

View File

@ -3,13 +3,14 @@
from datetime import datetime, timedelta
from unittest.mock import patch
from sqlalchemy.exc import OperationalError
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from homeassistant.components import recorder
from homeassistant.components.recorder import (
CONF_DB_URL,
CONFIG_SCHEMA,
DATA_INSTANCE,
DOMAIN,
KEEPALIVE_TIME,
SERVICE_DISABLE,
SERVICE_ENABLE,
SERVICE_PURGE,
@ -19,15 +20,17 @@ from homeassistant.components.recorder import (
run_information_from_instance,
run_information_with_session,
)
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.models import Events, RecorderRuns, States
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import (
EVENT_HOMEASSISTANT_STARTED,
EVENT_HOMEASSISTANT_STOP,
MATCH_ALL,
STATE_LOCKED,
STATE_UNLOCKED,
)
from homeassistant.core import Context, CoreState, callback
from homeassistant.core import Context, CoreState, HomeAssistant, callback
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.setup import async_setup_component, setup_component
from homeassistant.util import dt as dt_util
@ -41,18 +44,35 @@ from .common import (
from .conftest import SetupRecorderInstanceT
from tests.common import (
async_fire_time_changed,
async_init_recorder_component,
fire_time_changed,
get_test_home_assistant,
)
def _default_recorder(hass):
"""Return a recorder with reasonable defaults."""
return Recorder(
hass,
auto_purge=True,
keep_days=7,
commit_interval=1,
uri="sqlite://",
db_max_retries=10,
db_retry_wait=3,
entity_filter=CONFIG_SCHEMA({DOMAIN: {}}),
exclude_t=[],
)
async def test_shutdown_before_startup_finishes(hass):
"""Test shutdown before recorder starts is clean."""
hass.state = CoreState.not_running
await async_init_recorder_component(hass)
await hass.data[DATA_INSTANCE].async_db_ready
await hass.async_block_till_done()
session = await hass.async_add_executor_job(hass.data[DATA_INSTANCE].get_session)
@ -69,6 +89,31 @@ async def test_shutdown_before_startup_finishes(hass):
assert run_info.end is not None
async def test_state_gets_saved_when_set_before_start_event(
hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test we can record an event when starting with not running."""
hass.state = CoreState.not_running
await async_init_recorder_component(hass)
entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
hass.states.async_set(entity_id, state, attributes)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await async_wait_recording_done_without_instance(hass)
with session_scope(hass=hass) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
assert db_states[0].event_id > 0
async def test_saving_state(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
@ -92,6 +137,58 @@ async def test_saving_state(
assert state == _state_empty_context(hass, entity_id)
async def test_saving_many_states(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test we expire after many commits."""
instance = await async_setup_recorder_instance(hass)
entity_id = "test.recorder"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
with patch.object(
hass.data[DATA_INSTANCE].event_session, "expire_all"
) as expire_all, patch.object(recorder, "EXPIRE_AFTER_COMMITS", 2):
for _ in range(3):
hass.states.async_set(entity_id, "on", attributes)
await async_wait_recording_done(hass, instance)
hass.states.async_set(entity_id, "off", attributes)
await async_wait_recording_done(hass, instance)
assert expire_all.called
with session_scope(hass=hass) as session:
db_states = list(session.query(States))
assert len(db_states) == 6
assert db_states[0].event_id > 0
async def test_saving_state_with_intermixed_time_changes(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test saving states with intermixed time changes."""
instance = await async_setup_recorder_instance(hass)
entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
attributes2 = {"test_attr": 10, "test_attr_10": "mean"}
for _ in range(KEEPALIVE_TIME + 1):
async_fire_time_changed(hass, dt_util.utcnow())
hass.states.async_set(entity_id, state, attributes)
for _ in range(KEEPALIVE_TIME + 1):
async_fire_time_changed(hass, dt_util.utcnow())
hass.states.async_set(entity_id, state, attributes2)
await async_wait_recording_done(hass, instance)
with session_scope(hass=hass) as session:
db_states = list(session.query(States))
assert len(db_states) == 2
assert db_states[0].event_id > 0
def test_saving_state_with_exception(hass, hass_recorder, caplog):
"""Test saving and restoring a state."""
hass = hass_recorder()
@ -130,6 +227,44 @@ def test_saving_state_with_exception(hass, hass_recorder, caplog):
assert "Error saving events" not in caplog.text
def test_saving_state_with_sqlalchemy_exception(hass, hass_recorder, caplog):
"""Test saving state when there is an SQLAlchemyError."""
hass = hass_recorder()
entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
def _throw_if_state_in_session(*args, **kwargs):
for obj in hass.data[DATA_INSTANCE].event_session:
if isinstance(obj, States):
raise SQLAlchemyError(
"insert the state", "fake params", "forced to fail"
)
with patch("time.sleep"), patch.object(
hass.data[DATA_INSTANCE].event_session,
"flush",
side_effect=_throw_if_state_in_session,
):
hass.states.set(entity_id, "fail", attributes)
wait_recording_done(hass)
assert "SQLAlchemyError error processing event" in caplog.text
caplog.clear()
hass.states.set(entity_id, state, attributes)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
db_states = list(session.query(States))
assert len(db_states) >= 1
assert "Error executing query" not in caplog.text
assert "Error saving events" not in caplog.text
assert "SQLAlchemyError error processing event" not in caplog.text
def test_saving_event(hass, hass_recorder):
"""Test saving and restoring an event."""
hass = hass_recorder()
@ -171,6 +306,25 @@ def test_saving_event(hass, hass_recorder):
)
def test_saving_state_with_commit_interval_zero(hass_recorder):
"""Test saving a state with a commit interval of zero."""
hass = hass_recorder({"commit_interval": 0})
assert hass.data[DATA_INSTANCE].commit_interval == 0
entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
hass.states.set(entity_id, state, attributes)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
assert db_states[0].event_id > 0
def _add_entities(hass, entity_ids):
"""Add entities."""
attributes = {"test_attr": 5, "test_attr_10": "nice"}
@ -351,26 +505,27 @@ def test_saving_state_and_removing_entity(hass, hass_recorder):
assert states[2].state is None
def test_recorder_setup_failure():
def test_recorder_setup_failure(hass):
"""Test some exceptions."""
hass = get_test_home_assistant()
with patch.object(Recorder, "_setup_connection") as setup, patch(
"homeassistant.components.recorder.time.sleep"
):
setup.side_effect = ImportError("driver not found")
rec = Recorder(
hass,
auto_purge=True,
keep_days=7,
commit_interval=1,
uri="sqlite://",
db_max_retries=10,
db_retry_wait=3,
entity_filter=CONFIG_SCHEMA({DOMAIN: {}}),
exclude_t=[],
db_integrity_check=False,
)
rec = _default_recorder(hass)
rec.async_initialize()
rec.start()
rec.join()
hass.stop()
def test_recorder_setup_failure_without_event_listener(hass):
"""Test recorder setup failure when the event listener is not setup."""
with patch.object(Recorder, "_setup_connection") as setup, patch(
"homeassistant.components.recorder.time.sleep"
):
setup.side_effect = ImportError("driver not found")
rec = _default_recorder(hass)
rec.start()
rec.join()
@ -481,6 +636,7 @@ def test_saving_state_with_serializable_data(hass_recorder, caplog):
"""Test saving data that cannot be serialized does not crash."""
hass = hass_recorder()
hass.bus.fire("bad_event", {"fail": CannotSerializeMe()})
hass.states.set("test.one", "on", {"fail": CannotSerializeMe()})
wait_recording_done(hass)
hass.states.set("test.two", "on", {})
@ -699,15 +855,20 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog):
hass.states.async_set("test.lost", "on", {})
await async_wait_recording_done_without_instance(hass)
await hass.async_add_executor_job(corrupt_db_file, test_db_file)
await async_wait_recording_done_without_instance(hass)
with patch.object(
hass.data[DATA_INSTANCE].event_session,
"close",
side_effect=OperationalError("statement", {}, []),
):
await async_wait_recording_done_without_instance(hass)
await hass.async_add_executor_job(corrupt_db_file, test_db_file)
await async_wait_recording_done_without_instance(hass)
# This state will not be recorded because
# the database corruption will be discovered
# and we will have to rollback to recover
hass.states.async_set("test.one", "off", {})
await async_wait_recording_done_without_instance(hass)
# This state will not be recorded because
# the database corruption will be discovered
# and we will have to rollback to recover
hass.states.async_set("test.one", "off", {})
await async_wait_recording_done_without_instance(hass)
assert "Unrecoverable sqlite3 database corruption detected" in caplog.text
assert "The system will rename the corrupt database file" in caplog.text

View File

@ -1,19 +1,41 @@
"""The tests for the Recorder component."""
# pylint: disable=protected-access
import datetime
import sqlite3
from unittest.mock import Mock, PropertyMock, call, patch
import pytest
from sqlalchemy import create_engine
from sqlalchemy.exc import InternalError, OperationalError, ProgrammingError
from sqlalchemy.exc import (
DatabaseError,
InternalError,
OperationalError,
ProgrammingError,
)
from sqlalchemy.pool import StaticPool
from homeassistant.bootstrap import async_setup_component
from homeassistant.components.recorder import RecorderRuns, const, migration, models
from homeassistant.components import recorder
from homeassistant.components.recorder import RecorderRuns, migration, models
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.models import States
from homeassistant.components.recorder.util import session_scope
import homeassistant.util.dt as dt_util
from .common import async_wait_recording_done_without_instance
from tests.common import async_fire_time_changed, async_mock_service
from tests.components.recorder import models_original
def _get_native_states(hass, entity_id):
with session_scope(hass=hass) as session:
return [
state.to_native()
for state in session.query(States).filter(States.entity_id == entity_id)
]
def create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
@ -26,6 +48,7 @@ def create_engine_test(*args, **kwargs):
async def test_schema_update_calls(hass):
"""Test that schema migrations occur in correct order."""
await async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
), patch(
@ -35,16 +58,147 @@ async def test_schema_update_calls(hass):
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
await hass.async_block_till_done()
await async_wait_recording_done_without_instance(hass)
update.assert_has_calls(
[
call(hass.data[const.DATA_INSTANCE].engine, version + 1, 0)
call(hass.data[DATA_INSTANCE].engine, version + 1, 0)
for version in range(0, models.SCHEMA_VERSION)
]
)
async def test_database_migration_failed(hass):
"""Test we notify if the migration fails."""
await async_setup_component(hass, "persistent_notification", {})
create_calls = async_mock_service(hass, "persistent_notification", "create")
dismiss_calls = async_mock_service(hass, "persistent_notification", "dismiss")
with patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
), patch(
"homeassistant.components.recorder.migration._apply_update",
side_effect=ValueError,
):
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
hass.states.async_set("my.entity", "on", {})
hass.states.async_set("my.entity", "off", {})
await hass.async_block_till_done()
await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join)
await hass.async_block_till_done()
assert len(create_calls) == 2
assert len(dismiss_calls) == 1
async def test_database_migration_encounters_corruption(hass):
"""Test we move away the database if its corrupt."""
await async_setup_component(hass, "persistent_notification", {})
sqlite3_exception = DatabaseError("statement", {}, [])
sqlite3_exception.__cause__ = sqlite3.DatabaseError()
with patch(
"homeassistant.components.recorder.migration.schema_is_current",
side_effect=[False, True],
), patch(
"homeassistant.components.recorder.migration.migrate_schema",
side_effect=sqlite3_exception,
), patch(
"homeassistant.components.recorder.move_away_broken_database"
) as move_away:
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
hass.states.async_set("my.entity", "on", {})
hass.states.async_set("my.entity", "off", {})
await async_wait_recording_done_without_instance(hass)
assert move_away.called
async def test_database_migration_encounters_corruption_not_sqlite(hass):
"""Test we fail on database error when we cannot recover."""
await async_setup_component(hass, "persistent_notification", {})
create_calls = async_mock_service(hass, "persistent_notification", "create")
dismiss_calls = async_mock_service(hass, "persistent_notification", "dismiss")
with patch(
"homeassistant.components.recorder.migration.schema_is_current",
side_effect=[False, True],
), patch(
"homeassistant.components.recorder.migration.migrate_schema",
side_effect=DatabaseError("statement", {}, []),
), patch(
"homeassistant.components.recorder.move_away_broken_database"
) as move_away:
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
hass.states.async_set("my.entity", "on", {})
hass.states.async_set("my.entity", "off", {})
await hass.async_block_till_done()
await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join)
await hass.async_block_till_done()
assert not move_away.called
assert len(create_calls) == 2
assert len(dismiss_calls) == 1
async def test_events_during_migration_are_queued(hass):
"""Test that events during migration are queued."""
await async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
):
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
hass.states.async_set("my.entity", "on", {})
hass.states.async_set("my.entity", "off", {})
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2))
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4))
await hass.data[DATA_INSTANCE].async_recorder_ready.wait()
await async_wait_recording_done_without_instance(hass)
db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity")
assert len(db_states) == 2
async def test_events_during_migration_queue_exhausted(hass):
"""Test that events during migration takes so long the queue is exhausted."""
await async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
), patch.object(recorder, "MAX_QUEUE_BACKLOG", 1):
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
hass.states.async_set("my.entity", "on", {})
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2))
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4))
await hass.async_block_till_done()
hass.states.async_set("my.entity", "off", {})
await hass.data[DATA_INSTANCE].async_recorder_ready.wait()
await async_wait_recording_done_without_instance(hass)
db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity")
assert len(db_states) == 1
hass.states.async_set("my.entity", "on", {})
await async_wait_recording_done_without_instance(hass)
db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity")
assert len(db_states) == 2
async def test_schema_migrate(hass):
"""Test the full schema migration logic.
@ -53,6 +207,8 @@ async def test_schema_migrate(hass):
inspection could quickly become quite cumbersome.
"""
await async_setup_component(hass, "persistent_notification", {})
def _mock_setup_run(self):
self.run_info = RecorderRuns(
start=self.recording_start, created=dt_util.utcnow()

View File

@ -1,7 +1,10 @@
"""Test data purging."""
from datetime import datetime, timedelta
import json
import sqlite3
from unittest.mock import patch
from sqlalchemy.exc import DatabaseError
from sqlalchemy.orm.session import Session
from homeassistant.components import recorder
@ -16,6 +19,7 @@ from .common import (
async_recorder_block_till_done,
async_wait_purge_done,
async_wait_recording_done,
async_wait_recording_done_without_instance,
)
from .conftest import SetupRecorderInstanceT
@ -52,6 +56,38 @@ async def test_purge_old_states(
assert states.count() == 2
async def test_purge_old_states_encouters_database_corruption(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test database image image is malformed while deleting old states."""
instance = await async_setup_recorder_instance(hass)
await _add_test_states(hass, instance)
await async_wait_recording_done_without_instance(hass)
sqlite3_exception = DatabaseError("statement", {}, [])
sqlite3_exception.__cause__ = sqlite3.DatabaseError()
with patch(
"homeassistant.components.recorder.move_away_broken_database"
) as move_away, patch(
"homeassistant.components.recorder.purge.purge_old_data",
side_effect=sqlite3_exception,
):
await hass.services.async_call(
recorder.DOMAIN, recorder.SERVICE_PURGE, {"keep_days": 0}
)
await hass.async_block_till_done()
await async_wait_recording_done_without_instance(hass)
assert move_away.called
# Ensure the whole database was reset due to the database error
with session_scope(hass=hass) as session:
states_after_purge = session.query(States)
assert states_after_purge.count() == 0
async def test_purge_old_events(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):

View File

@ -74,75 +74,28 @@ def test_recorder_bad_execute(hass_recorder):
assert e_mock.call_count == 2
def test_validate_or_move_away_sqlite_database_with_integrity_check(
hass, tmpdir, caplog
):
"""Ensure a malformed sqlite database is moved away.
A quick_check is run here
"""
db_integrity_check = True
def test_validate_or_move_away_sqlite_database(hass, tmpdir, caplog):
"""Ensure a malformed sqlite database is moved away."""
test_dir = tmpdir.mkdir("test_validate_or_move_away_sqlite_database")
test_db_file = f"{test_dir}/broken.db"
dburl = f"{SQLITE_URL_PREFIX}{test_db_file}"
assert util.validate_sqlite_database(test_db_file, db_integrity_check) is False
assert util.validate_sqlite_database(test_db_file) is False
assert os.path.exists(test_db_file) is True
assert (
util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False
)
assert util.validate_or_move_away_sqlite_database(dburl) is False
corrupt_db_file(test_db_file)
assert util.validate_sqlite_database(dburl, db_integrity_check) is False
assert util.validate_sqlite_database(dburl) is False
assert (
util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False
)
assert util.validate_or_move_away_sqlite_database(dburl) is False
assert "corrupt or malformed" in caplog.text
assert util.validate_sqlite_database(dburl, db_integrity_check) is False
assert util.validate_sqlite_database(dburl) is False
assert util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is True
def test_validate_or_move_away_sqlite_database_without_integrity_check(
hass, tmpdir, caplog
):
"""Ensure a malformed sqlite database is moved away.
The quick_check is skipped, but we can still find
corruption if the whole database is unreadable
"""
db_integrity_check = False
test_dir = tmpdir.mkdir("test_validate_or_move_away_sqlite_database")
test_db_file = f"{test_dir}/broken.db"
dburl = f"{SQLITE_URL_PREFIX}{test_db_file}"
assert util.validate_sqlite_database(test_db_file, db_integrity_check) is False
assert os.path.exists(test_db_file) is True
assert (
util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False
)
corrupt_db_file(test_db_file)
assert util.validate_sqlite_database(dburl, db_integrity_check) is False
assert (
util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False
)
assert "corrupt or malformed" in caplog.text
assert util.validate_sqlite_database(dburl, db_integrity_check) is False
assert util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is True
assert util.validate_or_move_away_sqlite_database(dburl) is True
async def test_last_run_was_recently_clean(hass):
@ -197,12 +150,10 @@ def test_combined_checks(hass_recorder, caplog):
cursor = hass.data[DATA_INSTANCE].engine.raw_connection().cursor()
assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None
assert "skipped because db_integrity_check was disabled" in caplog.text
assert util.run_checks_on_open_db("fake_db_path", cursor) is None
assert "could not validate that the sqlite3 database" in caplog.text
caplog.clear()
assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None
assert "could not validate that the sqlite3 database" in caplog.text
# We are patching recorder.util here in order
# to avoid creating the full database on disk
@ -210,50 +161,36 @@ def test_combined_checks(hass_recorder, caplog):
"homeassistant.components.recorder.util.basic_sanity_check", return_value=False
):
caplog.clear()
assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None
assert "skipped because db_integrity_check was disabled" in caplog.text
caplog.clear()
assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None
assert util.run_checks_on_open_db("fake_db_path", cursor) is None
assert "could not validate that the sqlite3 database" in caplog.text
# We are patching recorder.util here in order
# to avoid creating the full database on disk
with patch("homeassistant.components.recorder.util.last_run_was_recently_clean"):
caplog.clear()
assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None
assert (
"system was restarted cleanly and passed the basic sanity check"
in caplog.text
)
caplog.clear()
assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None
assert (
"system was restarted cleanly and passed the basic sanity check"
in caplog.text
)
assert util.run_checks_on_open_db("fake_db_path", cursor) is None
assert "restarted cleanly and passed the basic sanity check" in caplog.text
caplog.clear()
with patch(
"homeassistant.components.recorder.util.last_run_was_recently_clean",
side_effect=sqlite3.DatabaseError,
), pytest.raises(sqlite3.DatabaseError):
util.run_checks_on_open_db("fake_db_path", cursor, False)
util.run_checks_on_open_db("fake_db_path", cursor)
caplog.clear()
with patch(
"homeassistant.components.recorder.util.last_run_was_recently_clean",
side_effect=sqlite3.DatabaseError,
), pytest.raises(sqlite3.DatabaseError):
util.run_checks_on_open_db("fake_db_path", cursor, True)
util.run_checks_on_open_db("fake_db_path", cursor)
cursor.execute("DROP TABLE events;")
caplog.clear()
with pytest.raises(sqlite3.DatabaseError):
util.run_checks_on_open_db("fake_db_path", cursor, False)
util.run_checks_on_open_db("fake_db_path", cursor)
caplog.clear()
with pytest.raises(sqlite3.DatabaseError):
util.run_checks_on_open_db("fake_db_path", cursor, True)
util.run_checks_on_open_db("fake_db_path", cursor)