Reduce state conversion overhead in history by lazy loading properties (#36963)

pull/36979/head
J. Nick Koston 2020-06-22 09:19:33 -05:00 committed by GitHub
parent b47be05efc
commit e25f216fd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 139 additions and 62 deletions

View File

@ -2,6 +2,7 @@
from collections import defaultdict
from datetime import timedelta
from itertools import groupby
import json
import logging
import time
from typing import Optional, cast
@ -22,7 +23,7 @@ from homeassistant.const import (
CONF_INCLUDE,
HTTP_BAD_REQUEST,
)
from homeassistant.core import split_entity_id
from homeassistant.core import Context, State, split_entity_id
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
@ -200,13 +201,6 @@ def _get_states_with_session(
session, utc_point_in_time, entity_ids=None, run=None, filters=None
):
"""Return the states at a specific point in time."""
if run is None:
run = recorder.run_information_with_session(session, utc_point_in_time)
# History did not run before utc_point_in_time
if run is None:
return []
query = session.query(*QUERY_STATES)
if entity_ids and len(entity_ids) == 1:
@ -214,64 +208,69 @@ def _get_states_with_session(
# have a single entity id
query = (
query.filter(
States.last_updated >= run.start,
States.last_updated < utc_point_in_time,
States.entity_id.in_(entity_ids),
)
.order_by(States.last_updated.desc())
.limit(1)
)
return _dbquery_to_non_hidden_states(query)
else:
# We have more than one entity to look at (most commonly we want
# all entities,) so we need to do a search on all states since the
# last recorder run started.
if run is None:
run = recorder.run_information_with_session(session, utc_point_in_time)
most_recent_states_by_date = session.query(
States.entity_id.label("max_entity_id"),
func.max(States.last_updated).label("max_last_updated"),
).filter(
(States.last_updated >= run.start)
& (States.last_updated < utc_point_in_time)
)
# History did not run before utc_point_in_time
if run is None:
return []
if entity_ids:
most_recent_states_by_date.filter(States.entity_id.in_(entity_ids))
# We have more than one entity to look at (most commonly we want
# all entities,) so we need to do a search on all states since the
# last recorder run started.
most_recent_states_by_date = most_recent_states_by_date.group_by(
States.entity_id
)
most_recent_states_by_date = session.query(
States.entity_id.label("max_entity_id"),
func.max(States.last_updated).label("max_last_updated"),
).filter(
(States.last_updated >= run.start) & (States.last_updated < utc_point_in_time)
)
most_recent_states_by_date = most_recent_states_by_date.subquery()
if entity_ids:
most_recent_states_by_date.filter(States.entity_id.in_(entity_ids))
most_recent_state_ids = session.query(
func.max(States.state_id).label("max_state_id")
).join(
most_recent_states_by_date,
and_(
States.entity_id == most_recent_states_by_date.c.max_entity_id,
States.last_updated == most_recent_states_by_date.c.max_last_updated,
),
)
most_recent_states_by_date = most_recent_states_by_date.group_by(States.entity_id)
most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id)
most_recent_states_by_date = most_recent_states_by_date.subquery()
most_recent_state_ids = most_recent_state_ids.subquery()
most_recent_state_ids = session.query(
func.max(States.state_id).label("max_state_id")
).join(
most_recent_states_by_date,
and_(
States.entity_id == most_recent_states_by_date.c.max_entity_id,
States.last_updated == most_recent_states_by_date.c.max_last_updated,
),
)
query = query.join(
most_recent_state_ids,
States.state_id == most_recent_state_ids.c.max_state_id,
).filter(~States.domain.in_(IGNORE_DOMAINS))
most_recent_state_ids = most_recent_state_ids.group_by(States.entity_id)
if filters:
query = filters.apply(query, entity_ids)
most_recent_state_ids = most_recent_state_ids.subquery()
query = query.join(
most_recent_state_ids, States.state_id == most_recent_state_ids.c.max_state_id,
).filter(~States.domain.in_(IGNORE_DOMAINS))
if filters:
query = filters.apply(query, entity_ids)
return _dbquery_to_non_hidden_states(query)
def _dbquery_to_non_hidden_states(query):
"""Return states that are not hidden."""
return [
state
for state in (
States.to_native(row, validate_entity_id=False) for row in execute(query)
)
if not state.attributes.get(ATTR_HIDDEN, False)
for state in (LazyState(row) for row in execute(query))
if not state.hidden
]
@ -329,15 +328,12 @@ def _sorted_states_to_json(
ent_results.extend(
[
native_state
for native_state in (
States.to_native(db_state, validate_entity_id=False)
for db_state in group
)
for native_state in (LazyState(db_state) for db_state in group)
if (
domain != SCRIPT_DOMAIN
or native_state.attributes.get(ATTR_CAN_CANCEL)
)
and not native_state.attributes.get(ATTR_HIDDEN, False)
and not native_state.hidden
]
)
continue
@ -347,15 +343,14 @@ def _sorted_states_to_json(
# in-between only provide the "state" and the
# "last_changed".
if not ent_results:
ent_results.append(States.to_native(next(group), validate_entity_id=False))
ent_results.append(LazyState(next(group)))
initial_state = ent_results[-1]
prev_state = ent_results[-1]
initial_state_count = len(ent_results)
for db_state in group:
if ATTR_HIDDEN in db_state.attributes and States.to_native(
db_state, validate_entity_id=False
if ATTR_HIDDEN in db_state.attributes and LazyState(
db_state
).attributes.get(ATTR_HIDDEN, False):
continue
@ -374,15 +369,11 @@ def _sorted_states_to_json(
)
prev_state = db_state
if (
prev_state
and prev_state != initial_state
and len(ent_results) != initial_state_count
):
if prev_state and len(ent_results) != initial_state_count:
# There was at least one state change
# replace the last minimal state with
# a full state
ent_results[-1] = States.to_native(prev_state, validate_entity_id=False)
ent_results[-1] = LazyState(prev_state)
# Filter out the empty lists if some states had 0 results.
return {key: val for key, val in result.items() if val}
@ -594,3 +585,89 @@ class Filters:
if self.excluded_entities:
query = query.filter(~States.entity_id.in_(self.excluded_entities))
return query
class LazyState(State):
    """A lazy version of core State.

    Wraps a database row and defers the expensive conversions (JSON
    decoding of the attributes, timestamp processing, Context creation)
    until the corresponding property is first read. Each converted value
    is cached on the instance so the work is done at most once.
    """

    __slots__ = [
        "_row",
        "entity_id",
        "state",
        "_attributes",
        "_last_changed",
        "_last_updated",
        "_context",
    ]

    def __init__(self, row):  # pylint: disable=super-init-not-called
        """Init the lazy state.

        Only entity_id and state are copied eagerly from the row; every
        other value starts as None, meaning "not converted yet".
        """
        self._row = row
        self.entity_id = self._row.entity_id
        self.state = self._row.state
        self._attributes = None
        self._last_changed = None
        self._last_updated = None
        self._context = None

    @property
    def attributes(self):
        """State attributes, decoded from the row's JSON text on first use."""
        # Compare against None rather than relying on truthiness: an empty
        # dict is a valid cached result and must not trigger another parse.
        if self._attributes is None:
            try:
                self._attributes = json.loads(self._row.attributes)
            except ValueError:
                # When json.loads fails.
                # Store the fallback before logging so the failure is
                # reported only once, and so that formatting `self` for the
                # message cannot re-enter this parse if the inherited repr
                # reads `attributes` (NOTE(review): depends on State's repr,
                # which is not visible here).
                self._attributes = {}
                _LOGGER.exception("Error converting row to state: %s", self)
        return self._attributes

    @property
    def hidden(self):
        """Determine if a state is hidden."""
        # Cheap substring test on the raw attributes text: when the key does
        # not occur at all, the JSON never needs to be decoded.
        if ATTR_HIDDEN not in self._row.attributes:
            return False
        return self.attributes.get(ATTR_HIDDEN, False)

    @property
    def context(self):
        """State context, built from the row's context ids on first use."""
        if self._context is None:
            self._context = Context(
                id=self._row.context_id, user_id=self._row.context_user_id
            )
        return self._context

    @property  # type: ignore
    def last_changed(self):
        """Last changed datetime."""
        if self._last_changed is None:
            self._last_changed = process_timestamp(self._row.last_changed)
        return self._last_changed

    @last_changed.setter
    def last_changed(self, value):
        """Set last changed datetime."""
        self._last_changed = value

    @property  # type: ignore
    def last_updated(self):
        """Last updated datetime."""
        if self._last_updated is None:
            self._last_updated = process_timestamp(self._row.last_updated)
        return self._last_updated

    @last_updated.setter
    def last_updated(self, value):
        """Set last updated datetime."""
        self._last_updated = value

    def __eq__(self, other):
        """Return the comparison.

        Equal when the other object is exactly a LazyState or a State and
        entity_id, state, attributes and context all match. The timestamps
        are not part of the comparison.
        """
        return (
            other.__class__ in [self.__class__, State]
            and self.entity_id == other.entity_id
            and self.state == other.state
            and self.attributes == other.attributes
            and self.context == other.context
        )