Convert unindexed domain queries to entity_id queries (#68404)

pull/68377/head^2
J. Nick Koston 2022-03-20 01:28:17 -10:00 committed by GitHub
parent 816695cc96
commit 3150915cb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 127 additions and 65 deletions

View File

@ -363,7 +363,14 @@ class Filters:
"""Generate the entity filter query."""
includes = []
if self.included_domains:
includes.append(history_models.States.domain.in_(self.included_domains))
includes.append(
or_(
*[
history_models.States.entity_id.like(f"{domain}.%")
for domain in self.included_domains
]
).self_group()
)
if self.included_entities:
includes.append(history_models.States.entity_id.in_(self.included_entities))
for glob in self.included_entity_globs:
@ -371,7 +378,14 @@ class Filters:
excludes = []
if self.excluded_domains:
excludes.append(history_models.States.domain.in_(self.excluded_domains))
excludes.append(
or_(
*[
history_models.States.entity_id.like(f"{domain}.%")
for domain in self.excluded_domains
]
).self_group()
)
if self.excluded_entities:
excludes.append(history_models.States.entity_id.in_(self.excluded_entities))
for glob in self.excluded_entity_globs:

View File

@ -1,6 +1,9 @@
"""Event parser and human readable log generator."""
from __future__ import annotations
from collections.abc import Iterable
from contextlib import suppress
from datetime import timedelta
from datetime import datetime as dt, timedelta
from http import HTTPStatus
from itertools import groupby
import json
@ -9,6 +12,8 @@ from typing import Any
import sqlalchemy
from sqlalchemy.orm import aliased
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import literal
import voluptuous as vol
@ -59,13 +64,14 @@ from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import bind_hass
import homeassistant.util.dt as dt_util
ENTITY_ID_JSON_TEMPLATE = '"entity_id":"{}"'
ENTITY_ID_JSON_TEMPLATE = '%"entity_id":"{}"%'
ENTITY_ID_JSON_EXTRACT = re.compile('"entity_id": ?"([^"]+)"')
DOMAIN_JSON_EXTRACT = re.compile('"domain": ?"([^"]+)"')
ICON_JSON_EXTRACT = re.compile('"icon": ?"([^"]+)"')
ATTR_MESSAGE = "message"
CONTINUOUS_DOMAINS = ["proximity", "sensor"]
CONTINUOUS_DOMAINS = {"proximity", "sensor"}
CONTINUOUS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in CONTINUOUS_DOMAINS]
DOMAIN = "logbook"
@ -73,7 +79,7 @@ GROUP_BY_MINUTES = 15
EMPTY_JSON_OBJECT = "{}"
UNIT_OF_MEASUREMENT_JSON = '"unit_of_measurement":'
UNIT_OF_MEASUREMENT_JSON_LIKE = f"%{UNIT_OF_MEASUREMENT_JSON}%"
HA_DOMAIN_ENTITY_ID = f"{HA_DOMAIN}._"
CONFIG_SCHEMA = vol.Schema(
@ -489,35 +495,39 @@ def _get_events(
)
def _generate_events_query(session):
def _generate_events_query(session: Session) -> Query:
return session.query(
*EVENT_COLUMNS,
States.state,
States.entity_id,
States.domain,
States.attributes,
StateAttributes.shared_attrs,
)
def _generate_events_query_without_states(session):
def _generate_events_query_without_states(session: Session) -> Query:
return session.query(
*EVENT_COLUMNS,
literal(value=None, type_=sqlalchemy.String).label("state"),
literal(value=None, type_=sqlalchemy.String).label("entity_id"),
literal(value=None, type_=sqlalchemy.String).label("domain"),
literal(value=None, type_=sqlalchemy.Text).label("attributes"),
literal(value=None, type_=sqlalchemy.Text).label("shared_attrs"),
)
def _generate_states_query(session, start_day, end_day, old_state, entity_ids):
def _generate_states_query(
session: Session,
start_day: dt,
end_day: dt,
old_state: States,
entity_ids: Iterable[str],
) -> Query:
return (
_generate_events_query(session)
.outerjoin(Events, (States.event_id == Events.event_id))
.outerjoin(old_state, (States.old_state_id == old_state.state_id))
.filter(_missing_state_matcher(old_state))
.filter(_continuous_entity_matcher())
.filter(_not_continuous_entity_matcher())
.filter((States.last_updated > start_day) & (States.last_updated < end_day))
.filter(
(States.last_updated == States.last_changed)
@ -529,7 +539,9 @@ def _generate_states_query(session, start_day, end_day, old_state, entity_ids):
)
def _apply_events_types_and_states_filter(hass, query, old_state):
def _apply_events_types_and_states_filter(
hass: HomeAssistant, query: Query, old_state: States
) -> Query:
events_query = (
query.outerjoin(States, (Events.event_id == States.event_id))
.outerjoin(old_state, (States.old_state_id == old_state.state_id))
@ -538,7 +550,8 @@ def _apply_events_types_and_states_filter(hass, query, old_state):
| _missing_state_matcher(old_state)
)
.filter(
(Events.event_type != EVENT_STATE_CHANGED) | _continuous_entity_matcher()
(Events.event_type != EVENT_STATE_CHANGED)
| _not_continuous_entity_matcher()
)
)
return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES).outerjoin(
@ -546,7 +559,7 @@ def _apply_events_types_and_states_filter(hass, query, old_state):
)
def _missing_state_matcher(old_state):
def _missing_state_matcher(old_state: States) -> Any:
# The below removes state change events that do not have
# an old_state or the old_state is missing (newly added entities)
# or the new_state is missing (removed entities)
@ -557,37 +570,64 @@ def _missing_state_matcher(old_state):
)
def _not_continuous_entity_matcher() -> Any:
    """Match non continuous entities.

    Prefilter on ATTR_UNIT_OF_MEASUREMENT since the LIKE test is much
    faster in sql than decoding the attributes.
    """
    return sqlalchemy.or_(
        # Entity is not in a continuous domain at all ...
        _not_continuous_domain_matcher(),
        # ... or it is, but the state has no unit_of_measurement attribute.
        sqlalchemy.and_(
            _continuous_domain_matcher(), _not_uom_attributes_matcher()
        ).self_group(),
    )
