Avoid multiple round trips to the database for history API calls (#91193)

* delete more code

* tweak

* tweak

* wrappers

* restore lost performance

* restore lost performance

* restore lost performance

* compact

* reduce

* fix refactor

* DRY

* tweak

* delete the start time state injector

* move away the legacy code

* tweak

* adjust

* adjust

* tweak

* ignore impossible

* fix a bug where the first state was changed to the start time when there was no previous history recorded

* avoid the empty scan in most cases

* postgresql

* fixes

* workaround for mariadb < 10.4

* remove unused

* remove unused

* adjust

* bail early

* tweak

* tweak

* fix more tests

* fix RecorderRuns being initialized in the future in the test

* run history tests on schema 30 as well

* Revert "run history tests on schema 30 as well"

This reverts commit d798b100ac.

* reduce

* cleanup

* tweak

* reduce

* prune

* adjust

* adjust

* adjust

* reverse later is faster because the index is in forward order and the data size we are reversing is much smaller even if we are in python code

* Revert "reverse later is faster because the index is in forward order and the data size we are reversing is much smaller even if we are in python code"

This reverts commit bf974e103e.

* fix test

* Revert "Revert "reverse later is faster because the index is in forward order and the data size we are reversing is much smaller even if we are in python code""

This reverts commit 119354499e.

* more coverage

* adjust

* fix for table order

* impossible for it to be missing

* remove some more legacy from the all-states path
J. Nick Koston 2023-04-11 16:38:23 -10:00 committed by GitHub
parent 7f62ed15fa
commit 4e6937d20f
13 changed files with 891 additions and 427 deletions
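
The diffs below implement the change: previously the history queries fetched the rows inside the time window and then made a second round trip per call for the "state at start time"; now that start-time state is folded into the main statement with union_all, so a single statement returns everything. A minimal, self-contained sketch (not from the PR) of the shape — the toy table only mirrors the real schema's column names, and the helper name and filters are illustrative:

from sqlalchemy import (
    Column,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    select,
    union_all,
)

metadata_obj = MetaData()
states = Table(
    "states",
    metadata_obj,
    Column("state_id", Integer, primary_key=True),
    Column("metadata_id", Integer),
    Column("state", String),
    Column("last_updated_ts", Float),
)


def states_during_period_stmt(start_ts: float, end_ts: float, metadata_ids: list[int]):
    """Build one statement: start-time state plus in-window changes."""
    # Branch 1: the latest row strictly before the window start
    # (the "start time state").
    start_state = (
        select(states.c.metadata_id, states.c.state, states.c.last_updated_ts)
        .where(states.c.last_updated_ts < start_ts)
        .where(states.c.metadata_id.in_(metadata_ids))
        .order_by(states.c.last_updated_ts.desc())
        .limit(1)
    ).subquery()
    # Branch 2: all changes inside the window.
    in_window = (
        select(states.c.metadata_id, states.c.state, states.c.last_updated_ts)
        .where(states.c.last_updated_ts > start_ts)
        .where(states.c.last_updated_ts < end_ts)
        .where(states.c.metadata_id.in_(metadata_ids))
    )
    # One UNION ALL, one round trip: both result sets come back together.
    union = union_all(select(*start_state.c), in_window).subquery()
    return select(*union.c)


engine = create_engine("sqlite://")
metadata_obj.create_all(engine)
with engine.connect() as conn:
    rows = conn.execute(states_during_period_stmt(10.0, 20.0, [1])).all()

Wrapping each branch in a subquery before the UNION mirrors the _select_from_subquery helper in the diff: a branch with ORDER BY/LIMIT cannot sit directly inside a compound SELECT on every dialect.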


@@ -24,13 +24,16 @@ from ... import recorder
from ..db_schema import RecorderRuns, StateAttributes, States
from ..filters import Filters
from ..models import (
LazyState,
process_datetime_to_timestamp,
process_timestamp,
process_timestamp_to_utc_isoformat,
row_to_compressed_state,
)
from ..models.legacy import LazyStatePreSchema31, row_to_compressed_state_pre_schema_31
from ..models.legacy import (
LegacyLazyState,
LegacyLazyStatePreSchema31,
legacy_row_to_compressed_state,
legacy_row_to_compressed_state_pre_schema_31,
)
from ..util import execute_stmt_lambda_element, session_scope
from .common import _schema_version
from .const import (
@@ -713,17 +716,17 @@ def _sorted_states_to_dict(
]
if compressed_state_format:
if schema_version >= 31:
state_class = row_to_compressed_state
state_class = legacy_row_to_compressed_state
else:
state_class = row_to_compressed_state_pre_schema_31
state_class = legacy_row_to_compressed_state_pre_schema_31
_process_timestamp = process_datetime_to_timestamp
attr_time = COMPRESSED_STATE_LAST_UPDATED
attr_state = COMPRESSED_STATE_STATE
else:
if schema_version >= 31:
state_class = LazyState
state_class = LegacyLazyState
else:
state_class = LazyStatePreSchema31
state_class = LegacyLazyStatePreSchema31
_process_timestamp = process_timestamp_to_utc_isoformat
attr_time = LAST_CHANGED_KEY
attr_state = STATE_KEY


@@ -1,29 +1,38 @@
"""Provide pre-made queries on top of the recorder component."""
from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable, Iterable, Iterator, MutableMapping
from datetime import datetime
from itertools import groupby
from operator import itemgetter
from typing import Any, cast
from sqlalchemy import Column, and_, func, lambda_stmt, select
from sqlalchemy import (
CompoundSelect,
Integer,
Select,
Subquery,
and_,
func,
lambda_stmt,
literal,
select,
union_all,
)
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.properties import MappedColumn
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import literal
from sqlalchemy.sql.lambdas import StatementLambdaElement
from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_STATE
from homeassistant.core import HomeAssistant, State, split_entity_id
import homeassistant.util.dt as dt_util
from ... import recorder
from ..db_schema import RecorderRuns, StateAttributes, States
from ..db_schema import StateAttributes, States
from ..filters import Filters
from ..models import (
LazyState,
datetime_to_timestamp_or_none,
extract_metadata_ids,
process_timestamp,
row_to_compressed_state,
@@ -36,57 +45,66 @@ from .const import (
STATE_KEY,
)
_BASE_STATES = (
States.metadata_id,
States.state,
States.last_changed_ts,
States.last_updated_ts,
)
_BASE_STATES_NO_LAST_CHANGED = ( # type: ignore[var-annotated]
States.metadata_id,
States.state,
literal(value=None).label("last_changed_ts"),
States.last_updated_ts,
)
_QUERY_STATE_NO_ATTR = (*_BASE_STATES,)
_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = (*_BASE_STATES_NO_LAST_CHANGED,)
_QUERY_STATES = (
*_BASE_STATES,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
)
_QUERY_STATES_NO_LAST_CHANGED = (
*_BASE_STATES_NO_LAST_CHANGED,
# Remove States.attributes once all attributes are in StateAttributes.shared_attrs
States.attributes,
StateAttributes.shared_attrs,
)
_FIELD_MAP = {
cast(MappedColumn, field).name: idx
for idx, field in enumerate(_QUERY_STATE_NO_ATTR)
"metadata_id": 0,
"state": 1,
"last_updated_ts": 2,
}
def _lambda_stmt_and_join_attributes(
no_attributes: bool, include_last_changed: bool = True
) -> StatementLambdaElement:
"""Return the lambda_stmt and if StateAttributes should be joined.
CASTABLE_DOUBLE_TYPE = (
# MySQL/MariaDB < 10.4 does not support casting to DOUBLE, so we have to use Integer instead; it doesn't
# matter because we never use the value, as it is always set to NULL
#
# sqlalchemy.exc.SAWarning: Datatype DOUBLE does not support CAST on MySQL/MariaDb; the CAST will be skipped.
#
Integer().with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
Because these are lambda_stmt the values inside the lambdas need
to be explicitly written out to avoid caching the wrong values.
"""
# If no_attributes was requested we do the query
# without the attributes fields and do not join the
# state_attributes table
if no_attributes:
if include_last_changed:
return lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR))
return lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED))
def _stmt_and_join_attributes(
no_attributes: bool, include_last_changed: bool
) -> Select:
"""Return the statement and if StateAttributes should be joined."""
_select = select(States.metadata_id, States.state, States.last_updated_ts)
if include_last_changed:
return lambda_stmt(lambda: select(*_QUERY_STATES))
return lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED))
_select = _select.add_columns(States.last_changed_ts)
if not no_attributes:
_select = _select.add_columns(States.attributes, StateAttributes.shared_attrs)
return _select
def _stmt_and_join_attributes_for_start_state(
no_attributes: bool, include_last_changed: bool
) -> Select:
"""Return the statement and if StateAttributes should be joined."""
_select = select(States.metadata_id, States.state)
_select = _select.add_columns(
literal(value=None).label("last_updated_ts").cast(CASTABLE_DOUBLE_TYPE)
)
if include_last_changed:
_select = _select.add_columns(
literal(value=None).label("last_changed_ts").cast(CASTABLE_DOUBLE_TYPE)
)
if not no_attributes:
_select = _select.add_columns(States.attributes, StateAttributes.shared_attrs)
return _select
def _select_from_subquery(
subquery: Subquery | CompoundSelect, no_attributes: bool, include_last_changed: bool
) -> Select:
"""Return the statement to select from the union."""
base_select = select(
subquery.c.metadata_id,
subquery.c.state,
subquery.c.last_updated_ts,
)
if include_last_changed:
base_select = base_select.add_columns(subquery.c.last_changed_ts)
if no_attributes:
return base_select
return base_select.add_columns(subquery.c.attributes, subquery.c.shared_attrs)
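# Aside (not part of the diff): every branch of the UNION must yield the same
# columns with compatible types, which is why the start-time branch pads the
# timestamp columns with typed NULL literals via CASTABLE_DOUBLE_TYPE above.
# A small sketch of that padding using only public SQLAlchemy API; the column
# name is illustrative:
from sqlalchemy import Integer, literal, select
from sqlalchemy.dialects import postgresql, sqlite

castable_double = Integer().with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
padded = select(literal(value=None).label("last_changed_ts").cast(castable_double))

# PostgreSQL can CAST NULL to a real double:
print(padded.compile(dialect=postgresql.dialect()))  # CAST(NULL AS DOUBLE PRECISION)
# Other dialects fall back to the Integer variant, which is harmless here
# because the value is always NULL and never read as a number:
print(padded.compile(dialect=sqlite.dialect()))  # CAST(NULL AS INTEGER)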
def get_significant_states(
@@ -119,38 +137,65 @@ def get_significant_states(
def _significant_states_stmt(
start_time: datetime,
end_time: datetime | None,
start_time_ts: float,
end_time_ts: float | None,
single_metadata_id: int | None,
metadata_ids: list[int],
metadata_ids_in_significant_domains: list[int],
significant_changes_only: bool,
no_attributes: bool,
) -> StatementLambdaElement:
include_start_time_state: bool,
run_start_ts: float | None,
) -> Select | CompoundSelect:
"""Query the database for significant state changes."""
stmt = _lambda_stmt_and_join_attributes(
no_attributes, include_last_changed=not significant_changes_only
)
include_last_changed = not significant_changes_only
stmt = _stmt_and_join_attributes(no_attributes, include_last_changed)
if significant_changes_only:
# Since we are filtering on entity_id (metadata_id) we can avoid
# the join of the states_meta table since we already know which
# metadata_ids are in the significant domains.
stmt += lambda q: q.filter(
States.metadata_id.in_(metadata_ids_in_significant_domains)
| (States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
stmt += lambda q: q.filter(States.metadata_id.in_(metadata_ids))
start_time_ts = start_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts > start_time_ts)
if end_time:
end_time_ts = end_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
if metadata_ids_in_significant_domains:
stmt = stmt.filter(
States.metadata_id.in_(metadata_ids_in_significant_domains)
| (States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
else:
stmt = stmt.filter(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
stmt = stmt.filter(States.metadata_id.in_(metadata_ids)).filter(
States.last_updated_ts > start_time_ts
)
if end_time_ts:
stmt = stmt.filter(States.last_updated_ts < end_time_ts)
if not no_attributes:
stmt += lambda q: q.outerjoin(
stmt = stmt.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts)
return stmt
stmt = stmt.order_by(States.metadata_id, States.last_updated_ts)
if not include_start_time_state or not run_start_ts:
return stmt
return _select_from_subquery(
union_all(
_select_from_subquery(
_get_start_time_state_stmt(
run_start_ts,
start_time_ts,
single_metadata_id,
metadata_ids,
no_attributes,
include_last_changed,
).subquery(),
no_attributes,
include_last_changed,
),
_select_from_subquery(stmt.subquery(), no_attributes, include_last_changed),
).subquery(),
no_attributes,
include_last_changed,
)
def get_significant_states_with_session(
@@ -181,7 +226,6 @@ def get_significant_states_with_session(
raise NotImplementedError("Filters are no longer supported")
if not entity_ids:
raise ValueError("entity_ids must be provided")
metadata_ids: list[int] | None = None
entity_id_to_metadata_id: dict[str, int | None] | None = None
metadata_ids_in_significant_domains: list[int] = []
instance = recorder.get_instance(hass)
@@ -189,8 +233,9 @@
entity_id_to_metadata_id := instance.states_meta_manager.get_many(
entity_ids, session, False
)
) or not (metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)):
) or not (possible_metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)):
return {}
metadata_ids = possible_metadata_ids
if significant_changes_only:
metadata_ids_in_significant_domains = [
metadata_id
@@ -198,25 +243,41 @@
if metadata_id is not None
and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS
]
stmt = _significant_states_stmt(
start_time,
end_time,
metadata_ids,
metadata_ids_in_significant_domains,
significant_changes_only,
no_attributes,
run_start_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
end_time_ts = datetime_to_timestamp_or_none(end_time)
single_metadata_id = metadata_ids[0] if len(metadata_ids) == 1 else None
stmt = lambda_stmt(
lambda: _significant_states_stmt(
start_time_ts,
end_time_ts,
single_metadata_id,
metadata_ids,
metadata_ids_in_significant_domains,
significant_changes_only,
no_attributes,
include_start_time_state,
run_start_ts,
),
track_on=[
bool(single_metadata_id),
bool(metadata_ids_in_significant_domains),
bool(end_time_ts),
significant_changes_only,
no_attributes,
include_start_time_state,
],
)
states = execute_stmt_lambda_element(session, stmt, None, end_time)
return _sorted_states_to_dict(
hass,
session,
states,
start_time,
execute_stmt_lambda_element(session, stmt, None, end_time),
start_time_ts if include_start_time_state else None,
entity_ids,
entity_id_to_metadata_id,
include_start_time_state,
minimal_response,
no_attributes,
compressed_state_format,
)
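# Aside (not part of the diff): lambda_stmt caches the constructed statement
# keyed by the lambda's code location, so when the builder's output shape
# depends on runtime flags — as _significant_states_stmt now does — those
# flags are passed via track_on so each shape gets its own cache entry
# instead of relying on closure-variable scanning. Toy sketch of the
# mechanism; the table and builder are illustrative:
from sqlalchemy import Column, Integer, MetaData, Table, lambda_stmt, select

toy_states = Table("toy_states", MetaData(), Column("metadata_id", Integer))


def _build(no_attributes: bool):
    stmt = select(toy_states.c.metadata_id)
    if no_attributes:
        stmt = stmt.where(toy_states.c.metadata_id > 0)
    return stmt


def make_stmt(no_attributes: bool):
    # track_on makes the flag an explicit part of the cache key, so the
    # cached SQL for one flag value is never reused for the other.
    return lambda_stmt(lambda: _build(no_attributes), track_on=[no_attributes])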
@@ -255,38 +316,60 @@ def get_full_significant_states_with_session(
def _state_changed_during_period_stmt(
start_time: datetime,
end_time: datetime | None,
metadata_id: int | None,
start_time_ts: float,
end_time_ts: float | None,
single_metadata_id: int,
no_attributes: bool,
descending: bool,
limit: int | None,
) -> StatementLambdaElement:
stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=False)
start_time_ts = start_time.timestamp()
stmt += lambda q: q.filter(
(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
include_start_time_state: bool,
run_start_ts: float | None,
) -> Select | CompoundSelect:
stmt = (
_stmt_and_join_attributes(no_attributes, False)
.filter(
(
(States.last_changed_ts == States.last_updated_ts)
| States.last_changed_ts.is_(None)
)
& (States.last_updated_ts > start_time_ts)
)
& (States.last_updated_ts > start_time_ts)
.filter(States.metadata_id == single_metadata_id)
)
if end_time:
end_time_ts = end_time.timestamp()
stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
if metadata_id:
stmt += lambda q: q.filter(States.metadata_id == metadata_id)
if end_time_ts:
stmt = stmt.filter(States.last_updated_ts < end_time_ts)
if not no_attributes:
stmt += lambda q: q.outerjoin(
stmt = stmt.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
if descending:
stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts.desc())
else:
stmt += lambda q: q.order_by(States.metadata_id, States.last_updated_ts)
if limit:
stmt += lambda q: q.limit(limit)
return stmt
stmt = stmt.limit(limit)
stmt = stmt.order_by(
States.metadata_id,
States.last_updated_ts,
)
if not include_start_time_state or not run_start_ts:
return stmt
return _select_from_subquery(
union_all(
_select_from_subquery(
_get_single_entity_start_time_stmt(
start_time_ts,
single_metadata_id,
no_attributes,
False,
).subquery(),
no_attributes,
False,
),
_select_from_subquery(
stmt.subquery(),
no_attributes,
False,
),
).subquery(),
no_attributes,
False,
)
def state_changes_during_period(
@@ -305,42 +388,57 @@
entity_ids = [entity_id.lower()]
with session_scope(hass=hass, read_only=True) as session:
metadata_id: int | None = None
instance = recorder.get_instance(hass)
if not (
metadata_id := instance.states_meta_manager.get(entity_id, session, False)
possible_metadata_id := instance.states_meta_manager.get(
entity_id, session, False
)
):
return {}
entity_id_to_metadata_id: dict[str, int | None] = {entity_id: metadata_id}
stmt = _state_changed_during_period_stmt(
start_time,
end_time,
metadata_id,
no_attributes,
descending,
limit,
single_metadata_id = possible_metadata_id
entity_id_to_metadata_id: dict[str, int | None] = {
entity_id: single_metadata_id
}
run_start_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
end_time_ts = datetime_to_timestamp_or_none(end_time)
stmt = lambda_stmt(
lambda: _state_changed_during_period_stmt(
start_time_ts,
end_time_ts,
single_metadata_id,
no_attributes,
limit,
include_start_time_state,
run_start_ts,
),
track_on=[
bool(end_time_ts),
no_attributes,
bool(limit),
include_start_time_state,
],
)
states = execute_stmt_lambda_element(session, stmt, None, end_time)
return cast(
MutableMapping[str, list[State]],
_sorted_states_to_dict(
hass,
session,
states,
start_time,
execute_stmt_lambda_element(session, stmt, None, end_time),
start_time_ts if include_start_time_state else None,
entity_ids,
entity_id_to_metadata_id,
include_start_time_state=include_start_time_state,
descending=descending,
),
)
def _get_last_state_changes_stmt(
number_of_states: int, metadata_id: int
) -> StatementLambdaElement:
stmt = _lambda_stmt_and_join_attributes(False, include_last_changed=False)
if number_of_states == 1:
stmt += lambda q: q.join(
def _get_last_state_changes_single_stmt(metadata_id: int) -> Select:
return (
_stmt_and_join_attributes(False, False)
.join(
(
lastest_state_for_metadata_id := (
select(
@@ -360,8 +458,19 @@ def _get_last_state_changes_stmt(
== lastest_state_for_metadata_id.c.max_last_updated,
),
)
else:
stmt += lambda q: q.where(
.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
.order_by(States.state_id.desc())
)
def _get_last_state_changes_multiple_stmt(
number_of_states: int, metadata_id: int
) -> Select:
return (
_stmt_and_join_attributes(False, False)
.where(
States.state_id
== (
select(States.state_id)
@@ -371,10 +480,11 @@ def _get_last_state_changes_stmt(
.subquery()
).c.state_id
)
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
).order_by(States.state_id.desc())
return stmt
.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
.order_by(States.state_id.desc())
)
def get_last_state_changes(
@@ -391,39 +501,48 @@
with session_scope(hass=hass, read_only=True) as session:
instance = recorder.get_instance(hass)
if not (
metadata_id := instance.states_meta_manager.get(entity_id, session, False)
possible_metadata_id := instance.states_meta_manager.get(
entity_id, session, False
)
):
return {}
metadata_id = possible_metadata_id
entity_id_to_metadata_id: dict[str, int | None] = {entity_id_lower: metadata_id}
stmt = _get_last_state_changes_stmt(number_of_states, metadata_id)
if number_of_states == 1:
stmt = lambda_stmt(
lambda: _get_last_state_changes_single_stmt(metadata_id),
)
else:
stmt = lambda_stmt(
lambda: _get_last_state_changes_multiple_stmt(
number_of_states, metadata_id
),
)
states = list(execute_stmt_lambda_element(session, stmt))
return cast(
MutableMapping[str, list[State]],
_sorted_states_to_dict(
hass,
session,
reversed(states),
dt_util.utcnow(),
None,
entity_ids,
entity_id_to_metadata_id,
include_start_time_state=False,
),
)
def _get_states_for_entities_stmt(
run_start: datetime,
utc_point_in_time: datetime,
def _get_start_time_state_for_entities_stmt(
run_start_ts: float,
epoch_time: float,
metadata_ids: list[int],
no_attributes: bool,
) -> StatementLambdaElement:
include_last_changed: bool,
) -> Select:
"""Baked query to get states for specific entities."""
stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=True)
# We got an include-list of entities, accelerate the query by filtering already
# in the inner query.
run_start_ts = process_timestamp(run_start).timestamp()
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += lambda q: q.join(
stmt = _stmt_and_join_attributes_for_start_state(
no_attributes, include_last_changed
).join(
(
most_recent_states_for_entities_by_date := (
select(
@@ -434,7 +553,7 @@ def _get_states_for_entities_stmt(
)
.filter(
(States.last_updated_ts >= run_start_ts)
& (States.last_updated_ts < utc_point_in_time_ts)
& (States.last_updated_ts < epoch_time)
)
.filter(States.metadata_id.in_(metadata_ids))
.group_by(States.metadata_id)
@@ -448,89 +567,88 @@
== most_recent_states_for_entities_by_date.c.max_last_updated,
),
)
if not no_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
return stmt
if no_attributes:
return stmt
return stmt.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
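# Aside (not part of the diff): the multi-entity start-time branch above is
# the classic greatest-per-group pattern — an inner GROUP BY picks
# MAX(last_updated_ts) per metadata_id inside [run_start_ts, epoch_time),
# and a join pulls back the matching full rows. Self-contained sketch; the
# toy table only mirrors the real schema's column names:
from sqlalchemy import Column, Float, Integer, MetaData, String, Table, and_, func, select

states_t = Table(
    "states",
    MetaData(),
    Column("metadata_id", Integer),
    Column("state", String),
    Column("last_updated_ts", Float),
)


def start_time_states(run_start_ts: float, epoch_time: float, metadata_ids: list[int]):
    most_recent = (
        select(
            states_t.c.metadata_id.label("max_metadata_id"),
            func.max(states_t.c.last_updated_ts).label("max_last_updated"),
        )
        .where(states_t.c.last_updated_ts >= run_start_ts)
        .where(states_t.c.last_updated_ts < epoch_time)
        .where(states_t.c.metadata_id.in_(metadata_ids))
        .group_by(states_t.c.metadata_id)
        .subquery()
    )
    # Join back to the full table to recover the complete row for each max.
    return select(states_t.c.metadata_id, states_t.c.state).join(
        most_recent,
        and_(
            states_t.c.metadata_id == most_recent.c.max_metadata_id,
            states_t.c.last_updated_ts == most_recent.c.max_last_updated,
        ),
    )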
def _get_rows_with_session(
hass: HomeAssistant,
session: Session,
utc_point_in_time: datetime,
entity_ids: list[str],
entity_id_to_metadata_id: dict[str, int | None] | None = None,
run: RecorderRuns | None = None,
no_attributes: bool = False,
) -> Iterable[Row]:
def _get_run_start_ts_for_utc_point_in_time(
hass: HomeAssistant, utc_point_in_time: datetime
) -> float | None:
"""Return the start time of a run."""
run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
if (
run is not None
and (run_start := process_timestamp(run.start)) < utc_point_in_time
):
return run_start.timestamp()
# History did not run before utc_point_in_time but we still
return None
def _get_start_time_state_stmt(
run_start_ts: float,
epoch_time: float,
single_metadata_id: int | None,
metadata_ids: list[int],
no_attributes: bool,
include_last_changed: bool,
) -> Select:
"""Return the states at a specific point in time."""
if len(entity_ids) == 1:
if not entity_id_to_metadata_id or not (
metadata_id := entity_id_to_metadata_id.get(entity_ids[0])
):
return []
return execute_stmt_lambda_element(
session,
_get_single_entity_states_stmt(
utc_point_in_time, metadata_id, no_attributes
),
if single_metadata_id:
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
return _get_single_entity_start_time_stmt(
epoch_time,
single_metadata_id,
no_attributes,
include_last_changed,
)
if run is None:
run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
if run is None or process_timestamp(run.start) > utc_point_in_time:
# History did not run before utc_point_in_time
return []
# We have more than one entity to look at so we need to do a query on states
# since the last recorder run started.
if not entity_id_to_metadata_id or not (
metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)
):
return []
stmt = _get_states_for_entities_stmt(
run.start, utc_point_in_time, metadata_ids, no_attributes
return _get_start_time_state_for_entities_stmt(
run_start_ts,
epoch_time,
metadata_ids,
no_attributes,
include_last_changed,
)
return execute_stmt_lambda_element(session, stmt)
def _get_single_entity_states_stmt(
utc_point_in_time: datetime,
def _get_single_entity_start_time_stmt(
epoch_time: float,
metadata_id: int,
no_attributes: bool = False,
) -> StatementLambdaElement:
no_attributes: bool,
include_last_changed: bool,
) -> Select:
# Use an entirely different (and extremely fast) query if we only
# have a single entity id
stmt = _lambda_stmt_and_join_attributes(no_attributes, include_last_changed=True)
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += (
lambda q: q.filter(
States.last_updated_ts < utc_point_in_time_ts,
stmt = (
_stmt_and_join_attributes_for_start_state(no_attributes, include_last_changed)
.filter(
States.last_updated_ts < epoch_time,
States.metadata_id == metadata_id,
)
.order_by(States.last_updated_ts.desc())
.limit(1)
)
if not no_attributes:
stmt += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
return stmt
if no_attributes:
return stmt
return stmt.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
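# Aside (not part of the diff): with a single metadata_id the start-time
# state needs no GROUP BY at all — an index on (metadata_id, last_updated_ts)
# answers "latest row before epoch_time" with ORDER BY ... DESC LIMIT 1,
# which is why the single-entity path gets its own statement. Sketch with
# an illustrative toy table:
from sqlalchemy import Column, Float, Integer, MetaData, String, Table, select

states_t = Table(
    "states",
    MetaData(),
    Column("metadata_id", Integer),
    Column("state", String),
    Column("last_updated_ts", Float),
)


def single_entity_start_time_stmt(epoch_time: float, metadata_id: int):
    return (
        select(states_t.c.metadata_id, states_t.c.state, states_t.c.last_updated_ts)
        .where(states_t.c.last_updated_ts < epoch_time)
        .where(states_t.c.metadata_id == metadata_id)
        .order_by(states_t.c.last_updated_ts.desc())  # walks the index backwards
        .limit(1)
    )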
def _sorted_states_to_dict(
hass: HomeAssistant,
session: Session,
states: Iterable[Row],
start_time: datetime,
start_time_ts: float | None,
entity_ids: list[str],
entity_id_to_metadata_id: dict[str, int | None],
include_start_time_state: bool = True,
minimal_response: bool = False,
no_attributes: bool = False,
compressed_state_format: bool = False,
descending: bool = False,
) -> MutableMapping[str, list[State | dict[str, Any]]]:
"""Convert SQL results into JSON friendly data structure.
@@ -545,7 +663,8 @@ def _sorted_states_to_dict(
"""
field_map = _FIELD_MAP
state_class: Callable[
[Row, dict[str, dict[str, Any]], datetime | None], State | dict[str, Any]
[Row, dict[str, dict[str, Any]], float | None, str, str, float | None],
State | dict[str, Any],
]
if compressed_state_format:
state_class = row_to_compressed_state
@@ -556,32 +675,15 @@ def _sorted_states_to_dict(
attr_time = LAST_CHANGED_KEY
attr_state = STATE_KEY
result: dict[str, list[State | dict[str, Any]]] = defaultdict(list)
metadata_id_to_entity_id: dict[int, str] = {}
metadata_id_idx = field_map["metadata_id"]
# Set all entity IDs to empty lists in result set to maintain the order
for ent_id in entity_ids:
result[ent_id] = []
result: dict[str, list[State | dict[str, Any]]] = {
entity_id: [] for entity_id in entity_ids
}
metadata_id_to_entity_id: dict[int, str] = {}
metadata_id_to_entity_id = {
v: k for k, v in entity_id_to_metadata_id.items() if v is not None
}
# Get the states at the start time
initial_states: dict[int, Row] = {}
if include_start_time_state:
initial_states = {
row[metadata_id_idx]: row
for row in _get_rows_with_session(
hass,
session,
start_time,
entity_ids,
entity_id_to_metadata_id,
no_attributes=no_attributes,
)
}
if len(entity_ids) == 1:
metadata_id = entity_id_to_metadata_id[entity_ids[0]]
assert metadata_id is not None # should not be possible if we got here
@@ -589,30 +691,35 @@ def _sorted_states_to_dict(
(metadata_id, iter(states)),
)
else:
key_func = itemgetter(metadata_id_idx)
key_func = itemgetter(field_map["metadata_id"])
states_iter = groupby(states, key_func)
state_idx = field_map["state"]
last_updated_ts_idx = field_map["last_updated_ts"]
# Append all changes to it
for metadata_id, group in states_iter:
entity_id = metadata_id_to_entity_id[metadata_id]
attr_cache: dict[str, dict[str, Any]] = {}
prev_state: Column | str | None = None
if not (entity_id := metadata_id_to_entity_id.get(metadata_id)):
continue
ent_results = result[entity_id]
if row := initial_states.pop(metadata_id, None):
prev_state = row.state
ent_results.append(state_class(row, attr_cache, start_time, entity_id=entity_id)) # type: ignore[call-arg]
if (
not minimal_response
or split_entity_id(entity_id)[0] in NEED_ATTRIBUTE_DOMAINS
):
ent_results.extend(
state_class(db_state, attr_cache, None, entity_id=entity_id) # type: ignore[call-arg]
state_class(
db_state,
attr_cache,
start_time_ts,
entity_id,
db_state[state_idx],
db_state[last_updated_ts_idx],
)
for db_state in group
)
continue
prev_state: str | None = None
# With minimal response we only provide a native
# State for the first and last response. All the states
# in-between only provide the "state" and the
@@ -620,14 +727,18 @@ def _sorted_states_to_dict(
if not ent_results:
if (first_state := next(group, None)) is None:
continue
prev_state = first_state.state
prev_state = first_state[state_idx]
ent_results.append(
state_class(first_state, attr_cache, None, entity_id=entity_id) # type: ignore[call-arg]
state_class(
first_state,
attr_cache,
start_time_ts,
entity_id,
prev_state, # type: ignore[arg-type]
first_state[last_updated_ts_idx],
)
)
state_idx = field_map["state"]
last_updated_ts_idx = field_map["last_updated_ts"]
#
# minimal_response only makes sense with last_updated == last_updated
#
@@ -658,13 +769,9 @@ def _sorted_states_to_dict(
if (state := row[state_idx]) != prev_state
)
# If there are no states beyond the initial state,
# the state was never popped from initial_states
for metadata_id, row in initial_states.items():
if entity_id := metadata_id_to_entity_id.get(metadata_id):
result[entity_id].append(
state_class(row, {}, start_time, entity_id=entity_id) # type: ignore[call-arg]
)
if descending:
for ent_results in result.values():
ent_results.reverse()
# Filter out the empty lists if some states had 0 results.
return {key: val for key, val in result.items() if val}
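# Aside (not part of the diff): per the "reverse later is faster" commit in
# the log above, queries always run in ascending (index) order and descending
# output is produced here by reversing the already-filtered, much smaller
# per-entity lists in Python. Minimal illustration of the shape, with toy rows:
rows = [(1, "on", 10.0), (1, "off", 11.0), (2, "on", 10.5)]  # ascending by ts

result: dict[int, list[tuple]] = {}
for row in rows:
    result.setdefault(row[0], []).append(row)

descending = True
if descending:
    for ent_rows in result.values():
        ent_rows.reverse()  # cheap in-place reversal of the reduced result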


@@ -13,6 +13,7 @@ from homeassistant.const import (
COMPRESSED_STATE_STATE,
)
from homeassistant.core import Context, State
import homeassistant.util.dt as dt_util
from .state_attributes import decode_attributes_from_row
from .time import (
@@ -24,7 +25,7 @@ from .time import (
# pylint: disable=invalid-name
class LazyStatePreSchema31(State):
class LegacyLazyStatePreSchema31(State):
"""A lazy version of core State before schema 31."""
__slots__ = [
@@ -138,7 +139,7 @@ class LazyStatePreSchema31(State):
}
def row_to_compressed_state_pre_schema_31(
def legacy_row_to_compressed_state_pre_schema_31(
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
@@ -162,3 +163,125 @@ def row_to_compressed_state_pre_schema_31(
row_changed_changed
)
return comp_state
class LegacyLazyState(State):
"""A lazy version of core State after schema 31."""
__slots__ = [
"_row",
"_attributes",
"_last_changed_ts",
"_last_updated_ts",
"_context",
"attr_cache",
]
def __init__( # pylint: disable=super-init-not-called
self,
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
entity_id: str | None = None,
) -> None:
"""Init the lazy state."""
self._row = row
self.entity_id = entity_id or self._row.entity_id
self.state = self._row.state or ""
self._attributes: dict[str, Any] | None = None
self._last_updated_ts: float | None = self._row.last_updated_ts or (
dt_util.utc_to_timestamp(start_time) if start_time else None
)
self._last_changed_ts: float | None = (
self._row.last_changed_ts or self._last_updated_ts
)
self._context: Context | None = None
self.attr_cache = attr_cache
@property # type: ignore[override]
def attributes(self) -> dict[str, Any]:
"""State attributes."""
if self._attributes is None:
self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
return self._attributes
@attributes.setter
def attributes(self, value: dict[str, Any]) -> None:
"""Set attributes."""
self._attributes = value
@property
def context(self) -> Context:
"""State context."""
if self._context is None:
self._context = Context(id=None)
return self._context
@context.setter
def context(self, value: Context) -> None:
"""Set context."""
self._context = value
@property
def last_changed(self) -> datetime:
"""Last changed datetime."""
assert self._last_changed_ts is not None
return dt_util.utc_from_timestamp(self._last_changed_ts)
@last_changed.setter
def last_changed(self, value: datetime) -> None:
"""Set last changed datetime."""
self._last_changed_ts = process_timestamp(value).timestamp()
@property
def last_updated(self) -> datetime:
"""Last updated datetime."""
assert self._last_updated_ts is not None
return dt_util.utc_from_timestamp(self._last_updated_ts)
@last_updated.setter
def last_updated(self, value: datetime) -> None:
"""Set last updated datetime."""
self._last_updated_ts = process_timestamp(value).timestamp()
def as_dict(self) -> dict[str, Any]: # type: ignore[override]
"""Return a dict representation of the LazyState.
Async friendly.
To be used for JSON serialization.
"""
last_updated_isoformat = self.last_updated.isoformat()
if self._last_changed_ts == self._last_updated_ts:
last_changed_isoformat = last_updated_isoformat
else:
last_changed_isoformat = self.last_changed.isoformat()
return {
"entity_id": self.entity_id,
"state": self.state,
"attributes": self._attributes or self.attributes,
"last_changed": last_changed_isoformat,
"last_updated": last_updated_isoformat,
}
def legacy_row_to_compressed_state(
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
entity_id: str | None = None,
) -> dict[str, Any]:
"""Convert a database row to a compressed state schema 31 and later."""
comp_state = {
COMPRESSED_STATE_STATE: row.state,
COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
}
if start_time:
comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
else:
row_last_updated_ts: float = row.last_updated_ts
comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
if (
row_last_changed_ts := row.last_changed_ts
) and row_last_updated_ts != row_last_changed_ts:
comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_last_changed_ts
return comp_state


@@ -51,20 +51,18 @@ class LazyState(State):
self,
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
entity_id: str | None = None,
start_time_ts: float | None,
entity_id: str,
state: str,
last_updated_ts: float | None,
) -> None:
"""Init the lazy state."""
self._row = row
self.entity_id = entity_id or self._row.entity_id
self.state = self._row.state or ""
self.entity_id = entity_id
self.state = state or ""
self._attributes: dict[str, Any] | None = None
self._last_updated_ts: float | None = self._row.last_updated_ts or (
dt_util.utc_to_timestamp(start_time) if start_time else None
)
self._last_changed_ts: float | None = (
self._row.last_changed_ts or self._last_updated_ts
)
self._last_updated_ts: float | None = last_updated_ts or start_time_ts
self._last_changed_ts: float | None = None
self._context: Context | None = None
self.attr_cache = attr_cache
@@ -95,7 +93,10 @@ class LazyState(State):
@property
def last_changed(self) -> datetime:
"""Last changed datetime."""
assert self._last_changed_ts is not None
if self._last_changed_ts is None:
self._last_changed_ts = (
getattr(self._row, "last_changed_ts", None) or self._last_updated_ts
)
return dt_util.utc_from_timestamp(self._last_changed_ts)
@last_changed.setter
@@ -138,21 +139,22 @@ class LazyState(State):
def row_to_compressed_state(
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None,
entity_id: str | None = None,
start_time_ts: float | None,
entity_id: str,
state: str,
last_updated_ts: float | None,
) -> dict[str, Any]:
"""Convert a database row to a compressed state schema 31 and later."""
comp_state = {
COMPRESSED_STATE_STATE: row.state,
comp_state: dict[str, Any] = {
COMPRESSED_STATE_STATE: state,
COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
}
if start_time:
comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
else:
row_last_updated_ts: float = row.last_updated_ts
comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
if (
row_changed_changed_ts := row.last_changed_ts
) and row_last_updated_ts != row_changed_changed_ts:
comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_changed_changed_ts
row_last_updated_ts: float = last_updated_ts or start_time_ts # type: ignore[assignment]
comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
if (
(row_last_changed_ts := getattr(row, "last_changed_ts", None))
and row_last_changed_ts
and row_last_updated_ts != row_last_changed_ts
):
comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_last_changed_ts
return comp_state
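# Aside (not part of the diff): the compressed payload shape this produces
# for the websocket history API, with illustrative values. The short keys
# are the COMPRESSED_STATE_* constants ("s", "a", "lu", "lc") asserted in
# the tests further below; "lc" is omitted whenever last_changed equals
# last_updated:
example = {
    "s": "on",                       # COMPRESSED_STATE_STATE
    "a": {"friendly_name": "Test"},  # COMPRESSED_STATE_ATTRIBUTES
    "lu": 1681255103.2,              # COMPRESSED_STATE_LAST_UPDATED
    # "lc": 1681255100.0,            # COMPRESSED_STATE_LAST_CHANGED, only if != "lu"
}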


@@ -130,13 +130,16 @@ def test_get_significant_states_with_initial(hass_history) -> None:
"""
hass = hass_history
zero, four, states = record_states(hass)
one = zero + timedelta(seconds=1)
one_and_half = zero + timedelta(seconds=1.5)
for entity_id in states:
if entity_id == "media_player.test":
states[entity_id] = states[entity_id][1:]
for state in states[entity_id]:
if state.last_changed == one:
# If the state is recorded before the start time,
# it will have its last_updated and last_changed
# set to the start time.
if state.last_updated < one_and_half:
state.last_updated = one_and_half
state.last_changed = one_and_half
hist = get_significant_states(


@@ -4,17 +4,13 @@ from __future__ import annotations
# pylint: disable=invalid-name
from datetime import timedelta
from http import HTTPStatus
import importlib
import json
import sys
from unittest.mock import patch, sentinel
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import Recorder, core, statistics
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.history import get_significant_states
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.core import HomeAssistant
@@ -29,59 +25,16 @@ from tests.components.recorder.common import (
assert_states_equal_without_context,
async_recorder_block_till_done,
async_wait_recording_done,
old_db_schema,
wait_recording_done,
)
from tests.typing import ClientSessionGenerator, WebSocketGenerator
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_30"
def _create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
@pytest.fixture(autouse=True)
def db_schema_30():
"""Fixture to initialize the db with the old schema."""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
core, "EventTypes", old_db_schema.EventTypes
), patch.object(
core, "EventData", old_db_schema.EventData
), patch.object(
core, "States", old_db_schema.States
), patch.object(
core, "Events", old_db_schema.Events
), patch.object(
core, "StateAttributes", old_db_schema.StateAttributes
), patch.object(
core, "EntityIDMigrationTask", core.RecorderTask
), patch(
CREATE_ENGINE_TARGET, new=_create_engine_test
):
"""Fixture to initialize the db with the old schema 30."""
with old_db_schema("30"):
yield


@@ -10,12 +10,7 @@ import pytest
from homeassistant.components import history
from homeassistant.components.history import websocket_api
from homeassistant.components.recorder import Recorder
from homeassistant.const import (
CONF_DOMAINS,
CONF_ENTITIES,
CONF_INCLUDE,
EVENT_HOMEASSISTANT_FINAL_WRITE,
)
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
@@ -1359,19 +1354,10 @@ async def test_history_stream_before_history_starts(
include_start_time_state,
) -> None:
"""Test history stream before we have history."""
sort_order = ["sensor.two", "sensor.four", "sensor.one"]
await async_setup_component(
hass,
"history",
{
history.DOMAIN: {
history.CONF_ORDER: True,
CONF_INCLUDE: {
CONF_ENTITIES: sort_order,
CONF_DOMAINS: ["sensor"],
},
}
},
{},
)
await async_setup_component(hass, "sensor", {})
await async_recorder_block_till_done(hass)
@@ -1416,19 +1402,10 @@ async def test_history_stream_for_entity_with_no_possible_changes(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test history stream for future with no possible changes where end time is less than or equal to now."""
sort_order = ["sensor.two", "sensor.four", "sensor.one"]
await async_setup_component(
hass,
"history",
{
history.DOMAIN: {
history.CONF_ORDER: True,
CONF_INCLUDE: {
CONF_ENTITIES: sort_order,
CONF_DOMAINS: ["sensor"],
},
}
},
{},
)
await async_setup_component(hass, "sensor", {})
await async_recorder_block_till_done(hass)


@@ -0,0 +1,161 @@
"""The tests the History component websocket_api."""
# pylint: disable=protected-access,invalid-name
import pytest
from homeassistant.components import recorder
from homeassistant.components.recorder import Recorder
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.components.recorder.common import (
async_recorder_block_till_done,
async_wait_recording_done,
old_db_schema,
)
from tests.typing import WebSocketGenerator
@pytest.fixture(autouse=True)
def db_schema_32():
"""Fixture to initialize the db with the old schema 32."""
with old_db_schema("32"):
yield
async def test_history_during_period(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test history_during_period."""
now = dt_util.utcnow()
await async_setup_component(hass, "history", {})
await async_setup_component(hass, "sensor", {})
await async_recorder_block_till_done(hass)
recorder.get_instance(hass).states_meta_manager.active = False
assert recorder.get_instance(hass).schema_version == 32
hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "off", attributes={"any": "attr"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "off", attributes={"any": "changed"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "off", attributes={"any": "again"})
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
await async_wait_recording_done(hass)
await async_wait_recording_done(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "history/history_during_period",
"start_time": now.isoformat(),
"end_time": now.isoformat(),
"entity_ids": ["sensor.test"],
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {}
await client.send_json(
{
"id": 2,
"type": "history/history_during_period",
"start_time": now.isoformat(),
"entity_ids": ["sensor.test"],
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 2
sensor_test_history = response["result"]["sensor.test"]
assert len(sensor_test_history) == 3
assert sensor_test_history[0]["s"] == "on"
assert sensor_test_history[0]["a"] == {}
assert isinstance(sensor_test_history[0]["lu"], float)
assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu)
assert "a" not in sensor_test_history[1]
assert sensor_test_history[1]["s"] == "off"
assert isinstance(sensor_test_history[1]["lu"], float)
assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu)
assert sensor_test_history[2]["s"] == "on"
assert "a" not in sensor_test_history[2]
await client.send_json(
{
"id": 3,
"type": "history/history_during_period",
"start_time": now.isoformat(),
"entity_ids": ["sensor.test"],
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": False,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 3
sensor_test_history = response["result"]["sensor.test"]
assert len(sensor_test_history) == 5
assert sensor_test_history[0]["s"] == "on"
assert sensor_test_history[0]["a"] == {"any": "attr"}
assert isinstance(sensor_test_history[0]["lu"], float)
assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu)
assert sensor_test_history[1]["s"] == "off"
assert isinstance(sensor_test_history[1]["lu"], float)
assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu)
assert sensor_test_history[1]["a"] == {"any": "attr"}
assert sensor_test_history[4]["s"] == "on"
assert sensor_test_history[4]["a"] == {"any": "attr"}
await client.send_json(
{
"id": 4,
"type": "history/history_during_period",
"start_time": now.isoformat(),
"entity_ids": ["sensor.test"],
"include_start_time_state": True,
"significant_changes_only": True,
"no_attributes": False,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 4
sensor_test_history = response["result"]["sensor.test"]
assert len(sensor_test_history) == 3
assert sensor_test_history[0]["s"] == "on"
assert sensor_test_history[0]["a"] == {"any": "attr"}
assert isinstance(sensor_test_history[0]["lu"], float)
assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu)
assert sensor_test_history[1]["s"] == "off"
assert isinstance(sensor_test_history[1]["lu"], float)
assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu)
assert sensor_test_history[1]["a"] == {"any": "attr"}
assert sensor_test_history[2]["s"] == "on"
assert sensor_test_history[2]["a"] == {"any": "attr"}


@@ -20,6 +20,8 @@ from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed, get_fixture_path
from tests.components.recorder.common import async_wait_recording_done
from tests.typing import RecorderInstanceGenerator
async def test_setup(recorder_mock: Recorder, hass: HomeAssistant) -> None:
@@ -1367,13 +1369,20 @@ async def test_measure_cet(recorder_mock: Recorder, hass: HomeAssistant) -> None
@pytest.mark.parametrize("time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii"])
async def test_end_time_with_microseconds_zeroed(
time_zone, recorder_mock: Recorder, hass: HomeAssistant
time_zone: str,
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
) -> None:
"""Test the history statistics sensor that has the end time microseconds zeroed out."""
hass.config.set_time_zone(time_zone)
start_of_today = dt_util.now().replace(
day=9, month=7, year=1986, hour=0, minute=0, second=0, microsecond=0
)
with freeze_time(start_of_today):
await async_setup_recorder_instance(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
start_time = start_of_today + timedelta(minutes=60)
t0 = start_time + timedelta(minutes=20)
t1 = t0 + timedelta(minutes=10)
@@ -1434,7 +1443,7 @@ async def test_end_time_with_microseconds_zeroed(
await hass.async_block_till_done()
assert hass.states.get("sensor.heatpump_compressor_today").state == "1.83"
hass.states.async_set("binary_sensor.heatpump_compressor_state", "on")
await hass.async_block_till_done()
await async_wait_recording_done(hass)
time_600 = start_of_today + timedelta(hours=6)
with freeze_time(time_600):
async_fire_time_changed(hass, time_600)
@@ -1473,6 +1482,7 @@ async def test_end_time_with_microseconds_zeroed(
)
with freeze_time(rolled_to_next_day_plus_16_860000):
hass.states.async_set("binary_sensor.heatpump_compressor_state", "off")
await async_wait_recording_done(hass)
async_fire_time_changed(hass, rolled_to_next_day_plus_16_860000)
await hass.async_block_till_done()


@@ -24,7 +24,7 @@ from homeassistant.components.recorder.db_schema import (
from homeassistant.components.recorder.filters import Filters
from homeassistant.components.recorder.history import legacy
from homeassistant.components.recorder.models import LazyState, process_timestamp
from homeassistant.components.recorder.models.legacy import LazyStatePreSchema31
from homeassistant.components.recorder.models.legacy import LegacyLazyStatePreSchema31
from homeassistant.components.recorder.util import session_scope
import homeassistant.core as ha
from homeassistant.core import HomeAssistant, State
@@ -41,7 +41,6 @@ from .common import (
wait_recording_done,
)
from tests.common import mock_state_change_event
from tests.typing import RecorderInstanceGenerator
@@ -55,14 +54,20 @@ async def _async_get_states(
"""Get states from the database."""
def _get_states_with_session():
if get_instance(hass).schema_version < 31:
klass = LazyStatePreSchema31
else:
klass = LazyState
with session_scope(hass=hass, read_only=True) as session:
attr_cache = {}
pre_31_schema = get_instance(hass).schema_version < 31
return [
klass(row, attr_cache, None)
LegacyLazyStatePreSchema31(row, attr_cache, None)
if pre_31_schema
else LazyState(
row,
attr_cache,
None,
row.entity_id,
row.state,
getattr(row, "last_updated_ts", None),
)
for row in legacy._get_rows_with_session(
hass,
session,
@@ -112,44 +117,6 @@ def _add_db_entries(
)
def _setup_get_states(hass):
"""Set up for testing get_states."""
states = []
now = dt_util.utcnow()
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=now
):
for i in range(5):
state = ha.State(
f"test.point_in_time_{i % 5}",
f"State {i}",
{"attribute_test": i},
)
mock_state_change_event(hass, state)
states.append(state)
wait_recording_done(hass)
future = now + timedelta(seconds=1)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=future
):
for i in range(5):
state = ha.State(
f"test.point_in_time_{i % 5}",
f"State {i}",
{"attribute_test": i},
)
mock_state_change_event(hass, state)
wait_recording_done(hass)
return now, future, states
def test_get_full_significant_states_with_session_entity_no_matches(
hass_recorder: Callable[..., HomeAssistant]
) -> None:
@@ -297,12 +264,12 @@ def test_state_changes_during_period_descending(
wait_recording_done(hass)
return hass.states.get(entity_id)
start = dt_util.utcnow()
start = dt_util.utcnow().replace(microsecond=0)
point = start + timedelta(seconds=1)
point2 = start + timedelta(seconds=1, microseconds=2)
point3 = start + timedelta(seconds=1, microseconds=3)
point4 = start + timedelta(seconds=1, microseconds=4)
end = point + timedelta(seconds=1)
point2 = start + timedelta(seconds=1, microseconds=100)
point3 = start + timedelta(seconds=1, microseconds=200)
point4 = start + timedelta(seconds=1, microseconds=300)
end = point + timedelta(seconds=1, microseconds=400)
with patch(
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=start
@@ -336,6 +303,7 @@ def test_state_changes_during_period_descending(
hist = history.state_changes_during_period(
hass, start, end, entity_id, no_attributes=False, descending=False
)
assert_multiple_states_equal_without_context(states, hist[entity_id])
hist = history.state_changes_during_period(
@@ -345,6 +313,57 @@ def test_state_changes_during_period_descending(
states, list(reversed(list(hist[entity_id])))
)
start_time = point2 + timedelta(microseconds=10)
hist = history.state_changes_during_period(
hass,
start_time, # Pick a point where we will generate a start time state
end,
entity_id,
no_attributes=False,
descending=True,
include_start_time_state=True,
)
hist_states = list(hist[entity_id])
assert hist_states[-1].last_updated == start_time
assert hist_states[-1].last_changed == start_time
assert len(hist_states) == 3
# Make sure they are in descending order
assert (
hist_states[0].last_updated
> hist_states[1].last_updated
> hist_states[2].last_updated
)
assert (
hist_states[0].last_changed
> hist_states[1].last_changed
> hist_states[2].last_changed
)
hist = history.state_changes_during_period(
hass,
start_time, # Pick a point where we will generate a start time state
end,
entity_id,
no_attributes=False,
descending=False,
include_start_time_state=True,
)
hist_states = list(hist[entity_id])
assert hist_states[0].last_updated == start_time
assert hist_states[0].last_changed == start_time
assert len(hist_states) == 3
# Make sure they are in ascending order
assert (
hist_states[0].last_updated
< hist_states[1].last_updated
< hist_states[2].last_updated
)
assert (
hist_states[0].last_changed
< hist_states[1].last_changed
< hist_states[2].last_changed
)
def test_get_last_state_changes(hass_recorder: Callable[..., HomeAssistant]) -> None:
"""Test number of state changes."""
@@ -548,13 +567,16 @@ def test_get_significant_states_with_initial(
hass = hass_recorder()
hass.config.set_time_zone(time_zone)
zero, four, states = record_states(hass)
one = zero + timedelta(seconds=1)
one_and_half = zero + timedelta(seconds=1.5)
for entity_id in states:
if entity_id == "media_player.test":
states[entity_id] = states[entity_id][1:]
for state in states[entity_id]:
if state.last_changed == one:
# If the state is recorded before the start time,
# it will have its last_updated and last_changed
# set to the start time.
if state.last_updated < one_and_half:
state.last_updated = one_and_half
state.last_changed = one_and_half
hist = history.get_significant_states(


@@ -269,7 +269,7 @@ async def test_lazy_state_handles_include_json(
entity_id="sensor.invalid",
shared_attrs="{INVALID_JSON}",
)
assert LazyState(row, {}, None).attributes == {}
assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {}
assert "Error converting row to state attributes" in caplog.text
@@ -282,7 +282,7 @@ async def test_lazy_state_prefers_shared_attrs_over_attrs(
shared_attrs='{"shared":true}',
attributes='{"shared":false}',
)
assert LazyState(row, {}, None).attributes == {"shared": True}
assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {"shared": True}
async def test_lazy_state_handles_different_last_updated_and_last_changed(
@@ -297,7 +297,7 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed(
last_updated_ts=now.timestamp(),
last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
)
lstate = LazyState(row, {}, None)
lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
@@ -328,7 +328,7 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(
last_updated_ts=now.timestamp(),
last_changed_ts=now.timestamp(),
)
lstate = LazyState(row, {}, None)
lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",


@@ -0,0 +1,98 @@
"""The tests for the Recorder component legacy models."""
from datetime import datetime, timedelta
from unittest.mock import PropertyMock
import pytest
from homeassistant.components.recorder.models.legacy import LegacyLazyState
from homeassistant.util import dt as dt_util
async def test_legacy_lazy_state_prefers_shared_attrs_over_attrs(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that the LazyState prefers shared_attrs over attributes."""
row = PropertyMock(
entity_id="sensor.invalid",
shared_attrs='{"shared":true}',
attributes='{"shared":false}',
)
assert LegacyLazyState(row, {}, None).attributes == {"shared": True}
async def test_legacy_lazy_state_handles_different_last_updated_and_last_changed(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that the LazyState handles different last_updated and last_changed."""
now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
row = PropertyMock(
entity_id="sensor.valid",
state="off",
shared_attrs='{"shared":true}',
last_updated_ts=now.timestamp(),
last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
)
lstate = LegacyLazyState(row, {}, None)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
"last_changed": "2021-06-12T03:03:01.000323+00:00",
"last_updated": "2021-06-12T03:04:01.000323+00:00",
"state": "off",
}
assert lstate.last_updated.timestamp() == row.last_updated_ts
assert lstate.last_changed.timestamp() == row.last_changed_ts
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
"last_changed": "2021-06-12T03:03:01.000323+00:00",
"last_updated": "2021-06-12T03:04:01.000323+00:00",
"state": "off",
}
async def test_legacy_lazy_state_handles_same_last_updated_and_last_changed(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that the LazyState handles same last_updated and last_changed."""
now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
row = PropertyMock(
entity_id="sensor.valid",
state="off",
shared_attrs='{"shared":true}',
last_updated_ts=now.timestamp(),
last_changed_ts=now.timestamp(),
)
lstate = LegacyLazyState(row, {}, None)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
"last_changed": "2021-06-12T03:04:01.000323+00:00",
"last_updated": "2021-06-12T03:04:01.000323+00:00",
"state": "off",
}
assert lstate.last_updated.timestamp() == row.last_updated_ts
assert lstate.last_changed.timestamp() == row.last_changed_ts
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
"last_changed": "2021-06-12T03:04:01.000323+00:00",
"last_updated": "2021-06-12T03:04:01.000323+00:00",
"state": "off",
}
lstate.last_updated = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
"last_changed": "2021-06-12T03:04:01.000323+00:00",
"last_updated": "2020-06-12T03:04:01.000323+00:00",
"state": "off",
}
lstate.last_changed = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
"last_changed": "2020-06-12T03:04:01.000323+00:00",
"last_updated": "2020-06-12T03:04:01.000323+00:00",
"state": "off",
}


@@ -7,7 +7,7 @@ import sqlite3
from unittest.mock import MagicMock, Mock, patch
import pytest
from sqlalchemy import text
from sqlalchemy import lambda_stmt, text
from sqlalchemy.engine.result import ChunkedIteratorResult
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql.elements import TextClause
@@ -18,7 +18,7 @@ from homeassistant.components.recorder import util
from homeassistant.components.recorder.const import DOMAIN, SQLITE_URL_PREFIX
from homeassistant.components.recorder.db_schema import RecorderRuns
from homeassistant.components.recorder.history.modern import (
_get_single_entity_states_stmt,
_get_single_entity_start_time_stmt,
)
from homeassistant.components.recorder.models import (
UnsupportedDialect,
@@ -908,7 +908,12 @@ def test_execute_stmt_lambda_element(
with session_scope(hass=hass) as session:
# No time window, we always get a list
metadata_id = instance.states_meta_manager.get("sensor.on", session, True)
stmt = _get_single_entity_states_stmt(dt_util.utcnow(), metadata_id, False)
start_time_ts = dt_util.utcnow().timestamp()
stmt = lambda_stmt(
lambda: _get_single_entity_start_time_stmt(
start_time_ts, metadata_id, False, False
)
)
rows = util.execute_stmt_lambda_element(session, stmt)
assert isinstance(rows, list)
assert rows[0].state == new_state.state