core/homeassistant/components/recorder/core.py


"""Support for recording details."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Iterable
from concurrent.futures import CancelledError
import contextlib
from datetime import datetime, timedelta
from functools import cached_property
import logging
import queue
import sqlite3
import threading
import time
from typing import TYPE_CHECKING, Any, cast
import psutil_home_assistant as ha_psutil
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select, update
from sqlalchemy.engine import Engine
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session
from homeassistant.components import persistent_notification
from homeassistant.const import (
ATTR_ENTITY_ID,
EVENT_HOMEASSISTANT_CLOSE,
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_STATE_CHANGED,
MATCH_ALL,
)
from homeassistant.core import (
CALLBACK_TYPE,
Event,
EventStateChangedData,
HomeAssistant,
callback,
)
from homeassistant.helpers.event import (
async_track_time_change,
async_track_time_interval,
async_track_utc_time_change,
)
from homeassistant.helpers.start import async_at_started
from homeassistant.helpers.typing import UNDEFINED, UndefinedType
import homeassistant.util.dt as dt_util
from homeassistant.util.enum import try_parse_enum
from homeassistant.util.event_type import EventType
from . import migration, statistics
from .const import (
DB_WORKER_PREFIX,
DOMAIN,
ESTIMATED_QUEUE_ITEM_SIZE,
KEEPALIVE_TIME,
LAST_REPORTED_SCHEMA_VERSION,
LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
MARIADB_PYMYSQL_URL_PREFIX,
MARIADB_URL_PREFIX,
MAX_QUEUE_BACKLOG_MIN_VALUE,
MYSQLDB_PYMYSQL_URL_PREFIX,
MYSQLDB_URL_PREFIX,
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
SQLITE_MAX_BIND_VARS,
SQLITE_URL_PREFIX,
STATISTICS_ROWS_SCHEMA_VERSION,
SupportedDialect,
)
from .db_schema import (
LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX,
LEGACY_STATES_EVENT_ID_INDEX,
SCHEMA_VERSION,
TABLE_STATES,
Base,
EventData,
Events,
EventTypes,
StateAttributes,
States,
StatesMeta,
Statistics,
StatisticsShortTerm,
)
from .executor import DBInterruptibleThreadPoolExecutor
from .migration import (
EntityIDMigration,
EventsContextIDMigration,
EventTypeIDMigration,
StatesContextIDMigration,
)
from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
from .pool import POOL_SIZE, MutexPool, RecorderPool
from .queries import get_migration_changes
from .table_managers.event_data import EventDataManager
from .table_managers.event_types import EventTypeManager
from .table_managers.recorder_runs import RecorderRunsManager
from .table_managers.state_attributes import StateAttributesManager
from .table_managers.states import StatesManager
from .table_managers.states_meta import StatesMetaManager
from .table_managers.statistics_meta import StatisticsMetaManager
from .tasks import (
AdjustLRUSizeTask,
AdjustStatisticsTask,
ChangeStatisticsUnitTask,
ClearStatisticsTask,
CommitTask,
CompileMissingStatisticsTask,
DatabaseLockTask,
EntityIDPostMigrationTask,
EventIdMigrationTask,
ImportStatisticsTask,
KeepAliveTask,
PerodicCleanupTask,
PurgeTask,
RecorderTask,
StatisticsTask,
StopTask,
SynchronizeTask,
UpdateStatesMetadataTask,
UpdateStatisticsMetadataTask,
WaitTask,
)
from .util import (
async_create_backup_failure_issue,
build_mysqldb_conv,
dburl_to_path,
end_incomplete_runs,
execute_stmt_lambda_element,
get_index_by_name,
is_second_sunday,
move_away_broken_database,
session_scope,
setup_connection_for_dialect,
validate_or_move_away_sqlite_database,
write_lock_db_sqlite,
)
_LOGGER = logging.getLogger(__name__)
DEFAULT_URL = "sqlite:///{hass_config_path}"
# Controls how often we clean up
# States and Events objects
EXPIRE_AFTER_COMMITS = 120
SHUTDOWN_TASK = object()
COMMIT_TASK = CommitTask()
KEEP_ALIVE_TASK = KeepAliveTask()
WAIT_TASK = WaitTask()
ADJUST_LRU_SIZE_TASK = AdjustLRUSizeTask()
DB_LOCK_TIMEOUT = 30
DB_LOCK_QUEUE_CHECK_TIMEOUT = 10 # check every 10 seconds
INVALIDATED_ERR = "Database connection invalidated"
CONNECTIVITY_ERR = "Error in database connectivity during commit"
# Pool size must accommodate Recorder thread + All db executors
MAX_DB_EXECUTOR_WORKERS = POOL_SIZE - 1
class Recorder(threading.Thread):
"""A threaded recorder class."""
stop_requested: bool
def __init__(
self,
hass: HomeAssistant,
auto_purge: bool,
auto_repack: bool,
keep_days: int,
commit_interval: int,
uri: str,
db_max_retries: int,
db_retry_wait: int,
entity_filter: Callable[[str], bool] | None,
exclude_event_types: set[EventType[Any] | str],
) -> None:
"""Initialize the recorder."""
threading.Thread.__init__(self, name="Recorder")
self.hass = hass
self.thread_id: int | None = None
self.recorder_and_worker_thread_ids: set[int] = set()
self.auto_purge = auto_purge
self.auto_repack = auto_repack
self.keep_days = keep_days
self.is_running: bool = False
self._hass_started: asyncio.Future[object] = hass.loop.create_future()
self.commit_interval = commit_interval
self._queue: queue.SimpleQueue[RecorderTask | Event] = queue.SimpleQueue()
self.db_url = uri
self.db_max_retries = db_max_retries
self.db_retry_wait = db_retry_wait
self.database_engine: DatabaseEngine | None = None
# Database connection is ready, but non-live migration may be in progress
db_connected: asyncio.Future[bool] = hass.data[DOMAIN].db_connected
self.async_db_connected: asyncio.Future[bool] = db_connected
# Database is ready to use but live migration may be in progress
self.async_db_ready: asyncio.Future[bool] = hass.loop.create_future()
# Database is ready to use and all migration steps completed (used by tests)
self.async_recorder_ready = asyncio.Event()
self._queue_watch = threading.Event()
self.engine: Engine | None = None
self.max_backlog: int = MAX_QUEUE_BACKLOG_MIN_VALUE
self._psutil: ha_psutil.PsutilWrapper | None = None
# The entity_filter is exposed on the recorder instance so that
# it can be used to check whether an entity is being recorded; it is
# called by is_entity_recorded and the sensor recorder.
self.entity_filter = entity_filter
self.exclude_event_types = exclude_event_types
self.schema_version = 0
self._commits_without_expire = 0
self._event_session_has_pending_writes = False
self.recorder_runs_manager = RecorderRunsManager()
self.states_manager = StatesManager()
self.event_data_manager = EventDataManager(self)
self.event_type_manager = EventTypeManager(self)
self.states_meta_manager = StatesMetaManager(self)
self.state_attributes_manager = StateAttributesManager(self)
self.statistics_meta_manager = StatisticsMetaManager(self)
self.event_session: Session | None = None
self._get_session: Callable[[], Session] | None = None
self._completed_first_database_setup: bool | None = None
self.async_migration_event = asyncio.Event()
self.migration_in_progress = False
self.migration_is_live = False
self.use_legacy_events_index = False
self._database_lock_task: DatabaseLockTask | None = None
self._db_executor: DBInterruptibleThreadPoolExecutor | None = None
self._event_listener: CALLBACK_TYPE | None = None
self._queue_watcher: CALLBACK_TYPE | None = None
self._keep_alive_listener: CALLBACK_TYPE | None = None
self._commit_listener: CALLBACK_TYPE | None = None
self._periodic_listener: CALLBACK_TYPE | None = None
self._nightly_listener: CALLBACK_TYPE | None = None
self._dialect_name: SupportedDialect | None = None
self.enabled = True
# For safety we default to the lowest value for max_bind_vars
# of all the DB types (SQLITE_MAX_BIND_VARS).
#
# We update the value once we connect to the DB
# and determine what is actually supported.
self.max_bind_vars = SQLITE_MAX_BIND_VARS
@property
def backlog(self) -> int:
"""Return the number of items in the recorder backlog."""
return self._queue.qsize()
@cached_property
def dialect_name(self) -> SupportedDialect | None:
"""Return the dialect the recorder uses."""
return self._dialect_name
@property
def _using_file_sqlite(self) -> bool:
"""Short version to check if we are using sqlite3 as a file."""
return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
SQLITE_URL_PREFIX
)
@property
def recording(self) -> bool:
"""Return if the recorder is recording."""
return self._event_listener is not None
def get_session(self) -> Session:
"""Get a new sqlalchemy session."""
if self._get_session is None:
raise RuntimeError("The database connection has not been established")
return self._get_session()
def queue_task(self, task: RecorderTask | Event) -> None:
"""Add a task to the recorder queue."""
self._queue.put(task)
def set_enable(self, enable: bool) -> None:
"""Enable or disable recording events and states."""
self.enabled = enable
@callback
def async_start_executor(self) -> None:
"""Start the executor."""
self._db_executor = DBInterruptibleThreadPoolExecutor(
self.recorder_and_worker_thread_ids,
thread_name_prefix=DB_WORKER_PREFIX,
max_workers=MAX_DB_EXECUTOR_WORKERS,
shutdown_hook=self._shutdown_pool,
)
def _shutdown_pool(self) -> None:
"""Close the dbpool connections in the current thread."""
if self.engine and hasattr(self.engine.pool, "shutdown"):
self.engine.pool.shutdown()
@callback
def async_initialize(self) -> None:
"""Initialize the recorder."""
entity_filter = self.entity_filter
exclude_event_types = self.exclude_event_types
queue_put = self._queue.put_nowait
@callback
def _event_listener(event: Event) -> None:
"""Listen for new events and put them in the process queue."""
if event.event_type in exclude_event_types:
return
if (
entity_filter is None
or (entity_id := event.data.get(ATTR_ENTITY_ID)) is None
):
queue_put(event)
return
if isinstance(entity_id, str):
if entity_filter(entity_id):
queue_put(event)
return
if isinstance(entity_id, list):
for eid in entity_id:
if entity_filter(eid):
queue_put(event)
return
return
# Unknown what it is.
queue_put(event)
self._event_listener = self.hass.bus.async_listen(
MATCH_ALL,
_event_listener,
)
self._queue_watcher = async_track_time_interval(
self.hass,
self._async_check_queue,
timedelta(minutes=10),
name="Recorder queue watcher",
)
@callback
def _async_keep_alive(self, now: datetime) -> None:
"""Queue a keep alive."""
if self._event_listener:
self.queue_task(KEEP_ALIVE_TASK)
@callback
def _async_commit(self, now: datetime) -> None:
"""Queue a commit."""
if (
self._event_listener
and not self._database_lock_task
and self._event_session_has_pending_writes
):
self.queue_task(COMMIT_TASK)
@callback
def async_add_executor_job[_T](
self, target: Callable[..., _T], *args: Any
) -> asyncio.Future[_T]:
"""Add an executor job from within the event loop."""
return self.hass.loop.run_in_executor(self._db_executor, target, *args)
def _stop_executor(self) -> None:
"""Stop the executor."""
if self._db_executor is None:
return
self._db_executor.shutdown()
self._db_executor = None
@callback
def _async_check_queue(self, *_: Any) -> None:
"""Periodic check of the queue size to ensure we do not exhaust memory.
The queue grows during migration or if something really goes wrong.
"""
size = self.backlog
_LOGGER.debug("Recorder queue size is: %s", size)
if not self._reached_max_backlog_percentage(100):
return
_LOGGER.error(
(
"The recorder backlog queue reached the maximum size of %s events; "
"usually, the system is CPU bound, I/O bound, or the database "
"is corrupt due to a disk problem; The recorder will stop "
"recording events to avoid running out of memory"
),
self.backlog,
)
self._async_stop_queue_watcher_and_event_listener()
def _available_memory(self) -> int:
"""Return the available memory in bytes."""
if not self._psutil:
self._psutil = ha_psutil.PsutilWrapper()
return cast(int, self._psutil.psutil.virtual_memory().available)
def _reached_max_backlog_percentage(self, percentage: int) -> bool:
"""Check if the system has reached the max queue backlog and return the maximum if it has."""
percentage_modifier = percentage / 100
current_backlog = self.backlog
# First check the minimum value since it's cheap
if current_backlog < (MAX_QUEUE_BACKLOG_MIN_VALUE * percentage_modifier):
return False
# If they have more RAM available, keep filling the backlog
# since we do not want to stop recording events or give the
# user a bad backup when they have plenty of RAM available.
max_queue_backlog = int(
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY
* (self._available_memory() / ESTIMATED_QUEUE_ITEM_SIZE)
)
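# Illustrative arithmetic (hypothetical numbers, not the real constants):
# with 2 GiB of memory available and roughly 10 KiB estimated per queued
# item, the ceiling scales to allowed_fraction * (2 GiB / 10 KiB) items,
# so hosts with more free memory tolerate a deeper backlog before
# recording is stopped.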
self.max_backlog = max(max_queue_backlog, MAX_QUEUE_BACKLOG_MIN_VALUE)
return current_backlog >= (max_queue_backlog * percentage_modifier)
@callback
def _async_stop_queue_watcher_and_event_listener(self) -> None:
"""Stop watching the queue and listening for events."""
if self._queue_watcher:
self._queue_watcher()
self._queue_watcher = None
if self._event_listener:
self._event_listener()
self._event_listener = None
@callback
def _async_stop_listeners(self) -> None:
"""Stop listeners."""
self._async_stop_queue_watcher_and_event_listener()
if self._keep_alive_listener:
self._keep_alive_listener()
self._keep_alive_listener = None
if self._commit_listener:
self._commit_listener()
self._commit_listener = None
if self._nightly_listener:
self._nightly_listener()
self._nightly_listener = None
if self._periodic_listener:
self._periodic_listener()
self._periodic_listener = None
async def _async_close(self, event: Event) -> None:
"""Empty the queue if its still present at close."""
# If the queue is full of events to be processed because
# the database is so broken that every event results in a retry
# we will never be able to get though the events to shutdown in time.
#
# We drain all the events in the queue and then insert
# an empty one to ensure the next thing the recorder sees
# is a request to shutdown.
while True:
try:
self._queue.get_nowait()
except queue.Empty:
break
self.queue_task(StopTask())
await self.hass.async_add_executor_job(self.join)
async def _async_shutdown(self, event: Event) -> None:
"""Shut down the Recorder at final write."""
if not self._hass_started.done():
self._hass_started.set_result(SHUTDOWN_TASK)
self.queue_task(StopTask())
self._async_stop_listeners()
await self.hass.async_add_executor_job(self.join)
@callback
def _async_hass_started(self, hass: HomeAssistant) -> None:
"""Notify that hass has started."""
self._hass_started.set_result(None)
@callback
def async_register(self) -> None:
"""Post connection initialize."""
bus = self.hass.bus
bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, self._async_close)
bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._async_shutdown)
async_at_started(self.hass, self._async_hass_started)
@callback
def _async_startup_failed(self) -> None:
"""Report startup failure."""
# If a live migration failed, we were able to connect (async_db_connected
# marked True) and the database was marked ready (async_db_ready marked
# True), but the data in the queue cannot be written to the database
# because the schema is not in the correct format, so we must stop the
# listeners and report the failure.
if not self.async_db_connected.done():
self.async_db_connected.set_result(False)
if not self.async_db_ready.done():
self.async_db_ready.set_result(False)
persistent_notification.async_create(
self.hass,
"The recorder could not start, check [the logs](/config/logs)",
"Recorder",
)
self._async_stop_listeners()
@callback
def async_connection_success(self) -> None:
"""Connect to the database succeeded, schema version and migration need known.
The database may not yet be ready for use in case of a non-live migration.
"""
self.async_db_connected.set_result(True)
@callback
def async_set_db_ready(self) -> None:
"""Database live and ready for use.
Called after non-live migration steps are finished.
"""
if self.async_db_ready.done():
return
self.async_db_ready.set_result(True)
self.async_start_executor()
@callback
def _async_set_recorder_ready_migration_done(self) -> None:
"""Finish start and mark recorder ready.
Called after all migration steps are finished.
"""
self._async_setup_periodic_tasks()
self.async_recorder_ready.set()
@callback
def async_nightly_tasks(self, now: datetime) -> None:
"""Trigger the purge."""
if self.auto_purge:
# Purge will schedule the periodic cleanups
# after it completes to ensure it does not happen
# until after the database is vacuumed
repack = self.auto_repack and is_second_sunday(now)
purge_before = dt_util.utcnow() - timedelta(days=self.keep_days)
self.queue_task(PurgeTask(purge_before, repack=repack, apply_filter=False))
else:
self.queue_task(PerodicCleanupTask())
@callback
def _async_five_minute_tasks(self, now: datetime) -> None:
"""Run tasks every five minutes."""
self.queue_task(ADJUST_LRU_SIZE_TASK)
self.async_periodic_statistics()
def _adjust_lru_size(self) -> None:
"""Trigger the LRU adjustment.
If the number of entities has increased, increase the size of the LRU
cache to avoid thrashing.
"""
if new_size := self.hass.states.async_entity_ids_count() * 2:
self.state_attributes_manager.adjust_lru_size(new_size)
self.states_meta_manager.adjust_lru_size(new_size)
self.statistics_meta_manager.adjust_lru_size(new_size)
@callback
def async_periodic_statistics(self) -> None:
"""Trigger the statistics run.
Short term statistics run every 5 minutes
"""
start = statistics.get_start_time()
self.queue_task(StatisticsTask(start, True))
@callback
def async_adjust_statistics(
self,
statistic_id: str,
start_time: datetime,
sum_adjustment: float,
adjustment_unit: str,
) -> None:
"""Adjust statistics."""
self.queue_task(
AdjustStatisticsTask(
statistic_id, start_time, sum_adjustment, adjustment_unit
)
)
@callback
def async_clear_statistics(self, statistic_ids: list[str]) -> None:
"""Clear statistics for a list of statistic_ids."""
self.queue_task(ClearStatisticsTask(statistic_ids))
@callback
def async_update_statistics_metadata(
self,
statistic_id: str,
*,
new_statistic_id: str | UndefinedType = UNDEFINED,
new_unit_of_measurement: str | None | UndefinedType = UNDEFINED,
) -> None:
"""Update statistics metadata for a statistic_id."""
self.queue_task(
UpdateStatisticsMetadataTask(
statistic_id, new_statistic_id, new_unit_of_measurement
)
)
@callback
def async_update_states_metadata(
self,
entity_id: str,
new_entity_id: str,
) -> None:
"""Update states metadata for an entity_id."""
self.queue_task(UpdateStatesMetadataTask(entity_id, new_entity_id))
@callback
def async_change_statistics_unit(
self,
statistic_id: str,
*,
new_unit_of_measurement: str,
old_unit_of_measurement: str,
) -> None:
"""Change statistics unit for a statistic_id."""
self.queue_task(
ChangeStatisticsUnitTask(
statistic_id, new_unit_of_measurement, old_unit_of_measurement
)
)
@callback
def async_import_statistics(
self,
metadata: StatisticMetaData,
stats: Iterable[StatisticData],
table: type[Statistics | StatisticsShortTerm],
) -> None:
"""Schedule import of statistics."""
self.queue_task(ImportStatisticsTask(metadata, stats, table))
@callback
def _async_setup_periodic_tasks(self) -> None:
"""Prepare periodic tasks."""
if self.hass.is_stopping or not self._get_session:
# Home Assistant is shutting down
return
# If the db is using a socket connection, we need to keep alive
# to prevent errors from unexpected disconnects
if self.dialect_name != SupportedDialect.SQLITE:
self._keep_alive_listener = async_track_time_interval(
self.hass,
self._async_keep_alive,
timedelta(seconds=KEEPALIVE_TIME),
name="Recorder keep alive",
)
# If the commit interval is not 0, we need to commit periodically
if self.commit_interval:
self._commit_listener = async_track_time_interval(
self.hass,
self._async_commit,
timedelta(seconds=self.commit_interval),
name="Recorder commit",
)
# Run nightly tasks at 4:12am
self._nightly_listener = async_track_time_change(
self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0
)
# Compile short term statistics every 5 minutes
self._periodic_listener = async_track_utc_time_change(
self.hass, self._async_five_minute_tasks, minute=range(0, 60, 5), second=10
)
async def _async_wait_for_started(self) -> object | None:
"""Wait for the hass started future."""
return await self._hass_started
def _wait_startup_or_shutdown(self) -> object | None:
"""Wait for startup or shutdown before starting."""
try:
return asyncio.run_coroutine_threadsafe(
self._async_wait_for_started(), self.hass.loop
).result()
except CancelledError as ex:
_LOGGER.warning(
"Recorder startup was externally canceled before it could complete: %s",
ex,
)
return SHUTDOWN_TASK
def run(self) -> None:
"""Run the recorder thread."""
self.is_running = True
try:
self._run()
except Exception:
_LOGGER.exception(
"Recorder._run threw unexpected exception, recorder shutting down"
)
finally:
# Ensure shutdown happens cleanly if
# anything goes wrong in the run loop
self.is_running = False
self._shutdown()
def _add_to_session(self, session: Session, obj: object) -> None:
"""Add an object to the session."""
self._event_session_has_pending_writes = True
session.add(obj)
def _run(self) -> None:
"""Start processing events to save."""
thread_id = threading.get_ident()
self.thread_id = thread_id
self.recorder_and_worker_thread_ids.add(thread_id)
setup_result = self._setup_recorder()
if not setup_result:
# Give up if we could not connect
return
schema_status = migration.validate_db_schema(self.hass, self, self.get_session)
if schema_status is None:
# Give up if we could not validate the schema
return
self.schema_version = schema_status.current_version
if schema_status.valid:
self._setup_run()
else:
self.migration_in_progress = True
self.migration_is_live = migration.live_migration(schema_status)
self.hass.add_job(self.async_connection_success)
database_was_ready = self.migration_is_live or schema_status.valid
if database_was_ready:
# If the migration is live or the schema is valid, we need to
# wait for startup to complete. If it's not live, we need to
# continue on.
self._activate_and_set_db_ready()
# We wait to start a live migration until startup has finished
# since it can be cpu intensive and we do not want it to compete
# with startup which is also cpu intensive
if self._wait_startup_or_shutdown() is SHUTDOWN_TASK:
# Shutdown happened before Home Assistant finished starting
self.migration_in_progress = False
# Make sure we cleanly close the run if
# we restart before startup finishes
return
if not schema_status.valid:
if self._migrate_schema_and_setup_run(schema_status):
self.schema_version = SCHEMA_VERSION
if not self._event_listener:
# If the schema migration takes so long that the queue watcher
# safety kicks in because _reached_max_backlog_percentage returned
# True, we need to reinitialize the listener.
self.hass.add_job(self.async_initialize)
else:
persistent_notification.create(
self.hass,
"The database migration failed, check [the logs](/config/logs).",
"Database Migration Failed",
"recorder_database_migration",
)
return
if not database_was_ready:
self._activate_and_set_db_ready()
# Catch up with missed statistics
self._schedule_compile_missing_statistics()
_LOGGER.debug("Recorder processing the queue")
self._adjust_lru_size()
self.hass.add_job(self._async_set_recorder_ready_migration_done)
self._run_event_loop()
def _activate_and_set_db_ready(self) -> None:
"""Activate the table managers or schedule migrations and mark the db as ready."""
with session_scope(session=self.get_session()) as session:
# Prime the statistics meta manager as soon as possible
# since we want the frontend queries to avoid a thundering
# herd of queries to find the statistics meta data if
# there are a lot of statistics graphs on the frontend.
schema_version = self.schema_version
if schema_version >= STATISTICS_ROWS_SCHEMA_VERSION:
self.statistics_meta_manager.load(session)
migration_changes: dict[str, int] = {
row[0]: row[1]
for row in execute_stmt_lambda_element(session, get_migration_changes())
}
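# Schedule any data migrations that still have rows to rewrite; the
# matching table managers stay inactive until their data is fully
# migrated so queries keep using the legacy columns in the meantime.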
for migrator_cls in (StatesContextIDMigration, EventsContextIDMigration):
migrator = migrator_cls(session, schema_version, migration_changes)
if migrator.needs_migrate():
self.queue_task(migrator.task())
migrator = EventTypeIDMigration(session, schema_version, migration_changes)
if migrator.needs_migrate():
self.queue_task(migrator.task())
else:
_LOGGER.debug("Activating event_types manager as all data is migrated")
self.event_type_manager.active = True
migrator = EntityIDMigration(session, schema_version, migration_changes)
if migrator.needs_migrate():
self.queue_task(migrator.task())
else:
_LOGGER.debug("Activating states_meta manager as all data is migrated")
self.states_meta_manager.active = True
with contextlib.suppress(SQLAlchemyError):
# If ix_states_entity_id_last_updated_ts still exists
# on the states table, it means the entity_id cleanup
# performed by EntityIDPostMigrationTask did not finish
# because Home Assistant restarted in the middle of it.
# We need to pick back up where we left off.
if get_index_by_name(
session,
TABLE_STATES,
LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX,
):
self.queue_task(EntityIDPostMigrationTask())
if self.schema_version > LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION:
with contextlib.suppress(SQLAlchemyError):
# If the index of event_ids on the states table is still present
# we need to queue a task to remove it.
if get_index_by_name(
session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
):
self.queue_task(EventIdMigrationTask())
self.use_legacy_events_index = True
# We must only set the db ready after we have set the table managers
# to active if there is no data to migrate.
#
# This ensures that the history queries will use the new tables
# and not the old ones as soon as the API is available.
self.hass.add_job(self.async_set_db_ready)
def _run_event_loop(self) -> None:
"""Run the event loop for the recorder."""
# Use a session for the event read loop
# with a commit every time the event time
# has changed. This reduces the disk io.
queue_ = self._queue
startup_task_or_events: list[RecorderTask | Event] = []
while not queue_.empty() and (task_or_event := queue_.get_nowait()):
startup_task_or_events.append(task_or_event)
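# Drain everything that was queued during startup so the caches can be
# primed in one pass before the main loop starts blocking on get().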
self._pre_process_startup_events(startup_task_or_events)
for task in startup_task_or_events:
self._guarded_process_one_task_or_event_or_recover(task)
# Clear startup tasks since this thread runs forever
# and we don't want to hold them in memory
del startup_task_or_events
self.stop_requested = False
while not self.stop_requested:
self._guarded_process_one_task_or_event_or_recover(queue_.get())
def _pre_process_startup_events(
self, startup_task_or_events: list[RecorderTask | Event[Any]]
) -> None:
"""Pre process startup events."""
# Prime all the state_attributes and event_data caches
# before we start processing events
state_change_events: list[Event[EventStateChangedData]] = []
non_state_change_events: list[Event] = []
for task_or_event in startup_task_or_events:
# Event is never subclassed so we can
# use a fast type check
if type(task_or_event) is Event:
event_ = task_or_event
if event_.event_type == EVENT_STATE_CHANGED:
state_change_events.append(event_)
else:
non_state_change_events.append(event_)
assert self.event_session is not None
session = self.event_session
self.event_data_manager.load(non_state_change_events, session)
self.event_type_manager.load(non_state_change_events, session)
self.states_meta_manager.load(state_change_events, session)
self.state_attributes_manager.load(state_change_events, session)
def _guarded_process_one_task_or_event_or_recover(
self, task: RecorderTask | Event
) -> None:
"""Process a task, guarding against exceptions to ensure the loop does not collapse."""
_LOGGER.debug("Processing task: %s", task)
try:
self._process_one_task_or_event_or_recover(task)
except Exception:
_LOGGER.exception("Error while processing event %s", task)
def _process_one_task_or_event_or_recover(self, task: RecorderTask | Event) -> None:
"""Process a task or event, reconnect, or recover a malformed database."""
try:
# Almost everything coming in via the queue
# is an Event so we can process it directly
# and since it's never subclassed, we can
# use a fast type check
if type(task) is Event:
self._process_one_event(task)
return
# If it's not an event, commit everything
# that is pending before running the task
if TYPE_CHECKING:
assert isinstance(task, RecorderTask)
if task.commit_before:
self._commit_event_session_or_retry()
task.run(self)
except exc.DatabaseError as err:
if self._handle_database_error(err):
return
_LOGGER.exception("Unhandled database error while processing task %s", task)
except SQLAlchemyError:
_LOGGER.exception("SQLAlchemyError error processing task %s", task)
else:
return
# Reset the session if an SQLAlchemyError (including DatabaseError)
# happens to rollback and recover
self._reopen_event_session()
def _setup_recorder(self) -> bool:
"""Create a connection to the database."""
tries = 1
while tries <= self.db_max_retries:
try:
self._setup_connection()
return migration.initialize_database(self.get_session)
except UnsupportedDialect:
break
except Exception:
_LOGGER.exception(
"Error during connection setup: (retrying in %s seconds)",
self.db_retry_wait,
)
tries += 1
time.sleep(self.db_retry_wait)
return False
@callback
def _async_migration_started(self) -> None:
"""Set the migration started event."""
self.async_migration_event.set()
def _migrate_schema_and_setup_run(
self, schema_status: migration.SchemaValidationStatus
) -> bool:
"""Migrate schema to the latest version."""
persistent_notification.create(
self.hass,
(
"System performance will temporarily degrade during the database"
" upgrade. Do not power down or restart the system until the upgrade"
" completes. Integrations that read the database, such as logbook,"
" history, and statistics may return inconsistent results until the "
" upgrade completes. This notification will be automatically dismissed"
" when the upgrade completes."
),
"Database upgrade in progress",
"recorder_database_migration",
)
self.hass.add_job(self._async_migration_started)
try:
assert self.engine is not None
migration.migrate_schema(
self, self.hass, self.engine, self.get_session, schema_status
)
except exc.DatabaseError as err:
if self._handle_database_error(err):
return True
_LOGGER.exception("Database error during schema migration")
return False
except Exception:
_LOGGER.exception("Error during schema migration")
return False
else:
self._setup_run()
return True
finally:
self.migration_in_progress = False
persistent_notification.dismiss(self.hass, "recorder_database_migration")
def _lock_database(self, task: DatabaseLockTask) -> None:
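"""Lock the database so a backup can run, and hold the lock until released.
Runs in the recorder thread; if the queue backlog grows too large while
waiting, writing resumes and the backup is flagged as failed.
"""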
@callback
def _async_set_database_locked(task: DatabaseLockTask) -> None:
task.database_locked.set()
local_start_time = dt_util.now()
hass = self.hass
with write_lock_db_sqlite(self):
# Notify that lock is being held, wait until database can be used again.
hass.add_job(_async_set_database_locked, task)
while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
if self._reached_max_backlog_percentage(90):
_LOGGER.warning(
"Database queue backlog reached more than %s (%s events) of maximum queue "
"length while waiting for backup to finish; recorder will now "
"resume writing to database. The backup cannot be trusted and "
"must be restarted",
"90%",
self.backlog,
)
task.queue_overflow = True
hass.add_job(
async_create_backup_failure_issue, self.hass, local_start_time
)
break
_LOGGER.info(
"Database queue backlog reached %d entries during backup",
self.backlog,
)
def _process_one_event(self, event: Event[Any]) -> None:
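"""Process a single event into the open event session if recording is enabled."""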
if not self.enabled:
return
if event.event_type == EVENT_STATE_CHANGED:
self._process_state_changed_event_into_session(event)
else:
self._process_non_state_changed_event_into_session(event)
# Commit if the commit interval is zero
if not self.commit_interval:
self._commit_event_session_or_retry()
def _process_non_state_changed_event_into_session(self, event: Event) -> None:
"""Process any event into the session except state changed."""
session = self.event_session
assert session is not None
dbevent = Events.from_event(event)
# Map the event_type to the EventTypes table
event_type_manager = self.event_type_manager
if pending_event_types := event_type_manager.get_pending(event.event_type):
dbevent.event_type_rel = pending_event_types
elif event_type_id := event_type_manager.get(event.event_type, session, True):
dbevent.event_type_id = event_type_id
else:
event_types = EventTypes(event_type=event.event_type)
event_type_manager.add_pending(event_types)
self._add_to_session(session, event_types)
dbevent.event_type_rel = event_types
if not event.data:
self._add_to_session(session, dbevent)
return
event_data_manager = self.event_data_manager
if not (shared_data_bytes := event_data_manager.serialize_from_event(event)):
return
# Map the event data to the EventData table
shared_data = shared_data_bytes.decode("utf-8")
# Matching attributes found in the pending commit
if pending_event_data := event_data_manager.get_pending(shared_data):
dbevent.event_data_rel = pending_event_data
# Matching attributes id found in the cache
elif (data_id := event_data_manager.get_from_cache(shared_data)) or (
(hash_ := EventData.hash_shared_data_bytes(shared_data_bytes))
and (data_id := event_data_manager.get(shared_data, hash_, session))
):
dbevent.data_id = data_id
else:
# No matching attributes found, save them in the DB
dbevent_data = EventData(shared_data=shared_data, hash=hash_)
event_data_manager.add_pending(dbevent_data)
self._add_to_session(session, dbevent_data)
dbevent.event_data_rel = dbevent_data
self._add_to_session(session, dbevent)
def _process_state_changed_event_into_session(
self, event: Event[EventStateChangedData]
) -> None:
"""Process a state_changed event into the session."""
state_attributes_manager = self.state_attributes_manager
states_meta_manager = self.states_meta_manager
entity_removed = not event.data.get("new_state")
entity_id = event.data["entity_id"]
dbstate = States.from_event(event)
old_state = event.data["old_state"]
assert self.event_session is not None
session = self.event_session
states_manager = self.states_manager
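# Link this row to the previous state of the same entity so old_state /
# old_state_id stay chained: prefer a row still pending in this session,
# otherwise fall back to the already committed state_id.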
if pending_state := states_manager.pop_pending(entity_id):
dbstate.old_state = pending_state
if old_state:
pending_state.last_reported_ts = old_state.last_reported_timestamp
elif old_state_id := states_manager.pop_committed(entity_id):
dbstate.old_state_id = old_state_id
if old_state:
states_manager.update_pending_last_reported(
old_state_id, old_state.last_reported_timestamp
)
if entity_removed:
dbstate.state = None
else:
states_manager.add_pending(entity_id, dbstate)
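# Once the StatesMeta manager is active the entity_id lives in the
# StatesMeta table (referenced via metadata_id) and the legacy
# entity_id column on States is left empty.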
if states_meta_manager.active:
dbstate.entity_id = None
if entity_id is None or not (
shared_attrs_bytes := state_attributes_manager.serialize_from_event(event)
):
return
# Map the entity_id to the StatesMeta table
if pending_states_meta := states_meta_manager.get_pending(entity_id):
dbstate.states_meta_rel = pending_states_meta
elif metadata_id := states_meta_manager.get(entity_id, session, True):
dbstate.metadata_id = metadata_id
elif states_meta_manager.active and entity_removed:
# If the entity was removed, we don't need to add it to the
# StatesMeta table or record it in the pending commit
# if it does not have a metadata_id allocated to it as
# it either never existed or was just renamed.
return
else:
states_meta = StatesMeta(entity_id=entity_id)
states_meta_manager.add_pending(states_meta)
self._add_to_session(session, states_meta)
dbstate.states_meta_rel = states_meta
# Map the event data to the StateAttributes table
shared_attrs = shared_attrs_bytes.decode("utf-8")
dbstate.attributes = None
# Matching attributes found in the pending commit
if pending_event_data := state_attributes_manager.get_pending(shared_attrs):
dbstate.state_attributes = pending_event_data
# Matching attributes id found in the cache
elif (
attributes_id := state_attributes_manager.get_from_cache(shared_attrs)
) or (
(hash_ := StateAttributes.hash_shared_attrs_bytes(shared_attrs_bytes))
and (
attributes_id := state_attributes_manager.get(
shared_attrs, hash_, session
)
)
):
dbstate.attributes_id = attributes_id
else:
# No matching attributes found, save them in the DB
dbstate_attributes = StateAttributes(shared_attrs=shared_attrs, hash=hash_)
state_attributes_manager.add_pending(dbstate_attributes)
self._add_to_session(session, dbstate_attributes)
dbstate.state_attributes = dbstate_attributes
self._add_to_session(session, dbstate)
def _handle_database_error(self, err: Exception) -> bool:
"""Handle a database error that may result in moving away the corrupt db."""
if isinstance(err.__cause__, sqlite3.DatabaseError):
_LOGGER.exception(
"Unrecoverable sqlite3 database corruption detected: %s", err
)
self._handle_sqlite_corruption()
return True
return False
def _commit_event_session_or_retry(self) -> None:
"""Commit the event session if there is work to do."""
if not self._event_session_has_pending_writes:
return
tries = 1
while tries <= self.db_max_retries:
try:
self._commit_event_session()
except (exc.InternalError, exc.OperationalError) as err:
_LOGGER.error(
"%s: Error executing query: %s. (retrying in %s seconds)",
INVALIDATED_ERR if err.connection_invalidated else CONNECTIVITY_ERR,
err,
self.db_retry_wait,
)
if tries == self.db_max_retries:
raise
tries += 1
time.sleep(self.db_retry_wait)
else:
return
def _commit_event_session(self) -> None:
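"""Commit the event session: flush any pending last_reported updates,
commit, and then promote pending rows in the table managers to their
committed caches."""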
assert self.event_session is not None
session = self.event_session
self._commits_without_expire += 1
if (
pending_last_reported
:= self.states_manager.get_pending_last_reported_timestamp()
) and self.schema_version >= LAST_REPORTED_SCHEMA_VERSION:
with session.no_autoflush:
session.execute(
update(States),
[
{
"state_id": state_id,
"last_reported_ts": last_reported_timestamp,
}
for state_id, last_reported_timestamp in pending_last_reported.items()
],
)
session.commit()
self._event_session_has_pending_writes = False
# We just committed the state attributes to the database
# and we now know the attributes_ids. We can avoid
# many selects for matching attributes by moving the pending
# entries into the LRU and committed caches now.
self.states_manager.post_commit_pending()
self.state_attributes_manager.post_commit_pending()
self.event_data_manager.post_commit_pending()
self.event_type_manager.post_commit_pending()
self.states_meta_manager.post_commit_pending()
# Expire is an expensive operation (frequently more expensive
# than the flush and commit itself) so we only
# do it after EXPIRE_AFTER_COMMITS commits
if self._commits_without_expire >= EXPIRE_AFTER_COMMITS:
self._commits_without_expire = 0
session.expire_all()
def _handle_sqlite_corruption(self) -> None:
"""Handle the sqlite3 database being corrupt."""
try:
self._close_event_session()
finally:
self._close_connection()
move_away_broken_database(dburl_to_path(self.db_url))
self.recorder_runs_manager.reset()
self._setup_recorder()
self._setup_run()
def _close_event_session(self) -> None:
"""Close the event session."""
self.states_manager.reset()
self.state_attributes_manager.reset()
self.event_data_manager.reset()
self.event_type_manager.reset()
self.states_meta_manager.reset()
self.statistics_meta_manager.reset()
if not self.event_session:
return
try:
self.event_session.rollback()
self.event_session.close()
except SQLAlchemyError:
_LOGGER.exception("Error while rolling back and closing the event session")
def _reopen_event_session(self) -> None:
"""Rollback the event session and reopen it after a failure."""
self._close_event_session()
self._open_event_session()
def _open_event_session(self) -> None:
"""Open the event session."""
self.event_session = self.get_session()
self.event_session.expire_on_commit = False
def _post_schema_migration(self, old_version: int, new_version: int) -> None:
"""Run post schema migration tasks."""
migration.post_schema_migration(self, old_version, new_version)
def _migrate_states_context_ids(self) -> bool:
"""Migrate states context ids if needed."""
return migration.migrate_states_context_ids(self)
def _migrate_events_context_ids(self) -> bool:
"""Migrate events context ids if needed."""
return migration.migrate_events_context_ids(self)
def _migrate_event_type_ids(self) -> bool:
"""Migrate event type ids if needed."""
return migration.migrate_event_type_ids(self)
def _migrate_entity_ids(self) -> bool:
"""Migrate entity_ids if needed."""
return migration.migrate_entity_ids(self)
def _post_migrate_entity_ids(self) -> bool:
"""Post migrate entity_ids if needed."""
return migration.post_migrate_entity_ids(self)
def _cleanup_legacy_states_event_ids(self) -> bool:
"""Cleanup legacy event_ids if needed."""
return migration.cleanup_legacy_states_event_ids(self)
def _send_keep_alive(self) -> None:
"""Send a keep alive to keep the db connection open."""
assert self.event_session is not None
_LOGGER.debug("Sending keepalive")
self.event_session.connection().scalar(select(1))
async def async_block_till_done(self) -> None:
"""Async version of block_till_done."""
if self._queue.empty() and not self._event_session_has_pending_writes:
return
event = asyncio.Event()
self.queue_task(SynchronizeTask(event))
await event.wait()
def block_till_done(self) -> None:
"""Block till all events processed.
This is only called in tests.
This only blocks until the queue is empty
which does not mean the recorder is done.
Call tests.common's wait_recording_done
after calling this to ensure the data
is in the database.
"""
self._queue_watch.clear()
self.queue_task(WAIT_TASK)
self._queue_watch.wait()
async def lock_database(self) -> bool:
"""Lock database so it can be backed up safely."""
if self.dialect_name != SupportedDialect.SQLITE:
_LOGGER.debug(
"Not a SQLite database or not connected, locking not necessary"
)
return True
if self._database_lock_task:
_LOGGER.warning("Database already locked")
return False
database_locked = asyncio.Event()
task = DatabaseLockTask(database_locked, threading.Event(), False)
self.queue_task(task)
try:
async with asyncio.timeout(DB_LOCK_TIMEOUT):
await database_locked.wait()
except TimeoutError as err:
task.database_unlock.set()
raise TimeoutError(
f"Could not lock database within {DB_LOCK_TIMEOUT} seconds."
) from err
self._database_lock_task = task
return True
@callback
def unlock_database(self) -> bool:
"""Unlock database.
Returns true if database lock has been held throughout the process.
"""
if self.dialect_name != SupportedDialect.SQLITE:
_LOGGER.debug(
"Not a SQLite database or not connected, unlocking not necessary"
)
return True
if not self._database_lock_task:
_LOGGER.warning("Database currently not locked")
return False
self._database_lock_task.database_unlock.set()
success = not self._database_lock_task.queue_overflow
self._database_lock_task = None
return success
def _setup_recorder_connection(
self, dbapi_connection: DBAPIConnection, connection_record: Any
) -> None:
"""Dbapi specific connection settings."""
assert self.engine is not None
if database_engine := setup_connection_for_dialect(
self,
self.engine.dialect.name,
dbapi_connection,
not self._completed_first_database_setup,
):
self.database_engine = database_engine
self.max_bind_vars = database_engine.max_bind_vars
self._completed_first_database_setup = True
def _setup_connection(self) -> None:
"""Ensure database is ready to fly."""
kwargs: dict[str, Any] = {}
self._completed_first_database_setup = False
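# An in-memory SQLite database (or the bare sqlite:// URL) must share a
# single connection across threads, so it is wrapped in MutexPool rather
# than the RecorderPool used for file-backed SQLite below.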
if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url:
kwargs["connect_args"] = {"check_same_thread": False}
kwargs["poolclass"] = MutexPool
MutexPool.pool_lock = threading.RLock()
kwargs["pool_reset_on_return"] = None
elif self.db_url.startswith(SQLITE_URL_PREFIX):
kwargs["poolclass"] = RecorderPool
kwargs["recorder_and_worker_thread_ids"] = (
self.recorder_and_worker_thread_ids
)
elif self.db_url.startswith(
(
MARIADB_URL_PREFIX,
MARIADB_PYMYSQL_URL_PREFIX,
MYSQLDB_URL_PREFIX,
MYSQLDB_PYMYSQL_URL_PREFIX,
)
):
kwargs["connect_args"] = {"charset": "utf8mb4"}
if self.db_url.startswith((MARIADB_URL_PREFIX, MYSQLDB_URL_PREFIX)):
# If they have configured MySQLDB but don't have
# the MySQLDB module installed this will throw
# an ImportError which we suppress here since
# sqlalchemy will give them a better error when
# it tried to import it below.
with contextlib.suppress(ImportError):
kwargs["connect_args"]["conv"] = build_mysqldb_conv()
# Disable extended logging for non SQLite databases
if not self.db_url.startswith(SQLITE_URL_PREFIX):
kwargs["echo"] = False
if self._using_file_sqlite:
validate_or_move_away_sqlite_database(self.db_url)
self.engine = create_engine(self.db_url, **kwargs, future=True)
self._dialect_name = try_parse_enum(SupportedDialect, self.engine.dialect.name)
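# Drop the cached dialect_name property so it is recomputed from the
# freshly created engine on next access.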
self.__dict__.pop("dialect_name", None)
sqlalchemy_event.listen(self.engine, "connect", self._setup_recorder_connection)
Base.metadata.create_all(self.engine)
self._get_session = scoped_session(sessionmaker(bind=self.engine, future=True))
_LOGGER.debug("Connected to recorder database")
def _close_connection(self) -> None:
"""Close the connection."""
if self.engine:
self.engine.dispose()
self.engine = None
self._get_session = None
def _setup_run(self) -> None:
"""Log the start of the current run and schedule any needed jobs."""
with session_scope(session=self.get_session()) as session:
end_incomplete_runs(session, self.recorder_runs_manager.recording_start)
self.recorder_runs_manager.start(session)
self._open_event_session()
def _schedule_compile_missing_statistics(self) -> None:
"""Add tasks for missing statistics runs."""
self.queue_task(CompileMissingStatisticsTask())
def _end_session(self) -> None:
"""End the recorder session."""
if self.event_session is None:
return
if self.recorder_runs_manager.active:
# .end will add to the event session
self._event_session_has_pending_writes = True
self.recorder_runs_manager.end(self.event_session)
try:
self._commit_event_session_or_retry()
except Exception:
_LOGGER.exception("Error saving the event session during shutdown")
self.event_session.close()
self.recorder_runs_manager.clear()
def _shutdown(self) -> None:
"""Save end time for current run."""
_LOGGER.debug("Shutting down recorder")
if not self.schema_version or self.schema_version != SCHEMA_VERSION:
# If the schema version is not set, we never had a working
# connection to the database or the schema never reached a
# good state.
#
# In either case, we want to mark startup as failed.
#
self.hass.add_job(self._async_startup_failed)
else:
self.hass.add_job(self._async_stop_listeners)
try:
self._end_session()
finally:
self._stop_executor()
self._close_connection()