core/homeassistant/components/recorder/queries.py

649 lines
20 KiB
Python

"""Queries for the recorder."""
from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime
from sqlalchemy import and_, delete, distinct, func, lambda_stmt, select, update
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import Select
from .db_schema import (
EventData,
Events,
EventTypes,
MigrationChanges,
RecorderRuns,
StateAttributes,
States,
StatesMeta,
Statistics,
StatisticsRuns,
StatisticsShortTerm,
)
def select_event_type_ids(event_types: tuple[str, ...]) -> Select:
    """Build a select of the event_type_id for each of the given event types.

    A plain ``Select`` is returned instead of a lambda statement because
    this query is used inside other lambda statements.
    """
    matches_event_type = EventTypes.event_type.in_(event_types)
    return select(EventTypes.event_type_id).where(matches_event_type)
def get_shared_attributes(hashes: list[int]) -> StatementLambdaElement:
    """Select attributes_id and shared_attrs for rows whose hash is in hashes."""
    stmt = lambda_stmt(
        lambda: select(
            StateAttributes.attributes_id,
            StateAttributes.shared_attrs,
        ).where(StateAttributes.hash.in_(hashes))
    )
    return stmt
def get_shared_event_datas(hashes: list[int]) -> StatementLambdaElement:
    """Select data_id and shared_data for event data rows whose hash is in hashes."""
    stmt = lambda_stmt(
        lambda: select(
            EventData.data_id,
            EventData.shared_data,
        ).where(EventData.hash.in_(hashes))
    )
    return stmt
def find_event_type_ids(event_types: Iterable[str]) -> StatementLambdaElement:
    """Select (event_type_id, event_type) rows for the given event types."""
    return lambda_stmt(
        lambda: select(EventTypes.event_type_id, EventTypes.event_type).where(
            EventTypes.event_type.in_(event_types)
        )
    )
def find_all_states_metadata_ids() -> StatementLambdaElement:
    """Select every (metadata_id, entity_id) pair from StatesMeta."""
    stmt = lambda_stmt(
        lambda: select(
            StatesMeta.metadata_id,
            StatesMeta.entity_id,
        )
    )
    return stmt
def find_states_metadata_ids(entity_ids: Iterable[str]) -> StatementLambdaElement:
    """Select (metadata_id, entity_id) rows for the given entity_ids."""
    return lambda_stmt(
        lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id).where(
            StatesMeta.entity_id.in_(entity_ids)
        )
    )
def attributes_ids_exist_in_states_with_fast_in_distinct(
    attributes_ids: Iterable[int],
) -> StatementLambdaElement:
    """Select the distinct attributes_ids, out of those given, present in States."""
    return lambda_stmt(
        lambda: select(distinct(States.attributes_id)).where(
            States.attributes_id.in_(attributes_ids)
        )
    )
def attributes_ids_exist_in_states(
    attributes_ids: Iterable[int],
) -> StatementLambdaElement:
    """Find attributes ids that exist in the states table.

    PostgreSQL does not support skip/loose index scan
    https://wiki.postgresql.org/wiki/Loose_indexscan

    To avoid using distinct, StateAttributes is joined to States on
    attributes_id, and the join is narrowed with a correlated scalar
    subquery that picks a single last_updated_ts (limit 1, no explicit
    ordering) for that attributes_id. Only the attributes_ids passed in are
    considered, so the rows returned are requested ids that still have a
    matching row in the States table.

    This query is fast for older MariaDB, older MySQL, and PostgreSQL.
    """
    return lambda_stmt(
        lambda: select(StateAttributes.attributes_id)
        .select_from(StateAttributes)
        .join(
            States,
            and_(
                States.attributes_id == StateAttributes.attributes_id,
                # Match only the States row whose last_updated_ts equals the
                # one value produced by the correlated subquery below.
                States.last_updated_ts
                == select(States.last_updated_ts)
                .where(States.attributes_id == StateAttributes.attributes_id)
                .limit(1)
                .scalar_subquery()
                .correlate(StateAttributes),
            ),
        )
        .where(StateAttributes.attributes_id.in_(attributes_ids))
    )