def _apply_event_time_filter(events_query, start_day, end_day):
def _not_continuous_domain_matcher() -> Any:
    """Build a matcher excluding entity_ids in the continuous domains."""
    excluded = [
        ~States.entity_id.like(like_pattern)
        for like_pattern in CONTINUOUS_ENTITY_ID_LIKE
    ]
    return sqlalchemy.and_(*excluded).self_group()
def _continuous_domain_matcher() -> Any:
    """Build a matcher for entity_ids that belong to a continuous domain."""
    included = [
        States.entity_id.like(like_pattern)
        for like_pattern in CONTINUOUS_ENTITY_ID_LIKE
    ]
    return sqlalchemy.or_(*included).self_group()
def _not_uom_attributes_matcher() -> Any:
    """Prefilter ATTR_UNIT_OF_MEASUREMENT as it's much faster in sql."""
    no_uom_in_shared = ~StateAttributes.shared_attrs.like(
        UNIT_OF_MEASUREMENT_JSON_LIKE
    )
    no_uom_in_states = ~States.attributes.like(UNIT_OF_MEASUREMENT_JSON_LIKE)
    return no_uom_in_shared | no_uom_in_states
def _apply_event_time_filter(events_query: Query, start_day: dt, end_day: dt) -> Query:
    """Restrict events to those fired strictly inside (start_day, end_day)."""
    in_window = (Events.time_fired > start_day) & (Events.time_fired < end_day)
    return events_query.filter(in_window)
def _apply_event_types_filter(
    hass: HomeAssistant, query: Query, event_types: list[str]
) -> Query:
    """Restrict the query to the given event types.

    Event types registered under hass.data[DOMAIN] are appended to the
    requested list so externally registered logbook events are included.
    """
    return query.filter(
        Events.event_type.in_(event_types + list(hass.data.get(DOMAIN, {})))
    )
def _apply_event_entity_id_matchers(
    events_query: Query, entity_ids: Iterable[str]
) -> Query:
    """Filter events to rows whose event_data references one of entity_ids.

    Matches with LIKE against the serialized JSON; ENTITY_ID_JSON_TEMPLATE
    already carries the surrounding % wildcards.
    """
    return events_query.filter(
        sqlalchemy.or_(
            *(
                Events.event_data.like(ENTITY_ID_JSON_TEMPLATE.format(entity_id))
                for entity_id in entity_ids
            )
        )
    )
