From 4e6937d20f9d70eb7a32c57a23e040c98a480b36 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 11 Apr 2023 16:38:23 -1000 Subject: [PATCH] Avoid multiple round trips to the database for history API calls (#91193) * delete more code * tweak * tweak * wrappers * restore lost performance * restore lost performance * restore lost performance * compact * reduce * fix refactor * DRY * tweak * delete the start time state injector * move away the legacy code * tweak * adjust * adjust * tweak * ignore impossible * fix a bug where the first start was changed to the start time when there was no previous history recorded before * avoid the empty scan most cases * postgresql * fixes * workaround for mariadb < 10.4 * remove unused * remove unused * adjust * bail early * tweak * tweak * fix more tests * fix recorderrun being init in the future in the test * run history tests on schema 30 as well * Revert "run history tests on schema 30 as well" This reverts commit d798b100ac45c7f8c4cee5d284d94beed5e3d454. * reduce * cleanup * tweak * reduce * prune * adjust * adjust * adjust * reverse later is faster because the index is in forward order and the data size we are reversing is much smaller even if we are in python code * Revert "reverse later is faster because the index is in forward order and the data size we are reversing is much smaller even if we are in python code" This reverts commit bf974e103e651a1334493a9594e08d19e51e392b. * fix test * Revert "Revert "reverse later is faster because the index is in forward order and the data size we are reversing is much smaller even if we are in python code"" This reverts commit 119354499ecf7c1025ec40350e97e73d62d3fd4b. * more coverage * adjust * fix for table order * impossible for it to be missing * remove some more legacy from the all states --- .../components/recorder/history/legacy.py | 17 +- .../components/recorder/history/modern.py | 615 ++++++++++-------- .../components/recorder/models/legacy.py | 127 +++- .../components/recorder/models/state.py | 50 +- tests/components/history/test_init.py | 7 +- .../history/test_init_db_schema_30.py | 55 +- .../components/history/test_websocket_api.py | 29 +- .../history/test_websocket_api_schema_32.py | 161 +++++ tests/components/history_stats/test_sensor.py | 14 +- tests/components/recorder/test_history.py | 126 ++-- tests/components/recorder/test_models.py | 8 +- .../components/recorder/test_models_legacy.py | 98 +++ tests/components/recorder/test_util.py | 11 +- 13 files changed, 891 insertions(+), 427 deletions(-) create mode 100644 tests/components/history/test_websocket_api_schema_32.py create mode 100644 tests/components/recorder/test_models_legacy.py diff --git a/homeassistant/components/recorder/history/legacy.py b/homeassistant/components/recorder/history/legacy.py index e9a435f9624..74b17d9daa7 100644 --- a/homeassistant/components/recorder/history/legacy.py +++ b/homeassistant/components/recorder/history/legacy.py @@ -24,13 +24,16 @@ from ... 
import recorder from ..db_schema import RecorderRuns, StateAttributes, States from ..filters import Filters from ..models import ( - LazyState, process_datetime_to_timestamp, process_timestamp, process_timestamp_to_utc_isoformat, - row_to_compressed_state, ) -from ..models.legacy import LazyStatePreSchema31, row_to_compressed_state_pre_schema_31 +from ..models.legacy import ( + LegacyLazyState, + LegacyLazyStatePreSchema31, + legacy_row_to_compressed_state, + legacy_row_to_compressed_state_pre_schema_31, +) from ..util import execute_stmt_lambda_element, session_scope from .common import _schema_version from .const import ( @@ -713,17 +716,17 @@ def _sorted_states_to_dict( ] if compressed_state_format: if schema_version >= 31: - state_class = row_to_compressed_state + state_class = legacy_row_to_compressed_state else: - state_class = row_to_compressed_state_pre_schema_31 + state_class = legacy_row_to_compressed_state_pre_schema_31 _process_timestamp = process_datetime_to_timestamp attr_time = COMPRESSED_STATE_LAST_UPDATED attr_state = COMPRESSED_STATE_STATE else: if schema_version >= 31: - state_class = LazyState + state_class = LegacyLazyState else: - state_class = LazyStatePreSchema31 + state_class = LegacyLazyStatePreSchema31 _process_timestamp = process_timestamp_to_utc_isoformat attr_time = LAST_CHANGED_KEY attr_state = STATE_KEY diff --git a/homeassistant/components/recorder/history/modern.py b/homeassistant/components/recorder/history/modern.py index 75a99c6e502..74f613c2cec 100644 --- a/homeassistant/components/recorder/history/modern.py +++ b/homeassistant/components/recorder/history/modern.py @@ -1,29 +1,38 @@ """Provide pre-made queries on top of the recorder component.""" from __future__ import annotations -from collections import defaultdict from collections.abc import Callable, Iterable, Iterator, MutableMapping from datetime import datetime from itertools import groupby from operator import itemgetter from typing import Any, cast -from sqlalchemy import Column, and_, func, lambda_stmt, select +from sqlalchemy import ( + CompoundSelect, + Integer, + Select, + Subquery, + and_, + func, + lambda_stmt, + literal, + select, + union_all, +) +from sqlalchemy.dialects import postgresql from sqlalchemy.engine.row import Row -from sqlalchemy.orm.properties import MappedColumn from sqlalchemy.orm.session import Session -from sqlalchemy.sql.expression import literal -from sqlalchemy.sql.lambdas import StatementLambdaElement from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_STATE from homeassistant.core import HomeAssistant, State, split_entity_id import homeassistant.util.dt as dt_util from ... 
import recorder
-from ..db_schema import RecorderRuns, StateAttributes, States
+from ..db_schema import StateAttributes, States
 from ..filters import Filters
 from ..models import (
     LazyState,
+    datetime_to_timestamp_or_none,
     extract_metadata_ids,
     process_timestamp,
     row_to_compressed_state,
@@ -36,57 +45,66 @@ from .const import (
     STATE_KEY,
 )
 
-_BASE_STATES = (
-    States.metadata_id,
-    States.state,
-    States.last_changed_ts,
-    States.last_updated_ts,
-)
-_BASE_STATES_NO_LAST_CHANGED = (  # type: ignore[var-annotated]
-    States.metadata_id,
-    States.state,
-    literal(value=None).label("last_changed_ts"),
-    States.last_updated_ts,
-)
-_QUERY_STATE_NO_ATTR = (*_BASE_STATES,)
-_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = (*_BASE_STATES_NO_LAST_CHANGED,)
-_QUERY_STATES = (
-    *_BASE_STATES,
-    # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
-    States.attributes,
-    StateAttributes.shared_attrs,
-)
-_QUERY_STATES_NO_LAST_CHANGED = (
-    *_BASE_STATES_NO_LAST_CHANGED,
-    # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
-    States.attributes,
-    StateAttributes.shared_attrs,
-)
 _FIELD_MAP = {
-    cast(MappedColumn, field).name: idx
-    for idx, field in enumerate(_QUERY_STATE_NO_ATTR)
+    "metadata_id": 0,
+    "state": 1,
+    "last_updated_ts": 2,
 }
 
 
-def _lambda_stmt_and_join_attributes(
-    no_attributes: bool, include_last_changed: bool = True
-) -> StatementLambdaElement:
-    """Return the lambda_stmt and if StateAttributes should be joined.
+CASTABLE_DOUBLE_TYPE = (
+    # MySQL/MariaDB < 10.4 does not support casting to DOUBLE, so we have to use Integer instead; it doesn't
+    # matter because we never read the value, as it's always set to NULL
+    #
+    # sqlalchemy.exc.SAWarning: Datatype DOUBLE does not support CAST on MySQL/MariaDb; the CAST will be skipped.
+    #
+    Integer().with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
+)
 
-    Because these are lambda_stmt the values inside the lambdas need
-    to be explicitly written out to avoid caching the wrong values.
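
The typed-NULL trick above is easier to see in isolation: every branch of a UNION must expose the same column shape, so a branch that cannot supply real timestamp columns pads them with NULL literals cast to a concrete type. Below is a minimal, self-contained sketch of the idea, not patch code; the table is a stand-in for the recorder's States table.

from sqlalchemy import (
    Column, Float, Integer, MetaData, String, Table, literal, select, union_all,
)
from sqlalchemy.dialects import postgresql

metadata_obj = MetaData()
# Stand-in for the recorder's States table (assumed, simplified layout).
states = Table(
    "states",
    metadata_obj,
    Column("metadata_id", Integer),
    Column("state", String),
    Column("last_updated_ts", Float),
    Column("last_changed_ts", Float),
)

# MySQL/MariaDB cannot CAST to DOUBLE, so fall back to Integer there; the
# cast target only matters for column typing since the value is always NULL.
nullable_double = Integer().with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")

start_state_branch = select(
    states.c.metadata_id,
    states.c.state,
    literal(None).cast(nullable_double).label("last_updated_ts"),
    literal(None).cast(nullable_double).label("last_changed_ts"),
)
window_branch = select(
    states.c.metadata_id,
    states.c.state,
    states.c.last_updated_ts,
    states.c.last_changed_ts,
)
# Both branches now line up column-for-column, so the UNION compiles cleanly.
print(union_all(start_state_branch, window_branch))

On PostgreSQL the cast compiles to DOUBLE PRECISION; on MySQL/MariaDB the Integer variant is used instead, which is harmless because the column is always NULL.
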
- """ - # If no_attributes was requested we do the query - # without the attributes fields and do not join the - # state_attributes table - if no_attributes: - if include_last_changed: - return lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR)) - return lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)) +def _stmt_and_join_attributes( + no_attributes: bool, include_last_changed: bool +) -> Select: + """Return the statement and if StateAttributes should be joined.""" + _select = select(States.metadata_id, States.state, States.last_updated_ts) if include_last_changed: - return lambda_stmt(lambda: select(*_QUERY_STATES)) - return lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED)) + _select = _select.add_columns(States.last_changed_ts) + if not no_attributes: + _select = _select.add_columns(States.attributes, StateAttributes.shared_attrs) + return _select + + +def _stmt_and_join_attributes_for_start_state( + no_attributes: bool, include_last_changed: bool +) -> Select: + """Return the statement and if StateAttributes should be joined.""" + _select = select(States.metadata_id, States.state) + _select = _select.add_columns( + literal(value=None).label("last_updated_ts").cast(CASTABLE_DOUBLE_TYPE) + ) + if include_last_changed: + _select = _select.add_columns( + literal(value=None).label("last_changed_ts").cast(CASTABLE_DOUBLE_TYPE) + ) + if not no_attributes: + _select = _select.add_columns(States.attributes, StateAttributes.shared_attrs) + return _select + + +def _select_from_subquery( + subquery: Subquery | CompoundSelect, no_attributes: bool, include_last_changed: bool +) -> Select: + """Return the statement to select from the union.""" + base_select = select( + subquery.c.metadata_id, + subquery.c.state, + subquery.c.last_updated_ts, + ) + if include_last_changed: + base_select = base_select.add_columns(subquery.c.last_changed_ts) + if no_attributes: + return base_select + return base_select.add_columns(subquery.c.attributes, subquery.c.shared_attrs) def get_significant_states( @@ -119,38 +137,65 @@ def get_significant_states( def _significant_states_stmt( - start_time: datetime, - end_time: datetime | None, + start_time_ts: float, + end_time_ts: float | None, + single_metadata_id: int | None, metadata_ids: list[int], metadata_ids_in_significant_domains: list[int], significant_changes_only: bool, no_attributes: bool, -) -> StatementLambdaElement: + include_start_time_state: bool, + run_start_ts: float | None, +) -> Select | CompoundSelect: """Query the database for significant state changes.""" - stmt = _lambda_stmt_and_join_attributes( - no_attributes, include_last_changed=not significant_changes_only - ) + include_last_changed = not significant_changes_only + stmt = _stmt_and_join_attributes(no_attributes, include_last_changed) if significant_changes_only: # Since we are filtering on entity_id (metadata_id) we can avoid # the join of the states_meta table since we already know which # metadata_ids are in the significant domains. 
- stmt += lambda q: q.filter( - States.metadata_id.in_(metadata_ids_in_significant_domains) - | (States.last_changed_ts == States.last_updated_ts) - | States.last_changed_ts.is_(None) - ) - stmt += lambda q: q.filter(States.metadata_id.in_(metadata_ids)) - start_time_ts = start_time.timestamp() - stmt += lambda q: q.filter(States.last_updated_ts > start_time_ts) - if end_time: - end_time_ts = end_time.timestamp() - stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts) + if metadata_ids_in_significant_domains: + stmt = stmt.filter( + States.metadata_id.in_(metadata_ids_in_significant_domains) + | (States.last_changed_ts == States.last_updated_ts) + | States.last_changed_ts.is_(None) + ) + else: + stmt = stmt.filter( + (States.last_changed_ts == States.last_updated_ts) + | States.last_changed_ts.is_(None) + ) + stmt = stmt.filter(States.metadata_id.in_(metadata_ids)).filter( + States.last_updated_ts > start_time_ts + ) + if end_time_ts: + stmt = stmt.filter(States.last_updated_ts < end_time_ts) if not no_attributes: - stmt += lambda q: q.outerjoin( + stmt = stmt.outerjoin( StateAttributes, States.attributes_id == StateAttributes.attributes_id ) - stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts) - return stmt + stmt = stmt.order_by(States.metadata_id, States.last_updated_ts) + if not include_start_time_state or not run_start_ts: + return stmt + return _select_from_subquery( + union_all( + _select_from_subquery( + _get_start_time_state_stmt( + run_start_ts, + start_time_ts, + single_metadata_id, + metadata_ids, + no_attributes, + include_last_changed, + ).subquery(), + no_attributes, + include_last_changed, + ), + _select_from_subquery(stmt.subquery(), no_attributes, include_last_changed), + ).subquery(), + no_attributes, + include_last_changed, + ) def get_significant_states_with_session( @@ -181,7 +226,6 @@ def get_significant_states_with_session( raise NotImplementedError("Filters are no longer supported") if not entity_ids: raise ValueError("entity_ids must be provided") - metadata_ids: list[int] | None = None entity_id_to_metadata_id: dict[str, int | None] | None = None metadata_ids_in_significant_domains: list[int] = [] instance = recorder.get_instance(hass) @@ -189,8 +233,9 @@ def get_significant_states_with_session( entity_id_to_metadata_id := instance.states_meta_manager.get_many( entity_ids, session, False ) - ) or not (metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)): + ) or not (possible_metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)): return {} + metadata_ids = possible_metadata_ids if significant_changes_only: metadata_ids_in_significant_domains = [ metadata_id @@ -198,25 +243,41 @@ def get_significant_states_with_session( if metadata_id is not None and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS ] - stmt = _significant_states_stmt( - start_time, - end_time, - metadata_ids, - metadata_ids_in_significant_domains, - significant_changes_only, - no_attributes, + run_start_ts: float | None = None + if include_start_time_state and not ( + run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time) + ): + include_start_time_state = False + start_time_ts = dt_util.utc_to_timestamp(start_time) + end_time_ts = datetime_to_timestamp_or_none(end_time) + single_metadata_id = metadata_ids[0] if len(metadata_ids) == 1 else None + stmt = lambda_stmt( + lambda: _significant_states_stmt( + start_time_ts, + end_time_ts, + single_metadata_id, + metadata_ids, + metadata_ids_in_significant_domains, + 
significant_changes_only, + no_attributes, + include_start_time_state, + run_start_ts, + ), + track_on=[ + bool(single_metadata_id), + bool(metadata_ids_in_significant_domains), + bool(end_time_ts), + significant_changes_only, + no_attributes, + include_start_time_state, + ], ) - states = execute_stmt_lambda_element(session, stmt, None, end_time) return _sorted_states_to_dict( - hass, - session, - states, - start_time, + execute_stmt_lambda_element(session, stmt, None, end_time), + start_time_ts if include_start_time_state else None, entity_ids, entity_id_to_metadata_id, - include_start_time_state, minimal_response, - no_attributes, compressed_state_format, ) @@ -255,38 +316,60 @@ def get_full_significant_states_with_session( def _state_changed_during_period_stmt( - start_time: datetime, - end_time: datetime | None, - metadata_id: int | None, + start_time_ts: float, + end_time_ts: float | None, + single_metadata_id: int, no_attributes: bool, - descending: bool, limit: int | None, -) -> StatementLambdaElement: - stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=False) - start_time_ts = start_time.timestamp() - stmt += lambda q: q.filter( - ( - (States.last_changed_ts == States.last_updated_ts) - | States.last_changed_ts.is_(None) + include_start_time_state: bool, + run_start_ts: float | None, +) -> Select | CompoundSelect: + stmt = ( + _stmt_and_join_attributes(no_attributes, False) + .filter( + ( + (States.last_changed_ts == States.last_updated_ts) + | States.last_changed_ts.is_(None) + ) + & (States.last_updated_ts > start_time_ts) ) - & (States.last_updated_ts > start_time_ts) + .filter(States.metadata_id == single_metadata_id) ) - if end_time: - end_time_ts = end_time.timestamp() - stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts) - if metadata_id: - stmt += lambda q: q.filter(States.metadata_id == metadata_id) + if end_time_ts: + stmt = stmt.filter(States.last_updated_ts < end_time_ts) if not no_attributes: - stmt += lambda q: q.outerjoin( + stmt = stmt.outerjoin( StateAttributes, States.attributes_id == StateAttributes.attributes_id ) - if descending: - stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts.desc()) - else: - stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts) if limit: - stmt += lambda q: q.limit(limit) - return stmt + stmt = stmt.limit(limit) + stmt = stmt.order_by( + States.metadata_id, + States.last_updated_ts, + ) + if not include_start_time_state or not run_start_ts: + return stmt + return _select_from_subquery( + union_all( + _select_from_subquery( + _get_single_entity_start_time_stmt( + start_time_ts, + single_metadata_id, + no_attributes, + False, + ).subquery(), + no_attributes, + False, + ), + _select_from_subquery( + stmt.subquery(), + no_attributes, + False, + ), + ).subquery(), + no_attributes, + False, + ) def state_changes_during_period( @@ -305,42 +388,57 @@ def state_changes_during_period( entity_ids = [entity_id.lower()] with session_scope(hass=hass, read_only=True) as session: - metadata_id: int | None = None instance = recorder.get_instance(hass) if not ( - metadata_id := instance.states_meta_manager.get(entity_id, session, False) + possible_metadata_id := instance.states_meta_manager.get( + entity_id, session, False + ) ): return {} - entity_id_to_metadata_id: dict[str, int | None] = {entity_id: metadata_id} - stmt = _state_changed_during_period_stmt( - start_time, - end_time, - metadata_id, - no_attributes, - descending, - limit, + single_metadata_id = 
possible_metadata_id + entity_id_to_metadata_id: dict[str, int | None] = { + entity_id: single_metadata_id + } + run_start_ts: float | None = None + if include_start_time_state and not ( + run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time) + ): + include_start_time_state = False + start_time_ts = dt_util.utc_to_timestamp(start_time) + end_time_ts = datetime_to_timestamp_or_none(end_time) + stmt = lambda_stmt( + lambda: _state_changed_during_period_stmt( + start_time_ts, + end_time_ts, + single_metadata_id, + no_attributes, + limit, + include_start_time_state, + run_start_ts, + ), + track_on=[ + bool(end_time_ts), + no_attributes, + bool(limit), + include_start_time_state, + ], ) - states = execute_stmt_lambda_element(session, stmt, None, end_time) return cast( MutableMapping[str, list[State]], _sorted_states_to_dict( - hass, - session, - states, - start_time, + execute_stmt_lambda_element(session, stmt, None, end_time), + start_time_ts if include_start_time_state else None, entity_ids, entity_id_to_metadata_id, - include_start_time_state=include_start_time_state, + descending=descending, ), ) -def _get_last_state_changes_stmt( - number_of_states: int, metadata_id: int -) -> StatementLambdaElement: - stmt = _lambda_stmt_and_join_attributes(False, include_last_changed=False) - if number_of_states == 1: - stmt += lambda q: q.join( +def _get_last_state_changes_single_stmt(metadata_id: int) -> Select: + return ( + _stmt_and_join_attributes(False, False) + .join( ( lastest_state_for_metadata_id := ( select( @@ -360,8 +458,19 @@ def _get_last_state_changes_stmt( == lastest_state_for_metadata_id.c.max_last_updated, ), ) - else: - stmt += lambda q: q.where( + .outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) + .order_by(States.state_id.desc()) + ) + + +def _get_last_state_changes_multiple_stmt( + number_of_states: int, metadata_id: int +) -> Select: + return ( + _stmt_and_join_attributes(False, False) + .where( States.state_id == ( select(States.state_id) @@ -371,10 +480,11 @@ def _get_last_state_changes_stmt( .subquery() ).c.state_id ) - stmt += lambda q: q.outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ).order_by(States.state_id.desc()) - return stmt + .outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) + .order_by(States.state_id.desc()) + ) def get_last_state_changes( @@ -391,39 +501,48 @@ def get_last_state_changes( with session_scope(hass=hass, read_only=True) as session: instance = recorder.get_instance(hass) if not ( - metadata_id := instance.states_meta_manager.get(entity_id, session, False) + possible_metadata_id := instance.states_meta_manager.get( + entity_id, session, False + ) ): return {} + metadata_id = possible_metadata_id entity_id_to_metadata_id: dict[str, int | None] = {entity_id_lower: metadata_id} - stmt = _get_last_state_changes_stmt(number_of_states, metadata_id) + if number_of_states == 1: + stmt = lambda_stmt( + lambda: _get_last_state_changes_single_stmt(metadata_id), + ) + else: + stmt = lambda_stmt( + lambda: _get_last_state_changes_multiple_stmt( + number_of_states, metadata_id + ), + ) states = list(execute_stmt_lambda_element(session, stmt)) return cast( MutableMapping[str, list[State]], _sorted_states_to_dict( - hass, - session, reversed(states), - dt_util.utcnow(), + None, entity_ids, entity_id_to_metadata_id, - include_start_time_state=False, ), ) -def _get_states_for_entities_stmt( - run_start: datetime, - utc_point_in_time: 
datetime,
+def _get_start_time_state_for_entities_stmt(
+    run_start_ts: float,
+    epoch_time: float,
     metadata_ids: list[int],
     no_attributes: bool,
-) -> StatementLambdaElement:
+    include_last_changed: bool,
+) -> Select:
     """Baked query to get states for specific entities."""
-    stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=True)
     # We got an include-list of entities, accelerate the query by filtering already
     # in the inner query.
-    run_start_ts = process_timestamp(run_start).timestamp()
-    utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
-    stmt += lambda q: q.join(
+    stmt = _stmt_and_join_attributes_for_start_state(
+        no_attributes, include_last_changed
+    ).join(
         (
             most_recent_states_for_entities_by_date := (
                 select(
@@ -434,7 +553,7 @@ def _get_states_for_entities_stmt(
             )
             .filter(
                 (States.last_updated_ts >= run_start_ts)
-                & (States.last_updated_ts < utc_point_in_time_ts)
+                & (States.last_updated_ts < epoch_time)
             )
             .filter(States.metadata_id.in_(metadata_ids))
             .group_by(States.metadata_id)
@@ -448,89 +567,88 @@ def _get_states_for_entities_stmt(
             == most_recent_states_for_entities_by_date.c.max_last_updated,
         ),
     )
-    if not no_attributes:
-        stmt += lambda q: q.outerjoin(
-            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
-        )
-    return stmt
+    if no_attributes:
+        return stmt
+    return stmt.outerjoin(
+        StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
+    )
 
 
-def _get_rows_with_session(
-    hass: HomeAssistant,
-    session: Session,
-    utc_point_in_time: datetime,
-    entity_ids: list[str],
-    entity_id_to_metadata_id: dict[str, int | None] | None = None,
-    run: RecorderRuns | None = None,
-    no_attributes: bool = False,
-) -> Iterable[Row]:
+def _get_run_start_ts_for_utc_point_in_time(
+    hass: HomeAssistant, utc_point_in_time: datetime
+) -> float | None:
+    """Return the start time of a run."""
+    run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
+    if (
+        run is not None
+        and (run_start := process_timestamp(run.start)) < utc_point_in_time
+    ):
+        return run_start.timestamp()
+    # History did not run before utc_point_in_time, so there is no start state
+    return None
+
+
+def _get_start_time_state_stmt(
+    run_start_ts: float,
+    epoch_time: float,
+    single_metadata_id: int | None,
+    metadata_ids: list[int],
+    no_attributes: bool,
+    include_last_changed: bool,
+) -> Select:
     """Return the states at a specific point in time."""
-    if len(entity_ids) == 1:
-        if not entity_id_to_metadata_id or not (
-            metadata_id := entity_id_to_metadata_id.get(entity_ids[0])
-        ):
-            return []
-        return execute_stmt_lambda_element(
-            session,
-            _get_single_entity_states_stmt(
-                utc_point_in_time, metadata_id, no_attributes
-            ),
+    if single_metadata_id:
+        # Use an entirely different (and extremely fast) query if we only
+        # have a single entity id
+        return _get_single_entity_start_time_stmt(
+            epoch_time,
+            single_metadata_id,
+            no_attributes,
+            include_last_changed,
        )
-
-    if run is None:
-        run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
-
-    if run is None or process_timestamp(run.start) > utc_point_in_time:
-        # History did not run before utc_point_in_time
-        return []
-
    # We have more than one entity to look at so we need to do a query on states
    # since the last recorder run started.
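
The statement built next for the multi-entity case follows the classic greatest-per-group pattern: an inner GROUP BY finds each entity's newest last_updated_ts inside [run_start, start_time), and a join pulls back the matching full rows. A self-contained sketch under an assumed, simplified table layout (not the patch's code):

from sqlalchemy import (
    Column, Float, Integer, MetaData, String, Table, and_, func, select,
)

metadata_obj = MetaData()
states = Table(
    "states",
    metadata_obj,
    Column("state_id", Integer, primary_key=True),
    Column("metadata_id", Integer),
    Column("state", String),
    Column("last_updated_ts", Float),
)

def start_time_states_stmt(run_start_ts: float, start_time_ts: float, metadata_ids: list[int]):
    # Inner query: newest timestamp per entity strictly before the window start.
    most_recent = (
        select(
            states.c.metadata_id.label("max_metadata_id"),
            func.max(states.c.last_updated_ts).label("max_last_updated"),
        )
        .where(states.c.last_updated_ts >= run_start_ts)
        .where(states.c.last_updated_ts < start_time_ts)
        .where(states.c.metadata_id.in_(metadata_ids))
        .group_by(states.c.metadata_id)
        .subquery()
    )
    # Join back to recover the complete row for each (entity, max timestamp) pair.
    return select(states).join(
        most_recent,
        and_(
            states.c.metadata_id == most_recent.c.max_metadata_id,
            states.c.last_updated_ts == most_recent.c.max_last_updated,
        ),
    )

print(start_time_states_stmt(0.0, 100.0, [1, 2]))
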
- if not entity_id_to_metadata_id or not ( - metadata_ids := extract_metadata_ids(entity_id_to_metadata_id) - ): - return [] - stmt = _get_states_for_entities_stmt( - run.start, utc_point_in_time, metadata_ids, no_attributes + return _get_start_time_state_for_entities_stmt( + run_start_ts, + epoch_time, + metadata_ids, + no_attributes, + include_last_changed, ) - return execute_stmt_lambda_element(session, stmt) -def _get_single_entity_states_stmt( - utc_point_in_time: datetime, +def _get_single_entity_start_time_stmt( + epoch_time: float, metadata_id: int, - no_attributes: bool = False, -) -> StatementLambdaElement: + no_attributes: bool, + include_last_changed: bool, +) -> Select: # Use an entirely different (and extremely fast) query if we only # have a single entity id - stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=True) - utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time) - stmt += ( - lambda q: q.filter( - States.last_updated_ts < utc_point_in_time_ts, + stmt = ( + _stmt_and_join_attributes_for_start_state(no_attributes, include_last_changed) + .filter( + States.last_updated_ts < epoch_time, States.metadata_id == metadata_id, ) .order_by(States.last_updated_ts.desc()) .limit(1) ) - if not no_attributes: - stmt += lambda q: q.outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ) - return stmt + if no_attributes: + return stmt + return stmt.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) def _sorted_states_to_dict( - hass: HomeAssistant, - session: Session, states: Iterable[Row], - start_time: datetime, + start_time_ts: float | None, entity_ids: list[str], entity_id_to_metadata_id: dict[str, int | None], - include_start_time_state: bool = True, minimal_response: bool = False, - no_attributes: bool = False, compressed_state_format: bool = False, + descending: bool = False, ) -> MutableMapping[str, list[State | dict[str, Any]]]: """Convert SQL results into JSON friendly data structure. 
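
The reworked _sorted_states_to_dict below no longer needs the session or datetime objects: rows arrive already ordered by (metadata_id, last_updated_ts), are split per entity with itertools.groupby, and, per the commit message, descending output is produced by reversing each much smaller Python list rather than asking the database for a reverse index scan. An editorial toy version, with tuples standing in for Row objects:

from itertools import groupby
from operator import itemgetter

# (metadata_id, state, last_updated_ts), pre-sorted by the database.
rows = [
    (1, "on", 10.0),
    (1, "off", 11.0),
    (2, "idle", 10.5),
]

# Group consecutive rows per entity; groupby relies on the sort order above.
result = {
    metadata_id: list(group)
    for metadata_id, group in groupby(rows, key=itemgetter(0))
}

descending = True
if descending:
    # Honor descending order in Python, after the forward-order index scan.
    for entity_rows in result.values():
        entity_rows.reverse()

print(result)  # {1: [(1, 'off', 11.0), (1, 'on', 10.0)], 2: [(2, 'idle', 10.5)]}
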
@@ -545,7 +663,8 @@ def _sorted_states_to_dict( """ field_map = _FIELD_MAP state_class: Callable[ - [Row, dict[str, dict[str, Any]], datetime | None], State | dict[str, Any] + [Row, dict[str, dict[str, Any]], float | None, str, str, float | None], + State | dict[str, Any], ] if compressed_state_format: state_class = row_to_compressed_state @@ -556,32 +675,15 @@ def _sorted_states_to_dict( attr_time = LAST_CHANGED_KEY attr_state = STATE_KEY - result: dict[str, list[State | dict[str, Any]]] = defaultdict(list) - metadata_id_to_entity_id: dict[int, str] = {} - metadata_id_idx = field_map["metadata_id"] - # Set all entity IDs to empty lists in result set to maintain the order - for ent_id in entity_ids: - result[ent_id] = [] - + result: dict[str, list[State | dict[str, Any]]] = { + entity_id: [] for entity_id in entity_ids + } + metadata_id_to_entity_id: dict[int, str] = {} metadata_id_to_entity_id = { v: k for k, v in entity_id_to_metadata_id.items() if v is not None } # Get the states at the start time - initial_states: dict[int, Row] = {} - if include_start_time_state: - initial_states = { - row[metadata_id_idx]: row - for row in _get_rows_with_session( - hass, - session, - start_time, - entity_ids, - entity_id_to_metadata_id, - no_attributes=no_attributes, - ) - } - if len(entity_ids) == 1: metadata_id = entity_id_to_metadata_id[entity_ids[0]] assert metadata_id is not None # should not be possible if we got here @@ -589,30 +691,35 @@ def _sorted_states_to_dict( (metadata_id, iter(states)), ) else: - key_func = itemgetter(metadata_id_idx) + key_func = itemgetter(field_map["metadata_id"]) states_iter = groupby(states, key_func) + state_idx = field_map["state"] + last_updated_ts_idx = field_map["last_updated_ts"] + # Append all changes to it for metadata_id, group in states_iter: + entity_id = metadata_id_to_entity_id[metadata_id] attr_cache: dict[str, dict[str, Any]] = {} - prev_state: Column | str | None = None - if not (entity_id := metadata_id_to_entity_id.get(metadata_id)): - continue ent_results = result[entity_id] - if row := initial_states.pop(metadata_id, None): - prev_state = row.state - ent_results.append(state_class(row, attr_cache, start_time, entity_id=entity_id)) # type: ignore[call-arg] - if ( not minimal_response or split_entity_id(entity_id)[0] in NEED_ATTRIBUTE_DOMAINS ): ent_results.extend( - state_class(db_state, attr_cache, None, entity_id=entity_id) # type: ignore[call-arg] + state_class( + db_state, + attr_cache, + start_time_ts, + entity_id, + db_state[state_idx], + db_state[last_updated_ts_idx], + ) for db_state in group ) continue + prev_state: str | None = None # With minimal response we only provide a native # State for the first and last response. 
All the states # in-between only provide the "state" and the @@ -620,14 +727,18 @@ def _sorted_states_to_dict( if not ent_results: if (first_state := next(group, None)) is None: continue - prev_state = first_state.state + prev_state = first_state[state_idx] ent_results.append( - state_class(first_state, attr_cache, None, entity_id=entity_id) # type: ignore[call-arg] + state_class( + first_state, + attr_cache, + start_time_ts, + entity_id, + prev_state, # type: ignore[arg-type] + first_state[last_updated_ts_idx], + ) ) - state_idx = field_map["state"] - last_updated_ts_idx = field_map["last_updated_ts"] - # # minimal_response only makes sense with last_updated == last_updated # @@ -658,13 +769,9 @@ def _sorted_states_to_dict( if (state := row[state_idx]) != prev_state ) - # If there are no states beyond the initial state, - # the state a was never popped from initial_states - for metadata_id, row in initial_states.items(): - if entity_id := metadata_id_to_entity_id.get(metadata_id): - result[entity_id].append( - state_class(row, {}, start_time, entity_id=entity_id) # type: ignore[call-arg] - ) + if descending: + for ent_results in result.values(): + ent_results.reverse() # Filter out the empty lists if some states had 0 results. return {key: val for key, val in result.items() if val} diff --git a/homeassistant/components/recorder/models/legacy.py b/homeassistant/components/recorder/models/legacy.py index c26e5177720..8a093472afb 100644 --- a/homeassistant/components/recorder/models/legacy.py +++ b/homeassistant/components/recorder/models/legacy.py @@ -13,6 +13,7 @@ from homeassistant.const import ( COMPRESSED_STATE_STATE, ) from homeassistant.core import Context, State +import homeassistant.util.dt as dt_util from .state_attributes import decode_attributes_from_row from .time import ( @@ -24,7 +25,7 @@ from .time import ( # pylint: disable=invalid-name -class LazyStatePreSchema31(State): +class LegacyLazyStatePreSchema31(State): """A lazy version of core State before schema 31.""" __slots__ = [ @@ -138,7 +139,7 @@ class LazyStatePreSchema31(State): } -def row_to_compressed_state_pre_schema_31( +def legacy_row_to_compressed_state_pre_schema_31( row: Row, attr_cache: dict[str, dict[str, Any]], start_time: datetime | None, @@ -162,3 +163,125 @@ def row_to_compressed_state_pre_schema_31( row_changed_changed ) return comp_state + + +class LegacyLazyState(State): + """A lazy version of core State after schema 31.""" + + __slots__ = [ + "_row", + "_attributes", + "_last_changed_ts", + "_last_updated_ts", + "_context", + "attr_cache", + ] + + def __init__( # pylint: disable=super-init-not-called + self, + row: Row, + attr_cache: dict[str, dict[str, Any]], + start_time: datetime | None, + entity_id: str | None = None, + ) -> None: + """Init the lazy state.""" + self._row = row + self.entity_id = entity_id or self._row.entity_id + self.state = self._row.state or "" + self._attributes: dict[str, Any] | None = None + self._last_updated_ts: float | None = self._row.last_updated_ts or ( + dt_util.utc_to_timestamp(start_time) if start_time else None + ) + self._last_changed_ts: float | None = ( + self._row.last_changed_ts or self._last_updated_ts + ) + self._context: Context | None = None + self.attr_cache = attr_cache + + @property # type: ignore[override] + def attributes(self) -> dict[str, Any]: + """State attributes.""" + if self._attributes is None: + self._attributes = decode_attributes_from_row(self._row, self.attr_cache) + return self._attributes + + @attributes.setter + def attributes(self, 
value: dict[str, Any]) -> None: + """Set attributes.""" + self._attributes = value + + @property + def context(self) -> Context: + """State context.""" + if self._context is None: + self._context = Context(id=None) + return self._context + + @context.setter + def context(self, value: Context) -> None: + """Set context.""" + self._context = value + + @property + def last_changed(self) -> datetime: + """Last changed datetime.""" + assert self._last_changed_ts is not None + return dt_util.utc_from_timestamp(self._last_changed_ts) + + @last_changed.setter + def last_changed(self, value: datetime) -> None: + """Set last changed datetime.""" + self._last_changed_ts = process_timestamp(value).timestamp() + + @property + def last_updated(self) -> datetime: + """Last updated datetime.""" + assert self._last_updated_ts is not None + return dt_util.utc_from_timestamp(self._last_updated_ts) + + @last_updated.setter + def last_updated(self, value: datetime) -> None: + """Set last updated datetime.""" + self._last_updated_ts = process_timestamp(value).timestamp() + + def as_dict(self) -> dict[str, Any]: # type: ignore[override] + """Return a dict representation of the LazyState. + + Async friendly. + To be used for JSON serialization. + """ + last_updated_isoformat = self.last_updated.isoformat() + if self._last_changed_ts == self._last_updated_ts: + last_changed_isoformat = last_updated_isoformat + else: + last_changed_isoformat = self.last_changed.isoformat() + return { + "entity_id": self.entity_id, + "state": self.state, + "attributes": self._attributes or self.attributes, + "last_changed": last_changed_isoformat, + "last_updated": last_updated_isoformat, + } + + +def legacy_row_to_compressed_state( + row: Row, + attr_cache: dict[str, dict[str, Any]], + start_time: datetime | None, + entity_id: str | None = None, +) -> dict[str, Any]: + """Convert a database row to a compressed state schema 31 and later.""" + comp_state = { + COMPRESSED_STATE_STATE: row.state, + COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache), + } + if start_time: + comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time) + else: + row_last_updated_ts: float = row.last_updated_ts + comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts + if ( + row_last_changed_ts := row.last_changed_ts + ) and row_last_updated_ts != row_last_changed_ts: + comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_last_changed_ts + return comp_state diff --git a/homeassistant/components/recorder/models/state.py b/homeassistant/components/recorder/models/state.py index 5594f5f6d43..9d0d24c43fe 100644 --- a/homeassistant/components/recorder/models/state.py +++ b/homeassistant/components/recorder/models/state.py @@ -51,20 +51,18 @@ class LazyState(State): self, row: Row, attr_cache: dict[str, dict[str, Any]], - start_time: datetime | None, - entity_id: str | None = None, + start_time_ts: float | None, + entity_id: str, + state: str, + last_updated_ts: float | None, ) -> None: """Init the lazy state.""" self._row = row - self.entity_id = entity_id or self._row.entity_id - self.state = self._row.state or "" + self.entity_id = entity_id + self.state = state or "" self._attributes: dict[str, Any] | None = None - self._last_updated_ts: float | None = self._row.last_updated_ts or ( - dt_util.utc_to_timestamp(start_time) if start_time else None - ) - self._last_changed_ts: float | None = ( - self._row.last_changed_ts or self._last_updated_ts - ) + self._last_updated_ts: float | None = last_updated_ts or 
start_time_ts
+        self._last_changed_ts: float | None = None
         self._context: Context | None = None
         self.attr_cache = attr_cache
 
@@ -95,7 +93,10 @@ class LazyState(State):
     @property
     def last_changed(self) -> datetime:
         """Last changed datetime."""
-        assert self._last_changed_ts is not None
+        if self._last_changed_ts is None:
+            self._last_changed_ts = (
+                getattr(self._row, "last_changed_ts", None) or self._last_updated_ts
+            )
         return dt_util.utc_from_timestamp(self._last_changed_ts)
 
     @last_changed.setter
@@ -138,21 +139,22 @@ def row_to_compressed_state(
     row: Row,
     attr_cache: dict[str, dict[str, Any]],
-    start_time: datetime | None,
-    entity_id: str | None = None,
+    start_time_ts: float | None,
+    entity_id: str,
+    state: str,
+    last_updated_ts: float | None,
 ) -> dict[str, Any]:
     """Convert a database row to a compressed state schema 31 and later."""
-    comp_state = {
-        COMPRESSED_STATE_STATE: row.state,
+    comp_state: dict[str, Any] = {
+        COMPRESSED_STATE_STATE: state,
         COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
     }
-    if start_time:
-        comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
-    else:
-        row_last_updated_ts: float = row.last_updated_ts
-        comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
-    if (
-        row_changed_changed_ts := row.last_changed_ts
-    ) and row_last_updated_ts != row_changed_changed_ts:
-        comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_changed_changed_ts
+    row_last_updated_ts: float = last_updated_ts or start_time_ts  # type: ignore[assignment]
+    comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
+    if (
+        (row_last_changed_ts := getattr(row, "last_changed_ts", None))
+        and row_last_changed_ts
+        and row_last_updated_ts != row_last_changed_ts
+    ):
+        comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_last_changed_ts
     return comp_state
diff --git a/tests/components/history/test_init.py b/tests/components/history/test_init.py
index a499c5bb0be..fa2332e71d7 100644
--- a/tests/components/history/test_init.py
+++ b/tests/components/history/test_init.py
@@ -130,13 +130,16 @@ def test_get_significant_states_with_initial(hass_history) -> None:
     """
     hass = hass_history
     zero, four, states = record_states(hass)
-    one = zero + timedelta(seconds=1)
     one_and_half = zero + timedelta(seconds=1.5)
     for entity_id in states:
         if entity_id == "media_player.test":
             states[entity_id] = states[entity_id][1:]
         for state in states[entity_id]:
+            # If the state is recorded before the start time,
+            # it will have its last_updated and last_changed
+            # set to the start time.
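
The timestamp fallbacks at work in this hunk and in the LazyState changes above condense to a few lines. An illustrative sketch with a hypothetical helper, not patch code: rows injected by the start-time-state branch of the UNION carry NULL timestamps, so last_updated falls back to the query start, and last_changed lazily falls back to last_updated.

def resolve_timestamps(
    row_last_updated_ts: float | None,
    row_last_changed_ts: float | None,
    start_time_ts: float | None,
) -> tuple[float | None, float | None]:
    """Illustrative only: mirrors the lazy fallback chain."""
    last_updated_ts = row_last_updated_ts or start_time_ts
    last_changed_ts = row_last_changed_ts or last_updated_ts
    return last_updated_ts, last_changed_ts

# Injected start-time row: both timestamps pinned to the window start.
print(resolve_timestamps(None, None, 1681257600.0))
# Ordinary row whose attributes-only update kept last_changed older.
print(resolve_timestamps(1681257700.0, 1681257650.0, 1681257600.0))
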
+ if state.last_updated < one_and_half: + state.last_updated = one_and_half state.last_changed = one_and_half hist = get_significant_states( diff --git a/tests/components/history/test_init_db_schema_30.py b/tests/components/history/test_init_db_schema_30.py index 893bcf94620..92f3d61392c 100644 --- a/tests/components/history/test_init_db_schema_30.py +++ b/tests/components/history/test_init_db_schema_30.py @@ -4,17 +4,13 @@ from __future__ import annotations # pylint: disable=invalid-name from datetime import timedelta from http import HTTPStatus -import importlib import json -import sys from unittest.mock import patch, sentinel import pytest -from sqlalchemy import create_engine -from sqlalchemy.orm import Session from homeassistant.components import recorder -from homeassistant.components.recorder import Recorder, core, statistics +from homeassistant.components.recorder import Recorder from homeassistant.components.recorder.history import get_significant_states from homeassistant.components.recorder.models import process_timestamp from homeassistant.core import HomeAssistant @@ -29,59 +25,16 @@ from tests.components.recorder.common import ( assert_states_equal_without_context, async_recorder_block_till_done, async_wait_recording_done, + old_db_schema, wait_recording_done, ) from tests.typing import ClientSessionGenerator, WebSocketGenerator -CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine" -SCHEMA_MODULE = "tests.components.recorder.db_schema_30" - - -def _create_engine_test(*args, **kwargs): - """Test version of create_engine that initializes with old schema. - - This simulates an existing db with the old schema. - """ - importlib.import_module(SCHEMA_MODULE) - old_db_schema = sys.modules[SCHEMA_MODULE] - engine = create_engine(*args, **kwargs) - old_db_schema.Base.metadata.create_all(engine) - with Session(engine) as session: - session.add( - recorder.db_schema.StatisticsRuns(start=statistics.get_start_time()) - ) - session.add( - recorder.db_schema.SchemaChanges( - schema_version=old_db_schema.SCHEMA_VERSION - ) - ) - session.commit() - return engine - @pytest.fixture(autouse=True) def db_schema_30(): - """Fixture to initialize the db with the old schema.""" - importlib.import_module(SCHEMA_MODULE) - old_db_schema = sys.modules[SCHEMA_MODULE] - - with patch.object(recorder, "db_schema", old_db_schema), patch.object( - recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object( - core, "EventTypes", old_db_schema.EventTypes - ), patch.object( - core, "EventData", old_db_schema.EventData - ), patch.object( - core, "States", old_db_schema.States - ), patch.object( - core, "Events", old_db_schema.Events - ), patch.object( - core, "StateAttributes", old_db_schema.StateAttributes - ), patch.object( - core, "EntityIDMigrationTask", core.RecorderTask - ), patch( - CREATE_ENGINE_TARGET, new=_create_engine_test - ): + """Fixture to initialize the db with the old schema 30.""" + with old_db_schema("30"): yield diff --git a/tests/components/history/test_websocket_api.py b/tests/components/history/test_websocket_api.py index 8e353def1d1..fe202ef46bb 100644 --- a/tests/components/history/test_websocket_api.py +++ b/tests/components/history/test_websocket_api.py @@ -10,12 +10,7 @@ import pytest from homeassistant.components import history from homeassistant.components.history import websocket_api from homeassistant.components.recorder import Recorder -from homeassistant.const import ( - 
CONF_DOMAINS, - CONF_ENTITIES, - CONF_INCLUDE, - EVENT_HOMEASSISTANT_FINAL_WRITE, -) +from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util @@ -1359,19 +1354,10 @@ async def test_history_stream_before_history_starts( include_start_time_state, ) -> None: """Test history stream before we have history.""" - sort_order = ["sensor.two", "sensor.four", "sensor.one"] await async_setup_component( hass, "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, + {}, ) await async_setup_component(hass, "sensor", {}) await async_recorder_block_till_done(hass) @@ -1416,19 +1402,10 @@ async def test_history_stream_for_entity_with_no_possible_changes( recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator ) -> None: """Test history stream for future with no possible changes where end time is less than or equal to now.""" - sort_order = ["sensor.two", "sensor.four", "sensor.one"] await async_setup_component( hass, "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, + {}, ) await async_setup_component(hass, "sensor", {}) await async_recorder_block_till_done(hass) diff --git a/tests/components/history/test_websocket_api_schema_32.py b/tests/components/history/test_websocket_api_schema_32.py new file mode 100644 index 00000000000..aebf5aa7ac2 --- /dev/null +++ b/tests/components/history/test_websocket_api_schema_32.py @@ -0,0 +1,161 @@ +"""The tests the History component websocket_api.""" +# pylint: disable=protected-access,invalid-name + +import pytest + +from homeassistant.components import recorder +from homeassistant.components.recorder import Recorder +from homeassistant.core import HomeAssistant +from homeassistant.setup import async_setup_component +import homeassistant.util.dt as dt_util + +from tests.components.recorder.common import ( + async_recorder_block_till_done, + async_wait_recording_done, + old_db_schema, +) +from tests.typing import WebSocketGenerator + + +@pytest.fixture(autouse=True) +def db_schema_32(): + """Fixture to initialize the db with the old schema 32.""" + with old_db_schema("32"): + yield + + +async def test_history_during_period( + recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator +) -> None: + """Test history_during_period.""" + now = dt_util.utcnow() + + await async_setup_component(hass, "history", {}) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + recorder.get_instance(hass).states_meta_manager.active = False + assert recorder.get_instance(hass).schema_version == 32 + + hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "changed"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "again"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + 
await client.send_json( + { + "id": 1, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "end_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["result"] == {} + + await client.send_json( + { + "id": 2, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + + sensor_test_history = response["result"]["sensor.test"] + assert len(sensor_test_history) == 3 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert "a" not in sensor_test_history[1] + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + + assert sensor_test_history[2]["s"] == "on" + assert "a" not in sensor_test_history[2] + + await client.send_json( + { + "id": 3, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 3 + sensor_test_history = response["result"]["sensor.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"any": "attr"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"any": "attr"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"any": "attr"} + + await client.send_json( + { + "id": 4, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 4 + sensor_test_history = response["result"]["sensor.test"] + + assert len(sensor_test_history) == 3 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"any": "attr"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"any": "attr"} + + assert sensor_test_history[2]["s"] == "on" + assert sensor_test_history[2]["a"] == {"any": "attr"} diff --git 
a/tests/components/history_stats/test_sensor.py b/tests/components/history_stats/test_sensor.py index 4d705fefcc4..f98cf08b2c4 100644 --- a/tests/components/history_stats/test_sensor.py +++ b/tests/components/history_stats/test_sensor.py @@ -20,6 +20,8 @@ from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from tests.common import async_fire_time_changed, get_fixture_path +from tests.components.recorder.common import async_wait_recording_done +from tests.typing import RecorderInstanceGenerator async def test_setup(recorder_mock: Recorder, hass: HomeAssistant) -> None: @@ -1367,13 +1369,20 @@ async def test_measure_cet(recorder_mock: Recorder, hass: HomeAssistant) -> None @pytest.mark.parametrize("time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii"]) async def test_end_time_with_microseconds_zeroed( - time_zone, recorder_mock: Recorder, hass: HomeAssistant + time_zone: str, + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, ) -> None: """Test the history statistics sensor that has the end time microseconds zeroed out.""" hass.config.set_time_zone(time_zone) start_of_today = dt_util.now().replace( day=9, month=7, year=1986, hour=0, minute=0, second=0, microsecond=0 ) + with freeze_time(start_of_today): + await async_setup_recorder_instance(hass) + await hass.async_block_till_done() + await async_wait_recording_done(hass) + start_time = start_of_today + timedelta(minutes=60) t0 = start_time + timedelta(minutes=20) t1 = t0 + timedelta(minutes=10) @@ -1434,7 +1443,7 @@ async def test_end_time_with_microseconds_zeroed( await hass.async_block_till_done() assert hass.states.get("sensor.heatpump_compressor_today").state == "1.83" hass.states.async_set("binary_sensor.heatpump_compressor_state", "on") - await hass.async_block_till_done() + await async_wait_recording_done(hass) time_600 = start_of_today + timedelta(hours=6) with freeze_time(time_600): async_fire_time_changed(hass, time_600) @@ -1473,6 +1482,7 @@ async def test_end_time_with_microseconds_zeroed( ) with freeze_time(rolled_to_next_day_plus_16_860000): hass.states.async_set("binary_sensor.heatpump_compressor_state", "off") + await async_wait_recording_done(hass) async_fire_time_changed(hass, rolled_to_next_day_plus_16_860000) await hass.async_block_till_done() diff --git a/tests/components/recorder/test_history.py b/tests/components/recorder/test_history.py index ab4de11189a..ec781dbfc9c 100644 --- a/tests/components/recorder/test_history.py +++ b/tests/components/recorder/test_history.py @@ -24,7 +24,7 @@ from homeassistant.components.recorder.db_schema import ( from homeassistant.components.recorder.filters import Filters from homeassistant.components.recorder.history import legacy from homeassistant.components.recorder.models import LazyState, process_timestamp -from homeassistant.components.recorder.models.legacy import LazyStatePreSchema31 +from homeassistant.components.recorder.models.legacy import LegacyLazyStatePreSchema31 from homeassistant.components.recorder.util import session_scope import homeassistant.core as ha from homeassistant.core import HomeAssistant, State @@ -41,7 +41,6 @@ from .common import ( wait_recording_done, ) -from tests.common import mock_state_change_event from tests.typing import RecorderInstanceGenerator @@ -55,14 +54,20 @@ async def _async_get_states( """Get states from the database.""" def _get_states_with_session(): - if get_instance(hass).schema_version < 31: - klass = LazyStatePreSchema31 - else: - klass = LazyState 
with session_scope(hass=hass, read_only=True) as session: attr_cache = {} + pre_31_schema = get_instance(hass).schema_version < 31 return [ - klass(row, attr_cache, None) + LegacyLazyStatePreSchema31(row, attr_cache, None) + if pre_31_schema + else LazyState( + row, + attr_cache, + None, + row.entity_id, + row.state, + getattr(row, "last_updated_ts", None), + ) for row in legacy._get_rows_with_session( hass, session, @@ -112,44 +117,6 @@ def _add_db_entries( ) -def _setup_get_states(hass): - """Set up for testing get_states.""" - states = [] - now = dt_util.utcnow() - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=now - ): - for i in range(5): - state = ha.State( - f"test.point_in_time_{i % 5}", - f"State {i}", - {"attribute_test": i}, - ) - - mock_state_change_event(hass, state) - - states.append(state) - - wait_recording_done(hass) - - future = now + timedelta(seconds=1) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=future - ): - for i in range(5): - state = ha.State( - f"test.point_in_time_{i % 5}", - f"State {i}", - {"attribute_test": i}, - ) - - mock_state_change_event(hass, state) - - wait_recording_done(hass) - - return now, future, states - - def test_get_full_significant_states_with_session_entity_no_matches( hass_recorder: Callable[..., HomeAssistant] ) -> None: @@ -297,12 +264,12 @@ def test_state_changes_during_period_descending( wait_recording_done(hass) return hass.states.get(entity_id) - start = dt_util.utcnow() + start = dt_util.utcnow().replace(microsecond=0) point = start + timedelta(seconds=1) - point2 = start + timedelta(seconds=1, microseconds=2) - point3 = start + timedelta(seconds=1, microseconds=3) - point4 = start + timedelta(seconds=1, microseconds=4) - end = point + timedelta(seconds=1) + point2 = start + timedelta(seconds=1, microseconds=100) + point3 = start + timedelta(seconds=1, microseconds=200) + point4 = start + timedelta(seconds=1, microseconds=300) + end = point + timedelta(seconds=1, microseconds=400) with patch( "homeassistant.components.recorder.core.dt_util.utcnow", return_value=start @@ -336,6 +303,7 @@ def test_state_changes_during_period_descending( hist = history.state_changes_during_period( hass, start, end, entity_id, no_attributes=False, descending=False ) + assert_multiple_states_equal_without_context(states, hist[entity_id]) hist = history.state_changes_during_period( @@ -345,6 +313,57 @@ def test_state_changes_during_period_descending( states, list(reversed(list(hist[entity_id]))) ) + start_time = point2 + timedelta(microseconds=10) + hist = history.state_changes_during_period( + hass, + start_time, # Pick a point where we will generate a start time state + end, + entity_id, + no_attributes=False, + descending=True, + include_start_time_state=True, + ) + hist_states = list(hist[entity_id]) + assert hist_states[-1].last_updated == start_time + assert hist_states[-1].last_changed == start_time + assert len(hist_states) == 3 + # Make sure they are in descending order + assert ( + hist_states[0].last_updated + > hist_states[1].last_updated + > hist_states[2].last_updated + ) + assert ( + hist_states[0].last_changed + > hist_states[1].last_changed + > hist_states[2].last_changed + ) + + hist = history.state_changes_during_period( + hass, + start_time, # Pick a point where we will generate a start time state + end, + entity_id, + no_attributes=False, + descending=False, + include_start_time_state=True, + ) + hist_states = list(hist[entity_id]) + assert 
hist_states[0].last_updated == start_time
+    assert hist_states[0].last_changed == start_time
+    assert len(hist_states) == 3
+    # Make sure they are in ascending order
+    assert (
+        hist_states[0].last_updated
+        < hist_states[1].last_updated
+        < hist_states[2].last_updated
+    )
+    assert (
+        hist_states[0].last_changed
+        < hist_states[1].last_changed
+        < hist_states[2].last_changed
+    )
+

 def test_get_last_state_changes(hass_recorder: Callable[..., HomeAssistant]) -> None:
     """Test number of state changes."""
@@ -548,13 +567,16 @@ def test_get_significant_states_with_initial(
     hass = hass_recorder()
     hass.config.set_time_zone(time_zone)
     zero, four, states = record_states(hass)
-    one = zero + timedelta(seconds=1)
     one_and_half = zero + timedelta(seconds=1.5)
     for entity_id in states:
         if entity_id == "media_player.test":
             states[entity_id] = states[entity_id][1:]
         for state in states[entity_id]:
+            # If the state is recorded before the start time,
+            # it will have its last_updated and last_changed
+            # set to the start time.
-            if state.last_changed == one:
+            if state.last_updated < one_and_half:
+                state.last_updated = one_and_half
                 state.last_changed = one_and_half

     hist = history.get_significant_states(
diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py
index c5033481f23..9dc2d53125f 100644
--- a/tests/components/recorder/test_models.py
+++ b/tests/components/recorder/test_models.py
@@ -269,7 +269,7 @@ async def test_lazy_state_handles_include_json(
         entity_id="sensor.invalid",
         shared_attrs="{INVALID_JSON}",
     )
-    assert LazyState(row, {}, None).attributes == {}
+    assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {}
     assert "Error converting row to state attributes" in caplog.text


@@ -282,7 +282,7 @@ async def test_lazy_state_prefers_shared_attrs_over_attrs(
         shared_attrs='{"shared":true}',
         attributes='{"shared":false}',
     )
-    assert LazyState(row, {}, None).attributes == {"shared": True}
+    assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {"shared": True}


 async def test_lazy_state_handles_different_last_updated_and_last_changed(
@@ -297,7 +297,7 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed(
         shared_attrs='{"shared":true}',
         last_updated_ts=now.timestamp(),
         last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
     )
-    lstate = LazyState(row, {}, None)
+    lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
     assert lstate.as_dict() == {
         "attributes": {"shared": True},
         "entity_id": "sensor.valid",
@@ -328,7 +328,7 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(
         last_updated_ts=now.timestamp(),
         last_changed_ts=now.timestamp(),
     )
-    lstate = LazyState(row, {}, None)
+    lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
     assert lstate.as_dict() == {
         "attributes": {"shared": True},
         "entity_id": "sensor.valid",
diff --git a/tests/components/recorder/test_models_legacy.py b/tests/components/recorder/test_models_legacy.py
new file mode 100644
index 00000000000..f830ac53544
--- /dev/null
+++ b/tests/components/recorder/test_models_legacy.py
@@ -0,0 +1,98 @@
+"""The tests for the Recorder component legacy models."""
+from datetime import datetime, timedelta
+from unittest.mock import PropertyMock
+
+import pytest
+
+from homeassistant.components.recorder.models.legacy import LegacyLazyState
+from homeassistant.util import dt as dt_util
+
+
+async def test_legacy_lazy_state_prefers_shared_attrs_over_attrs(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test
that the LegacyLazyState prefers shared_attrs over attrs."""
+    row = PropertyMock(
+        entity_id="sensor.invalid",
+        shared_attrs='{"shared":true}',
+        attributes='{"shared":false}',
+    )
+    assert LegacyLazyState(row, {}, None).attributes == {"shared": True}
+
+
+async def test_legacy_lazy_state_handles_different_last_updated_and_last_changed(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test that the LegacyLazyState handles different last_updated and last_changed."""
+    now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    row = PropertyMock(
+        entity_id="sensor.valid",
+        state="off",
+        shared_attrs='{"shared":true}',
+        last_updated_ts=now.timestamp(),
+        last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
+    )
+    lstate = LegacyLazyState(row, {}, None)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:03:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    assert lstate.last_updated.timestamp() == row.last_updated_ts
+    assert lstate.last_changed.timestamp() == row.last_changed_ts
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:03:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+
+
+async def test_legacy_lazy_state_handles_same_last_updated_and_last_changed(
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test that the LegacyLazyState handles same last_updated and last_changed."""
+    now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    row = PropertyMock(
+        entity_id="sensor.valid",
+        state="off",
+        shared_attrs='{"shared":true}',
+        last_updated_ts=now.timestamp(),
+        last_changed_ts=now.timestamp(),
+    )
+    lstate = LegacyLazyState(row, {}, None)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    assert lstate.last_updated.timestamp() == row.last_updated_ts
+    assert lstate.last_changed.timestamp() == row.last_changed_ts
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2021-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    lstate.last_updated = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2021-06-12T03:04:01.000323+00:00",
+        "last_updated": "2020-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
+    lstate.last_changed = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
+    assert lstate.as_dict() == {
+        "attributes": {"shared": True},
+        "entity_id": "sensor.valid",
+        "last_changed": "2020-06-12T03:04:01.000323+00:00",
+        "last_updated": "2020-06-12T03:04:01.000323+00:00",
+        "state": "off",
+    }
diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py
index b78b0a0aaee..f71ca773daa 100644
--- a/tests/components/recorder/test_util.py
+++ b/tests/components/recorder/test_util.py
@@ -7,7 +7,7 @@ import sqlite3
 from unittest.mock import MagicMock, Mock, patch

 import pytest
-from sqlalchemy import text
+from sqlalchemy import lambda_stmt, text
 from sqlalchemy.engine.result import ChunkedIteratorResult
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.sql.elements
import TextClause @@ -18,7 +18,7 @@ from homeassistant.components.recorder import util from homeassistant.components.recorder.const import DOMAIN, SQLITE_URL_PREFIX from homeassistant.components.recorder.db_schema import RecorderRuns from homeassistant.components.recorder.history.modern import ( - _get_single_entity_states_stmt, + _get_single_entity_start_time_stmt, ) from homeassistant.components.recorder.models import ( UnsupportedDialect, @@ -908,7 +908,12 @@ def test_execute_stmt_lambda_element( with session_scope(hass=hass) as session: # No time window, we always get a list metadata_id = instance.states_meta_manager.get("sensor.on", session, True) - stmt = _get_single_entity_states_stmt(dt_util.utcnow(), metadata_id, False) + start_time_ts = dt_util.utcnow().timestamp() + stmt = lambda_stmt( + lambda: _get_single_entity_start_time_stmt( + start_time_ts, metadata_id, False, False + ) + ) rows = util.execute_stmt_lambda_element(session, stmt) assert isinstance(rows, list) assert rows[0].state == new_state.state
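
A note on the pattern the test_util.py change above exercises: _get_single_entity_start_time_stmt now returns a plain select that the caller wraps in lambda_stmt, which lets SQLAlchemy cache the compiled statement while still binding fresh parameter values on each call. The round trips are saved by combining the "state at start time" lookup and the in-window rows into one statement, e.g. with UNION ALL, so a single execute covers both. The sketch below shows that shape against a toy table; the table layout and the names _start_time_state_stmt, _states_in_window_stmt and single_round_trip_stmt are illustrative stand-ins, not the recorder's actual schema or API.

# A minimal sketch, assuming a toy "states" table rather than the recorder
# schema: fetch the start-time state plus the in-window rows in one round
# trip by combining both selects with UNION ALL.
from sqlalchemy import (
    Column,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    lambda_stmt,
    select,
    union_all,
)

metadata_obj = MetaData()
states = Table(  # stand-in layout, loosely modeled on a states table
    "states",
    metadata_obj,
    Column("state_id", Integer, primary_key=True),
    Column("metadata_id", Integer),
    Column("state", String(255)),
    Column("last_updated_ts", Float),
)


def _start_time_state_stmt(start_ts: float, metadata_id: int):
    """Newest row at or before start_ts: the "start time state".

    The ORDER BY/LIMIT select is wrapped in a subquery because several
    backends reject ORDER BY directly inside a UNION arm.
    """
    inner = (
        select(states.c.metadata_id, states.c.state, states.c.last_updated_ts)
        .where(states.c.metadata_id == metadata_id)
        .where(states.c.last_updated_ts <= start_ts)
        .order_by(states.c.last_updated_ts.desc())
        .limit(1)
        .subquery()
    )
    return select(inner.c.metadata_id, inner.c.state, inner.c.last_updated_ts)


def _states_in_window_stmt(start_ts: float, end_ts: float, metadata_id: int):
    """Every row strictly inside the requested window."""
    return (
        select(states.c.metadata_id, states.c.state, states.c.last_updated_ts)
        .where(states.c.metadata_id == metadata_id)
        .where(states.c.last_updated_ts > start_ts)
        .where(states.c.last_updated_ts <= end_ts)
    )


def single_round_trip_stmt(start_ts: float, end_ts: float, metadata_id: int):
    """Build one cacheable statement covering the whole history slice."""
    return lambda_stmt(
        lambda: union_all(
            _start_time_state_stmt(start_ts, metadata_id),
            _states_in_window_stmt(start_ts, end_ts, metadata_id),
        )
    )

With a session, session.execute(single_round_trip_stmt(start_ts, end_ts, 42)).all() hits the database once and yields the start-time row (if any) plus every in-window row. SQL does not guarantee ordering across UNION ALL arms, so the caller applies an outer ORDER BY or sorts the (much smaller) result set in Python afterwards.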