def data_ids_exist_in_events_with_fast_in_distinct(
    data_ids: Iterable[int],
) -> StatementLambdaElement:
    """Select the distinct data_ids, out of those given, present in Events."""
    return lambda_stmt(
        lambda: select(distinct(Events.data_id)).where(
            Events.data_id.in_(data_ids)
        )
    )
def data_ids_exist_in_events(
    data_ids: Iterable[int],
) -> StatementLambdaElement:
    """Find data ids that exist in the events table.

    PostgreSQL does not support skip/loose index scan
    https://wiki.postgresql.org/wiki/Loose_indexscan

    To avoid using distinct, EventData is joined to Events on data_id, and
    the join is narrowed with a correlated scalar subquery that picks a
    single time_fired_ts (limit 1, no explicit ordering) for that data_id.
    Only the data_ids passed in are considered, so the rows returned are
    requested ids that still have a matching row in the Events table.

    This query is fast for older MariaDB, older MySQL, and PostgreSQL.
    """
    return lambda_stmt(
        lambda: select(EventData.data_id)
        .select_from(EventData)
        .join(
            Events,
            and_(
                Events.data_id == EventData.data_id,
                # Match only the Events row whose time_fired_ts equals the
                # one value produced by the correlated subquery below.
                Events.time_fired_ts
                == select(Events.time_fired_ts)
                .where(Events.data_id == EventData.data_id)
                .limit(1)
                .scalar_subquery()
                .correlate(EventData),
            ),
        )
        .where(EventData.data_id.in_(data_ids))
    )
def disconnect_states_rows(state_ids: Iterable[int]) -> StatementLambdaElement:
    """Build an update that nulls old_state_id where it points at state_ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: update(States)
        .values(old_state_id=None)
        .where(States.old_state_id.in_(state_ids))
        .execution_options(synchronize_session=False)
    )
def delete_states_rows(state_ids: Iterable[int]) -> StatementLambdaElement:
    """Build a delete of the States rows whose state_id is in state_ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(States)
        .execution_options(synchronize_session=False)
        .where(States.state_id.in_(state_ids))
    )
def delete_event_data_rows(data_ids: Iterable[int]) -> StatementLambdaElement:
    """Build a delete of the EventData rows whose data_id is in data_ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(EventData)
        .execution_options(synchronize_session=False)
        .where(EventData.data_id.in_(data_ids))
    )
def delete_states_attributes_rows(
    attributes_ids: Iterable[int],
) -> StatementLambdaElement:
    """Build a delete of the StateAttributes rows with the given attributes_ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(StateAttributes)
        .execution_options(synchronize_session=False)
        .where(StateAttributes.attributes_id.in_(attributes_ids))
    )
def delete_statistics_runs_rows(
    statistics_runs: Iterable[int],
) -> StatementLambdaElement:
    """Build a delete of the StatisticsRuns rows whose run_id is in statistics_runs.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(StatisticsRuns)
        .execution_options(synchronize_session=False)
        .where(StatisticsRuns.run_id.in_(statistics_runs))
    )
def delete_statistics_short_term_rows(
    short_term_statistics: Iterable[int],
) -> StatementLambdaElement:
    """Build a delete of the StatisticsShortTerm rows with the given ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(StatisticsShortTerm)
        .execution_options(synchronize_session=False)
        .where(StatisticsShortTerm.id.in_(short_term_statistics))
    )