@ -694,7 +734,7 @@ class LazyEventPartialState:
"event_type",
"entity_id",
"state",
"domain",
"_domain",
"context_id",
"context_user_id",
"context_parent_id",
@ -707,15 +747,22 @@ class LazyEventPartialState:
self._event_data = None
self._time_fired_isoformat = None
self._attributes = None
self._domain = None
self.event_type = self._row.event_type
self.entity_id = self._row.entity_id
self.state = self._row.state
self.domain = self._row.domain
self.context_id = self._row.context_id
self.context_user_id = self._row.context_user_id
self.context_parent_id = self._row.context_parent_id
self.time_fired_minute = self._row.time_fired.minute
@property
def domain(self):
    """Domain portion of the entity_id, computed on first access and cached."""
    cached = self._domain
    if cached is None:
        cached = split_entity_id(self.entity_id)[0]
        self._domain = cached
    return cached
@property
def attributes_icon(self):
"""Extract the icon from the decoded attributes or json."""

View File

@ -7,7 +7,7 @@ from itertools import groupby
import logging
import time
from sqlalchemy import Text, and_, bindparam, func
from sqlalchemy import Text, and_, bindparam, func, or_
from sqlalchemy.ext import baked
from sqlalchemy.sql.expression import literal
@ -30,14 +30,16 @@ _LOGGER = logging.getLogger(__name__)
STATE_KEY = "state"
LAST_CHANGED_KEY = "last_changed"
SIGNIFICANT_DOMAINS = (
SIGNIFICANT_DOMAINS = {
"climate",
"device_tracker",
"humidifier",
"thermostat",
"water_heater",
)
IGNORE_DOMAINS = ("zone", "scene")
}
SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in SIGNIFICANT_DOMAINS]
IGNORE_DOMAINS = {"zone", "scene"}
IGNORE_DOMAINS_ENTITY_ID_LIKE = [f"{domain}.%" for domain in IGNORE_DOMAINS]
NEED_ATTRIBUTE_DOMAINS = {
"climate",
"humidifier",
@ -47,7 +49,6 @@ NEED_ATTRIBUTE_DOMAINS = {
}
BASE_STATES = [
States.domain,
States.entity_id,
States.state,
States.last_changed,
@ -106,26 +107,42 @@ def get_significant_states_with_session(
query_keys = QUERY_STATE_NO_ATTR if no_attributes else QUERY_STATES
baked_query = hass.data[HISTORY_BAKERY](lambda session: session.query(*query_keys))
if significant_changes_only:
baked_query += lambda q: q.filter(
(
States.domain.in_(SIGNIFICANT_DOMAINS)
| (States.last_changed == States.last_updated)
if entity_ids is not None and len(entity_ids) == 1:
if (
significant_changes_only
and split_entity_id(entity_ids[0])[0] not in SIGNIFICANT_DOMAINS
):
baked_query += lambda q: q.filter(
States.last_changed == States.last_updated
)
elif significant_changes_only:
baked_query += lambda q: q.filter(
or_(
*[
States.entity_id.like(entity_domain)
for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
],
(States.last_changed == States.last_updated),
)
& (States.last_updated > bindparam("start_time"))
)
else:
baked_query += lambda q: q.filter(States.last_updated > bindparam("start_time"))
if entity_ids is not None:
baked_query += lambda q: q.filter(
States.entity_id.in_(bindparam("entity_ids", expanding=True))
)
else:
baked_query += lambda q: q.filter(~States.domain.in_(IGNORE_DOMAINS))
baked_query += lambda q: q.filter(
and_(
*[
~States.entity_id.like(entity_domain)
for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE
]
)
)
if filters:
filters.bake(baked_query)
baked_query += lambda q: q.filter(States.last_updated > bindparam("start_time"))
if end_time is not None:
baked_query += lambda q: q.filter(States.last_updated < bindparam("end_time"))
@ -365,7 +382,8 @@ def _get_states_with_session(
most_recent_state_ids,
States.state_id == most_recent_state_ids.c.max_state_id,
)
query = query.filter(~States.domain.in_(IGNORE_DOMAINS))
for entity_domain in IGNORE_DOMAINS_ENTITY_ID_LIKE:
query = query.filter(~States.entity_id.like(entity_domain))
if filters:
query = filters.apply(query)
if not no_attributes:

View File

@ -30,11 +30,10 @@ from homeassistant.const import (
MAX_LENGTH_EVENT_CONTEXT_ID,
MAX_LENGTH_EVENT_EVENT_TYPE,
MAX_LENGTH_EVENT_ORIGIN,
MAX_LENGTH_STATE_DOMAIN,
MAX_LENGTH_STATE_ENTITY_ID,
MAX_LENGTH_STATE_STATE,
)
from homeassistant.core import Context, Event, EventOrigin, State, split_entity_id
from homeassistant.core import Context, Event, EventOrigin, State
import homeassistant.util.dt as dt_util
from .const import JSON_DUMP
@ -157,7 +156,6 @@ class States(Base): # type: ignore[misc,valid-type]
)
__tablename__ = TABLE_STATES
state_id = Column(Integer, Identity(), primary_key=True)
domain = Column(String(MAX_LENGTH_STATE_DOMAIN))
entity_id = Column(String(MAX_LENGTH_STATE_ENTITY_ID))
state = Column(String(MAX_LENGTH_STATE_STATE))
attributes = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
@ -178,7 +176,7 @@ class States(Base): # type: ignore[misc,valid-type]
"""Return string representation of instance for debugging."""
return (
f"<recorder.States("
f"id={self.state_id}, domain='{self.domain}', entity_id='{self.entity_id}', "
f"id={self.state_id}, entity_id='{self.entity_id}', "
f"state='{self.state}', event_id='{self.event_id}', "
f"last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}', "
f"old_state_id={self.old_state_id}, attributes_id={self.attributes_id}"
@ -195,11 +193,9 @@ class States(Base): # type: ignore[misc,valid-type]
# None state means the state was removed from the state machine
if state is None:
dbstate.state = ""
dbstate.domain = split_entity_id(entity_id)[0]
dbstate.last_changed = event.time_fired
dbstate.last_updated = event.time_fired
else:
dbstate.domain = state.domain
dbstate.state = state.state
dbstate.last_changed = state.last_changed
dbstate.last_updated = state.last_updated

View File

@ -76,7 +76,6 @@ def test_from_event_to_delete_state():
db_state = States.from_event(event)
assert db_state.entity_id == "sensor.temperature"
assert db_state.domain == "sensor"
assert db_state.state == ""
assert db_state.last_changed == event.time_fired
assert db_state.last_updated == event.time_fired

View File

@ -375,7 +375,6 @@ async def test_purge_edge_case(
session.add(
States(
entity_id="test.recorder2",
domain="sensor",
state="purgeme",
attributes="{}",
last_changed=timestamp,
@ -444,7 +443,6 @@ async def test_purge_cutoff_date(
session.add(
States(
entity_id="test.cutoff",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp_keep,
@ -473,7 +471,6 @@ async def test_purge_cutoff_date(
session.add(
States(
entity_id="test.cutoff",
domain="sensor",
state="purge",
attributes="{}",
last_changed=timestamp_purge,
@ -592,7 +589,6 @@ async def test_purge_filtered_states(
session.add(
States(
entity_id="sensor.excluded",
domain="sensor",
state="purgeme",
attributes="{}",
last_changed=timestamp,
@ -619,7 +615,6 @@ async def test_purge_filtered_states(
)
state_1 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
@ -630,7 +625,6 @@ async def test_purge_filtered_states(
timestamp = dt_util.utcnow() - timedelta(days=4)
state_2 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
@ -640,7 +634,6 @@ async def test_purge_filtered_states(
)
state_3 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
@ -814,7 +807,6 @@ async def test_purge_without_state_attributes_filtered_states_to_empty(
session.add(
States(
entity_id="sensor.old_format",
domain="sensor",
state=STATE_ON,
attributes=json.dumps({"old": "not_using_state_attributes"}),
last_changed=timestamp,
@ -979,7 +971,6 @@ async def test_purge_filtered_events_state_changed(
timestamp = dt_util.utcnow() - timedelta(days=0)
state_1 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
@ -989,7 +980,6 @@ async def test_purge_filtered_events_state_changed(
timestamp = dt_util.utcnow() - timedelta(days=4)
state_2 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
@ -998,7 +988,6 @@ async def test_purge_filtered_events_state_changed(
)
state_3 = States(
entity_id="sensor.linked_old_state_id",
domain="sensor",
state="keep",
attributes="{}",
last_changed=timestamp,
@ -1318,7 +1307,6 @@ def _add_state_and_state_changed_event(
session.add(
States(
entity_id=entity_id,
domain="sensor",
state=state,
attributes=None,
last_changed=timestamp,