Speed up SQL queries where ORM rows are not needed (#91839)

* Speed up logbook and history queries where ORM rows are not needed

  This avoids having SQLAlchemy wrap the Result in a ChunkedIteratorResult,
  which adds overhead we do not need for these cases.

* more places

* anything that uses _sorted_statistics_to_dict does not need ORM rows either
parent 2663901603
commit 95fcdc5684
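For context on what the commit message describes: with SQLAlchemy 1.4/2.0, executing an ORM-enabled statement through the Session returns a ChunkedIteratorResult, while executing the same statement on the session's underlying Connection returns plain Core rows without that wrapper. The sketch below is illustrative only; it uses a throwaway States table, not the real recorder schema.

# Illustrative comparison (not Home Assistant code): ORM execution wraps the
# result in ChunkedIteratorResult, Core execution on the same connection does not.
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.engine.result import ChunkedIteratorResult
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class States(Base):  # throwaway table, not the real recorder schema
    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    state = Column(String(255))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(States(state="on"))
    session.commit()

    stmt = select(States.state_id, States.state)

    # ORM path: the Session wraps rows in a ChunkedIteratorResult.
    orm_result = session.execute(stmt)
    assert isinstance(orm_result, ChunkedIteratorResult)

    # Core path: executing on the session's connection skips that wrapping,
    # which is what orm_rows=False switches to in this commit.
    core_result = session.connection().execute(stmt)
    assert not isinstance(core_result, ChunkedIteratorResult)
    assert [tuple(row) for row in core_result] == [(1, "on")]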
@@ -19,7 +19,10 @@ from homeassistant.components.recorder.models import (
     process_datetime_to_timestamp,
     process_timestamp_to_utc_isoformat,
 )
-from homeassistant.components.recorder.util import session_scope
+from homeassistant.components.recorder.util import (
+    execute_stmt_lambda_element,
+    session_scope,
+)
 from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
 from homeassistant.const import (
     ATTR_DOMAIN,
@@ -135,25 +138,6 @@ class EventProcessor:
         end_day: dt,
     ) -> list[dict[str, Any]]:
         """Get events for a period of time."""
-
-        def yield_rows(result: Result) -> Sequence[Row] | Result:
-            """Yield rows from the database."""
-            # end_day - start_day intentionally checks .days and not .total_seconds()
-            # since we don't want to switch over to buffered if they go
-            # over one day by a few hours since the UI makes it so easy to do that.
-            if self.limited_select or (end_day - start_day).days <= 1:
-                return result.all()
-            # Only buffer rows to reduce memory pressure
-            # if we expect the result set is going to be very large.
-            # What is considered very large is going to differ
-            # based on the hardware Home Assistant is running on.
-            #
-            # sqlalchemy suggests that is at least 10k, but for
-            # even and RPi3 that number seems higher in testing
-            # so we don't switch over until we request > 1 day+ of data.
-            #
-            return result.yield_per(1024)
-
         with session_scope(hass=self.hass, read_only=True) as session:
             metadata_ids: list[int] | None = None
             instance = get_instance(self.hass)
@@ -178,7 +162,9 @@ class EventProcessor:
                 self.filters,
                 self.context_id,
             )
-            return self.humanify(yield_rows(session.execute(stmt)))
+            return self.humanify(
+                execute_stmt_lambda_element(session, stmt, orm_rows=False)
+            )

     def humanify(
         self, rows: Generator[EventAsRow, None, None] | Sequence[Row] | Result
@@ -273,7 +273,7 @@ def get_significant_states_with_session(
         ],
     )
     return _sorted_states_to_dict(
-        execute_stmt_lambda_element(session, stmt, None, end_time),
+        execute_stmt_lambda_element(session, stmt, None, end_time, orm_rows=False),
         start_time_ts if include_start_time_state else None,
         entity_ids,
         entity_id_to_metadata_id,
@@ -426,7 +426,9 @@ def state_changes_during_period(
         return cast(
             MutableMapping[str, list[State]],
             _sorted_states_to_dict(
-                execute_stmt_lambda_element(session, stmt, None, end_time),
+                execute_stmt_lambda_element(
+                    session, stmt, None, end_time, orm_rows=False
+                ),
                 start_time_ts if include_start_time_state else None,
                 entity_ids,
                 entity_id_to_metadata_id,
@@ -518,7 +520,7 @@ def get_last_state_changes(
                 number_of_states, metadata_id
             ),
         )
-        states = list(execute_stmt_lambda_element(session, stmt))
+        states = list(execute_stmt_lambda_element(session, stmt, orm_rows=False))
         return cast(
             MutableMapping[str, list[State]],
             _sorted_states_to_dict(
@@ -1555,7 +1555,9 @@ def _statistics_during_period_with_session(
     stmt = _generate_statistics_during_period_stmt(
         start_time, end_time, metadata_ids, table, types
     )
-    stats = cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))
+    stats = cast(
+        Sequence[Row], execute_stmt_lambda_element(session, stmt, orm_rows=False)
+    )

     if not stats:
         return {}
@@ -1664,7 +1666,9 @@ def _get_last_statistics(
             stmt = _get_last_statistics_stmt(metadata_id, number_of_stats)
         else:
             stmt = _get_last_statistics_short_term_stmt(metadata_id, number_of_stats)
-        stats = cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))
+        stats = cast(
+            Sequence[Row], execute_stmt_lambda_element(session, stmt, orm_rows=False)
+        )

         if not stats:
             return {}
@@ -1755,7 +1759,9 @@ def get_latest_short_term_statistics(
             if statistic_id in metadata
         ]
         stmt = _latest_short_term_statistics_stmt(metadata_ids)
-        stats = cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))
+        stats = cast(
+            Sequence[Row], execute_stmt_lambda_element(session, stmt, orm_rows=False)
+        )
         if not stats:
             return {}

@@ -97,7 +97,7 @@ class EventDataManager(BaseLRUTableManager[EventData]):
         with session.no_autoflush:
             for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
                 for data_id, shared_data in execute_stmt_lambda_element(
-                    session, get_shared_event_datas(hashs_chunk)
+                    session, get_shared_event_datas(hashs_chunk), orm_rows=False
                 ):
                     results[shared_data] = self._id_map[shared_data] = cast(
                         int, data_id
@@ -69,7 +69,7 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]):
         with session.no_autoflush:
             for missing_chunk in chunked(missing, SQLITE_MAX_BIND_VARS):
                 for event_type_id, event_type in execute_stmt_lambda_element(
-                    session, find_event_type_ids(missing_chunk)
+                    session, find_event_type_ids(missing_chunk), orm_rows=False
                 ):
                     results[event_type] = self._id_map[event_type] = cast(
                         int, event_type_id
@@ -114,7 +114,7 @@ class StateAttributesManager(BaseLRUTableManager[StateAttributes]):
         with session.no_autoflush:
             for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
                 for attributes_id, shared_attrs in execute_stmt_lambda_element(
-                    session, get_shared_attributes(hashs_chunk)
+                    session, get_shared_attributes(hashs_chunk), orm_rows=False
                 ):
                     results[shared_attrs] = self._id_map[shared_attrs] = cast(
                         int, attributes_id
@@ -67,7 +67,7 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
             cast(
                 Sequence[tuple[int, str]],
                 execute_stmt_lambda_element(
-                    session, find_all_states_metadata_ids()
+                    session, find_all_states_metadata_ids(), orm_rows=False
                 ),
             )
         )
@@ -109,6 +109,7 @@ class StatisticsMetaManager:
             _generate_get_metadata_stmt(
                 statistic_ids, statistic_type, statistic_source
             ),
+            orm_rows=False,
         ):
             statistics_meta = cast(StatisticsMeta, row)
             id_meta = _statistics_meta_to_id_statistics_metadata(statistics_meta)
@@ -199,6 +199,7 @@ def execute_stmt_lambda_element(
     start_time: datetime | None = None,
     end_time: datetime | None = None,
     yield_per: int = DEFAULT_YIELD_STATES_ROWS,
+    orm_rows: bool = True,
 ) -> Sequence[Row] | Result:
     """Execute a StatementLambdaElement.

@@ -211,10 +212,13 @@ def execute_stmt_lambda_element(
     specific entities) since they are usually faster
     with .all().
     """
-    executed = session.execute(stmt)
     use_all = not start_time or ((end_time or dt_util.utcnow()) - start_time).days <= 1
     for tryno in range(RETRIES):
         try:
+            if orm_rows:
+                executed = session.execute(stmt)
+            else:
+                executed = session.connection().execute(stmt)
             if use_all:
                 return executed.all()
             return executed.yield_per(yield_per)
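The use_all predicate above effectively replaces the logbook's removed yield_rows helper: fetch eagerly with .all() unless the requested window spans more than a full day, and compare .days rather than .total_seconds() so a window of one day plus a few hours still uses .all(). A standalone sketch of that predicate (the helper name is mine, for illustration only):

from datetime import datetime, timedelta, timezone


def should_use_all(start_time: datetime | None, end_time: datetime | None) -> bool:
    """Illustrative re-statement of the use_all predicate above."""
    if not start_time:
        return True  # no time window: always fetch eagerly
    # .days (not .total_seconds()) so "one day plus a few hours" still uses .all()
    return ((end_time or datetime.now(timezone.utc)) - start_time).days <= 1


now = datetime.now(timezone.utc)
assert should_use_all(None, None)                             # no window
assert should_use_all(now, now + timedelta(days=1, hours=6))  # just over a day
assert not should_use_all(now, now + timedelta(days=7))       # week-long window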
@@ -893,15 +893,16 @@ def test_execute_stmt_lambda_element(
     now = dt_util.utcnow()
     tomorrow = now + timedelta(days=1)
     one_week_from_now = now + timedelta(days=7)
+    all_calls = 0

     class MockExecutor:
         def __init__(self, stmt):
             assert isinstance(stmt, StatementLambdaElement)
-            self.calls = 0

         def all(self):
-            self.calls += 1
-            if self.calls == 2:
+            nonlocal all_calls
+            all_calls += 1
+            if all_calls == 2:
                 return ["mock_row"]
             raise SQLAlchemyError

@@ -926,6 +927,16 @@ def test_execute_stmt_lambda_element(
         assert row.state == new_state.state
         assert row.metadata_id == metadata_id

+        # Time window >= 2 days, we should not get a ChunkedIteratorResult
+        # because orm_rows=False
+        rows = util.execute_stmt_lambda_element(
+            session, stmt, now, one_week_from_now, orm_rows=False
+        )
+        assert not isinstance(rows, ChunkedIteratorResult)
+        row = next(rows)
+        assert row.state == new_state.state
+        assert row.metadata_id == metadata_id
+
         # Time window < 2 days, we get a list
         rows = util.execute_stmt_lambda_element(session, stmt, now, tomorrow)
         assert isinstance(rows, list)