def delete_event_rows(
    event_ids: Iterable[int],
) -> StatementLambdaElement:
    """Build a delete of the Events rows whose event_id is in event_ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(Events)
        .execution_options(synchronize_session=False)
        .where(Events.event_id.in_(event_ids))
    )
def delete_recorder_runs_rows(
    purge_before: datetime, current_run_id: int
) -> StatementLambdaElement:
    """Build a delete of finished recorder runs that ended before purge_before.

    Rows with no end value and the row whose run_id equals current_run_id
    are excluded. The statement carries the ``synchronize_session=False``
    execution option.
    """
    return lambda_stmt(
        lambda: delete(RecorderRuns)
        .where(
            RecorderRuns.end.is_not(None),
            RecorderRuns.end < purge_before,
            RecorderRuns.run_id != current_run_id,
        )
        .execution_options(synchronize_session=False)
    )
def find_events_to_purge(
    purge_before: float, max_bind_vars: int
) -> StatementLambdaElement:
    """Select (event_id, data_id) for events fired before purge_before.

    At most max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(Events.event_id, Events.data_id)
        .where(Events.time_fired_ts < purge_before)
        .limit(max_bind_vars)
    )
def find_states_to_purge(
    purge_before: float, max_bind_vars: int
) -> StatementLambdaElement:
    """Select (state_id, attributes_id) for states updated before purge_before.

    At most max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(States.state_id, States.attributes_id)
        .where(States.last_updated_ts < purge_before)
        .limit(max_bind_vars)
    )
def find_oldest_state() -> StatementLambdaElement:
    """Select the smallest last_updated_ts in States.

    The rows are ordered by last_updated_ts ascending and limited to one.
    """
    stmt = lambda_stmt(
        lambda: select(States.last_updated_ts)
        .order_by(States.last_updated_ts.asc())
        .limit(1)
    )
    return stmt
def find_short_term_statistics_to_purge(
    purge_before: datetime, max_bind_vars: int
) -> StatementLambdaElement:
    """Select ids of short term statistics that started before purge_before.

    purge_before is converted to a timestamp up front and compared against
    start_ts. At most max_bind_vars rows are selected.
    """
    purge_before_ts = purge_before.timestamp()
    return lambda_stmt(
        lambda: select(StatisticsShortTerm.id)
        .where(StatisticsShortTerm.start_ts < purge_before_ts)
        .limit(max_bind_vars)
    )
def find_statistics_runs_to_purge(
    purge_before: datetime, max_bind_vars: int
) -> StatementLambdaElement:
    """Select run_ids of statistics runs that started before purge_before.

    At most max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(StatisticsRuns.run_id)
        .where(StatisticsRuns.start < purge_before)
        .limit(max_bind_vars)
    )
def find_latest_statistics_runs_run_id() -> StatementLambdaElement:
    """Select the maximum run_id in StatisticsRuns."""
    stmt = lambda_stmt(
        lambda: select(
            func.max(StatisticsRuns.run_id),
        )
    )
    return stmt
def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
    purge_before: float, max_bind_vars: int
) -> StatementLambdaElement:
    """Select legacy-format event rows, with any linked state, to purge.

    Events fired before purge_before are outer joined to States on event_id,
    yielding (event_id, data_id, state_id, attributes_id). At most
    max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(
            Events.event_id,
            Events.data_id,
            States.state_id,
            States.attributes_id,
        )
        .join(States, Events.event_id == States.event_id, isouter=True)
        .where(Events.time_fired_ts < purge_before)
        .limit(max_bind_vars)
    )
def find_legacy_detached_states_and_attributes_to_purge(
    purge_before: float, max_bind_vars: int
) -> StatementLambdaElement:
    """Find states rows with event_id set but not linked event_id in Events.

    States is outer joined to Events on event_id; rows are kept when the
    state has an event_id, no matching Events row exists, and the state's
    last_updated_ts is either before purge_before or NULL. At most
    max_bind_vars (state_id, attributes_id) rows are selected.
    """
    return lambda_stmt(
        lambda: select(States.state_id, States.attributes_id)
        .outerjoin(Events, States.event_id == Events.event_id)
        # is_not() is the current spelling; isnot() is its legacy alias.
        .filter(States.event_id.is_not(None))
        .filter(
            (States.last_updated_ts < purge_before) | States.last_updated_ts.is_(None)
        )
        # No Events row matched the outer join.
        .filter(Events.event_id.is_(None))
        .limit(max_bind_vars)
    )
def find_legacy_row() -> StatementLambdaElement:
    """Select the maximum event_id stored in States.

    Used to check whether any states rows still carry an event_id.
    """
    stmt = lambda_stmt(
        lambda: select(
            func.max(States.event_id),
        )
    )
    return stmt
def find_events_context_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
    """Select events whose context_id_bin is NULL, for context id migration.

    Each row holds event_id, time_fired_ts, context_id, context_user_id and
    context_parent_id. At most max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(
            Events.event_id,
            Events.time_fired_ts,
            Events.context_id,
            Events.context_user_id,
            Events.context_parent_id,
        )
        .where(Events.context_id_bin.is_(None))
        .limit(max_bind_vars)
    )
def find_event_type_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
    """Select (event_id, event_type) for events whose event_type_id is NULL.

    At most max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(Events.event_id, Events.event_type)
        .where(Events.event_type_id.is_(None))
        .limit(max_bind_vars)
    )
def find_entity_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
    """Select (state_id, entity_id) for states whose metadata_id is NULL.

    At most max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(States.state_id, States.entity_id)
        .where(States.metadata_id.is_(None))
        .limit(max_bind_vars)
    )
def batch_cleanup_entity_ids() -> StatementLambdaElement:
    """Build an update that clears entity_id on a batch of states rows.

    The update sets entity_id to NULL for states whose state_id is produced
    by a subquery of up to 5000 rows that still have a non-NULL entity_id.
    """
    # A self join is used because MariaDB reports: This version of MariaDB
    # doesn't yet support 'LIMIT & IN/ALL/ANY/SOME subquery'
    return lambda_stmt(
        lambda: update(States)
        .where(
            States.state_id.in_(
                select(States.state_id)
                .join(
                    # The limited subquery is bound with a walrus so the join
                    # condition on the next argument can reference its column.
                    states_with_entity_ids := select(
                        States.state_id.label("state_id_with_entity_id")
                    )
                    .filter(States.entity_id.is_not(None))
                    .limit(5000)
                    .subquery(),
                    States.state_id == states_with_entity_ids.c.state_id_with_entity_id,
                )
                .alias("states_with_entity_ids")
                .select()
            )
        )
        .values(entity_id=None)
    )
def has_used_states_entity_ids() -> StatementLambdaElement:
    """Check if there are used entity_ids in the states table.

    Selects at most one state_id whose entity_id is not NULL.
    """
    # is_not() is the current spelling; isnot() is its legacy alias.
    return lambda_stmt(
        lambda: select(States.state_id).filter(States.entity_id.is_not(None)).limit(1)
    )
def has_used_states_event_ids() -> StatementLambdaElement:
    """Check if there are used event_ids in the states table.

    Selects at most one state_id whose event_id is not NULL.
    """
    # is_not() is the current spelling; isnot() is its legacy alias.
    return lambda_stmt(
        lambda: select(States.state_id).filter(States.event_id.is_not(None)).limit(1)
    )
def has_events_context_ids_to_migrate() -> StatementLambdaElement:
    """Select at most one event_id whose context_id_bin is NULL."""
    return lambda_stmt(
        lambda: select(Events.event_id)
        .where(Events.context_id_bin.is_(None))
        .limit(1)
    )
def has_states_context_ids_to_migrate() -> StatementLambdaElement:
    """Select at most one state_id whose context_id_bin is NULL."""
    return lambda_stmt(
        lambda: select(States.state_id)
        .where(States.context_id_bin.is_(None))
        .limit(1)
    )
def has_event_type_to_migrate() -> StatementLambdaElement:
    """Select at most one event_id whose event_type_id is NULL."""
    return lambda_stmt(
        lambda: select(Events.event_id)
        .where(Events.event_type_id.is_(None))
        .limit(1)
    )
def has_entity_ids_to_migrate() -> StatementLambdaElement:
    """Select at most one state_id whose metadata_id is NULL."""
    return lambda_stmt(
        lambda: select(States.state_id)
        .where(States.metadata_id.is_(None))
        .limit(1)
    )
def find_states_context_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
    """Select states whose context_id_bin is NULL, for context id migration.

    Each row holds state_id, last_updated_ts, context_id, context_user_id and
    context_parent_id. At most max_bind_vars rows are selected.
    """
    return lambda_stmt(
        lambda: select(
            States.state_id,
            States.last_updated_ts,
            States.context_id,
            States.context_user_id,
            States.context_parent_id,
        )
        .where(States.context_id_bin.is_(None))
        .limit(max_bind_vars)
    )
def get_migration_changes() -> StatementLambdaElement:
    """Select every (migration_id, version) pair from MigrationChanges."""
    stmt = lambda_stmt(
        lambda: select(
            MigrationChanges.migration_id,
            MigrationChanges.version,
        )
    )
    return stmt
def find_event_types_to_purge() -> StatementLambdaElement:
    """Find event_type_ids to purge.

    PostgreSQL does not support skip/loose index scan
    https://wiki.postgresql.org/wiki/Loose_indexscan

    To avoid using distinct, the inner select joins EventTypes to Events on
    event_type_id, narrowed by a correlated scalar subquery that picks a
    single time_fired_ts (limit 1, no explicit ordering) for each event
    type. The outer select then returns the (event_type_id, event_type)
    rows that are NOT in that inner result, i.e. the event types with no
    matching row in the Events table.

    This query is fast for SQLite, MariaDB, MySQL, and PostgreSQL.
    """
    return lambda_stmt(
        lambda: select(EventTypes.event_type_id, EventTypes.event_type).where(
            EventTypes.event_type_id.not_in(
                # Inner select: event_type_ids that still have an Events row.
                select(EventTypes.event_type_id)
                .select_from(EventTypes)
                .join(
                    Events,
                    and_(
                        EventTypes.event_type_id == Events.event_type_id,
                        Events.time_fired_ts
                        == select(Events.time_fired_ts)
                        .where(Events.event_type_id == EventTypes.event_type_id)
                        .limit(1)
                        .scalar_subquery()
                        .correlate(EventTypes),
                    ),
                )
            )
        )
    )
def find_entity_ids_to_purge() -> StatementLambdaElement:
    """Find metadata_ids for each entity_id to purge.

    PostgreSQL does not support skip/loose index scan
    https://wiki.postgresql.org/wiki/Loose_indexscan

    To avoid using distinct, the inner select joins StatesMeta to States on
    metadata_id, narrowed by a correlated scalar subquery that picks a
    single last_updated_ts (limit 1, no explicit ordering) for each
    metadata_id. The outer select then returns the (metadata_id, entity_id)
    rows that are NOT in that inner result, i.e. the metadata rows with no
    matching row in the States table.

    This query is fast for SQLite, MariaDB, MySQL, and PostgreSQL.
    """
    return lambda_stmt(
        lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id).where(
            StatesMeta.metadata_id.not_in(
                # Inner select: metadata_ids that still have a States row.
                select(StatesMeta.metadata_id)
                .select_from(StatesMeta)
                .join(
                    States,
                    and_(
                        StatesMeta.metadata_id == States.metadata_id,
                        States.last_updated_ts
                        == select(States.last_updated_ts)
                        .where(States.metadata_id == StatesMeta.metadata_id)
                        .limit(1)
                        .scalar_subquery()
                        .correlate(StatesMeta),
                    ),
                )
            )
        )
    )
def delete_event_types_rows(event_type_ids: Iterable[int]) -> StatementLambdaElement:
    """Build a delete of the EventTypes rows with the given event_type_ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(EventTypes)
        .execution_options(synchronize_session=False)
        .where(EventTypes.event_type_id.in_(event_type_ids))
    )
def delete_states_meta_rows(metadata_ids: Iterable[int]) -> StatementLambdaElement:
    """Build a delete of the StatesMeta rows with the given metadata_ids.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(StatesMeta)
        .execution_options(synchronize_session=False)
        .where(StatesMeta.metadata_id.in_(metadata_ids))
    )
def find_unmigrated_short_term_statistics_rows(
    max_bind_vars: int,
) -> StatementLambdaElement:
    """Find unmigrated short term statistics rows.

    A row is selected when start_ts is NULL while start is still set. Each
    row holds id, start, created and last_reset; at most max_bind_vars rows
    are selected.
    """
    return lambda_stmt(
        lambda: select(
            StatisticsShortTerm.id,
            StatisticsShortTerm.start,
            StatisticsShortTerm.created,
            StatisticsShortTerm.last_reset,
        )
        .filter(StatisticsShortTerm.start_ts.is_(None))
        # is_not() is the current spelling; isnot() is its legacy alias.
        .filter(StatisticsShortTerm.start.is_not(None))
        .limit(max_bind_vars)
    )
def find_unmigrated_statistics_rows(max_bind_vars: int) -> StatementLambdaElement:
    """Find unmigrated statistics rows.

    A row is selected when start_ts is NULL while start is still set. Each
    row holds id, start, created and last_reset; at most max_bind_vars rows
    are selected.
    """
    return lambda_stmt(
        lambda: select(
            Statistics.id, Statistics.start, Statistics.created, Statistics.last_reset
        )
        .filter(Statistics.start_ts.is_(None))
        # is_not() is the current spelling; isnot() is its legacy alias.
        .filter(Statistics.start.is_not(None))
        .limit(max_bind_vars)
    )
def migrate_single_short_term_statistics_row_to_timestamp(
    statistic_id: int,
    start_ts: float | None,
    created_ts: float | None,
    last_reset_ts: float | None,
) -> StatementLambdaElement:
    """Build an update moving one short term statistics row to timestamps.

    The row with id statistic_id gets start_ts, created_ts and last_reset_ts
    written, and its start, created and last_reset columns set to NULL. The
    statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: update(StatisticsShortTerm)
        .values(
            start_ts=start_ts,
            start=None,
            created_ts=created_ts,
            created=None,
            last_reset_ts=last_reset_ts,
            last_reset=None,
        )
        .where(StatisticsShortTerm.id == statistic_id)
        .execution_options(synchronize_session=False)
    )
def migrate_single_statistics_row_to_timestamp(
    statistic_id: int,
    start_ts: float | None,
    created_ts: float | None,
    last_reset_ts: float | None,
) -> StatementLambdaElement:
    """Build an update moving one statistics row to timestamps.

    The row with id statistic_id gets start_ts, created_ts and last_reset_ts
    written, and its start, created and last_reset columns set to NULL. The
    statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: update(Statistics)
        .values(
            start_ts=start_ts,
            start=None,
            created_ts=created_ts,
            created=None,
            last_reset_ts=last_reset_ts,
            last_reset=None,
        )
        .where(Statistics.id == statistic_id)
        .execution_options(synchronize_session=False)
    )
def delete_duplicate_short_term_statistics_row(
    statistic_id: int,
) -> StatementLambdaElement:
    """Build a delete of the one StatisticsShortTerm row with id statistic_id.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(StatisticsShortTerm)
        .execution_options(synchronize_session=False)
        .where(StatisticsShortTerm.id == statistic_id)
    )
def delete_duplicate_statistics_row(statistic_id: int) -> StatementLambdaElement:
    """Build a delete of the one Statistics row with id statistic_id.

    The statement carries the ``synchronize_session=False`` execution option.
    """
    return lambda_stmt(
        lambda: delete(Statistics)
        .execution_options(synchronize_session=False)
        .where(Statistics.id == statistic_id)
    )