From 9215702388eef03c7c3ed9f756ea0db533d5beec Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 18 Mar 2022 00:23:13 -1000 Subject: [PATCH] Separate attrs into another table (reduces database size) (#68224) --- homeassistant/components/logbook/__init__.py | 35 ++- homeassistant/components/plant/__init__.py | 23 +- homeassistant/components/recorder/__init__.py | 75 +++-- homeassistant/components/recorder/history.py | 26 +- .../components/recorder/manifest.json | 2 +- .../components/recorder/migration.py | 3 + homeassistant/components/recorder/models.py | 102 +++++-- homeassistant/components/recorder/purge.py | 133 ++++++++- homeassistant/components/statistics/sensor.py | 15 +- homeassistant/package_constraints.txt | 2 + requirements_all.txt | 4 + requirements_test_all.txt | 4 + tests/components/logbook/test_init.py | 46 +++- tests/components/plant/test_init.py | 19 +- tests/components/recorder/test_init.py | 17 +- tests/components/recorder/test_models.py | 123 ++++++++- tests/components/recorder/test_purge.py | 257 +++++++++++++++++- 17 files changed, 788 insertions(+), 98 deletions(-) diff --git a/homeassistant/components/logbook/__init__.py b/homeassistant/components/logbook/__init__.py index 66c78e30eab..8860daaec3c 100644 --- a/homeassistant/components/logbook/__init__.py +++ b/homeassistant/components/logbook/__init__.py @@ -5,6 +5,7 @@ from http import HTTPStatus from itertools import groupby import json import re +from typing import Any import sqlalchemy from sqlalchemy.orm import aliased @@ -18,6 +19,7 @@ from homeassistant.components.http import HomeAssistantView from homeassistant.components.recorder import get_instance from homeassistant.components.recorder.models import ( Events, + StateAttributes, States, process_timestamp_to_utc_isoformat, ) @@ -494,6 +496,7 @@ def _generate_events_query(session): States.entity_id, States.domain, States.attributes, + StateAttributes.shared_attrs, ) @@ -504,6 +507,7 @@ def _generate_events_query_without_states(session): literal(value=None, type_=sqlalchemy.String).label("entity_id"), literal(value=None, type_=sqlalchemy.String).label("domain"), literal(value=None, type_=sqlalchemy.Text).label("attributes"), + literal(value=None, type_=sqlalchemy.Text).label("shared_attrs"), ) @@ -519,6 +523,9 @@ def _generate_states_query(session, start_day, end_day, old_state, entity_ids): (States.last_updated == States.last_changed) & States.entity_id.in_(entity_ids) ) + .outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) ) @@ -534,7 +541,9 @@ def _apply_events_types_and_states_filter(hass, query, old_state): (Events.event_type != EVENT_STATE_CHANGED) | _continuous_entity_matcher() ) ) - return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES) + return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES).outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) def _missing_state_matcher(old_state): @@ -556,6 +565,9 @@ def _continuous_entity_matcher(): return sqlalchemy.or_( sqlalchemy.not_(States.domain.in_(CONTINUOUS_DOMAINS)), sqlalchemy.not_(States.attributes.contains(UNIT_OF_MEASUREMENT_JSON)), + sqlalchemy.not_( + StateAttributes.shared_attrs.contains(UNIT_OF_MEASUREMENT_JSON) + ), ) @@ -709,8 +721,9 @@ class LazyEventPartialState: """Extract the icon from the decoded attributes or json.""" if self._attributes: return self._attributes.get(ATTR_ICON) - - result = ICON_JSON_EXTRACT.search(self._row.attributes) + result = ICON_JSON_EXTRACT.search( + 
self._row.shared_attrs or self._row.attributes + ) return result and result.group(1) @property @@ -734,14 +747,12 @@ class LazyEventPartialState: @property def attributes(self): """State attributes.""" - if not self._attributes: - if ( - self._row.attributes is None - or self._row.attributes == EMPTY_JSON_OBJECT - ): + if self._attributes is None: + source = self._row.shared_attrs or self._row.attributes + if source == EMPTY_JSON_OBJECT or source is None: self._attributes = {} else: - self._attributes = json.loads(self._row.attributes) + self._attributes = json.loads(source) return self._attributes @property @@ -772,12 +783,12 @@ class EntityAttributeCache: that are expected to change state. """ - def __init__(self, hass): + def __init__(self, hass: HomeAssistant) -> None: """Init the cache.""" self._hass = hass - self._cache = {} + self._cache: dict[str, dict[str, Any]] = {} - def get(self, entity_id, attribute, event): + def get(self, entity_id: str, attribute: str, event: LazyEventPartialState) -> Any: """Lookup an attribute for an entity or get it from the cache.""" if entity_id in self._cache: if attribute in self._cache[entity_id]: diff --git a/homeassistant/components/plant/__init__.py b/homeassistant/components/plant/__init__.py index 8b46ad7801e..d9d207e215b 100644 --- a/homeassistant/components/plant/__init__.py +++ b/homeassistant/components/plant/__init__.py @@ -7,7 +7,7 @@ import logging import voluptuous as vol from homeassistant.components.recorder import get_instance -from homeassistant.components.recorder.models import States +from homeassistant.components.recorder.models import StateAttributes, States from homeassistant.components.recorder.util import execute, session_scope from homeassistant.const import ( ATTR_TEMPERATURE, @@ -110,11 +110,6 @@ DOMAIN = "plant" CONFIG_SCHEMA = vol.Schema({DOMAIN: {cv.string: PLANT_SCHEMA}}, extra=vol.ALLOW_EXTRA) -# Flag for enabling/disabling the loading of the history from the database. -# This feature is turned off right now as its tests are not 100% stable. 
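# [Editor's note: illustrative sketch, not part of the patch.] The
# LazyEventPartialState.attributes property in the logbook diff above decodes
# a row's attributes lazily, preferring the new shared_attrs column and
# falling back to the legacy per-row attributes column. Standalone, the
# fallback amounts to roughly this (the helper name is an assumption):
#
#     def _decode_row_attributes(shared_attrs, attributes):
#         source = shared_attrs or attributes
#         if source is None or source == EMPTY_JSON_OBJECT:
#             return {}
#         return json.loads(source)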
-ENABLE_LOAD_HISTORY = False - - async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the Plant component.""" component = EntityComponent(_LOGGER, DOMAIN, hass) @@ -282,7 +277,7 @@ class Plant(Entity): async def async_added_to_hass(self): """After being added to hass, load from history.""" - if ENABLE_LOAD_HISTORY and "recorder" in self.hass.config.components: + if "recorder" in self.hass.config.components: # only use the database if it's configured await get_instance(self.hass).async_add_executor_job( self._load_history_from_db @@ -315,14 +310,24 @@ class Plant(Entity): _LOGGER.debug("Initializing values for %s from the database", self._name) with session_scope(hass=self.hass) as session: query = ( - session.query(States) + session.query(States, StateAttributes) .filter( (States.entity_id == entity_id.lower()) and (States.last_updated > start_date) ) + .outerjoin( + StateAttributes, + States.attributes_id == StateAttributes.attributes_id, + ) .order_by(States.last_updated.asc()) ) - states = execute(query, to_native=True, validate_entity_ids=False) + states = [] + if results := execute(query, to_native=False, validate_entity_ids=False): + for state, attributes in results: + native = state.to_native() + if not native.attributes: + native.attributes = attributes.to_native() + states.append(native) for state in states: # filter out all None, NaN and "unknown" states diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 6894d1367e6..2ee6896d032 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -14,6 +14,7 @@ import threading import time from typing import Any, TypeVar +from lru import LRU # pylint: disable=no-name-in-module from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import scoped_session, sessionmaker @@ -67,6 +68,7 @@ from .models import ( Base, Events, RecorderRuns, + StateAttributes, States, StatisticsRuns, process_timestamp, @@ -131,6 +133,15 @@ KEEPALIVE_TIME = 30 # States and Events objects EXPIRE_AFTER_COMMITS = 120 +# The number of attribute ids to cache in memory +# +# Based on: +# - The number of overlapping attributes +# - How frequently states with overlapping attributes will change +# - How much memory our low end hardware has +STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048 + + DB_LOCK_TIMEOUT = 30 DB_LOCK_QUEUE_CHECK_TIMEOUT = 1 @@ -541,6 +552,8 @@ class Recorder(threading.Thread): self._commits_without_expire = 0 self._keepalive_count = 0 self._old_states: dict[str, States] = {} + self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE) + self._pending_state_attributes: dict[str, StateAttributes] = {} self._pending_expunge: list[States] = [] self.event_session = None self.get_session = None @@ -964,33 +977,58 @@ class Recorder(threading.Thread): dbevent.event_data = None else: dbevent = Events.from_event(event) - self.event_session.add(dbevent) except (TypeError, ValueError): _LOGGER.warning("Event is not JSON serializable: %s", event) return + self.event_session.add(dbevent) if event.event_type == EVENT_STATE_CHANGED: try: dbstate = States.from_event(event) - has_new_state = event.data.get("new_state") - if dbstate.entity_id in self._old_states: - old_state = self._old_states.pop(dbstate.entity_id) - if old_state.state_id: - dbstate.old_state_id = old_state.state_id - else: - dbstate.old_state = old_state - if not 
has_new_state: - dbstate.state = None - dbstate.event = dbevent - self.event_session.add(dbstate) - if has_new_state: - self._old_states[dbstate.entity_id] = dbstate - self._pending_expunge.append(dbstate) - except (TypeError, ValueError): + dbstate_attributes = StateAttributes.from_event(event) + except (TypeError, ValueError) as ex: _LOGGER.warning( - "State is not JSON serializable: %s", + "State is not JSON serializable: %s: %s", event.data.get("new_state"), + ex, ) + return + + dbstate.attributes = None + shared_attrs = dbstate_attributes.shared_attrs + # Matching attributes found in the pending commit + if pending_attributes := self._pending_state_attributes.get(shared_attrs): + dbstate.state_attributes = pending_attributes + # Matching attributes id found in the cache + elif attributes_id := self._state_attributes_ids.get(shared_attrs): + dbstate.attributes_id = attributes_id + # Matching attributes found in the database + elif ( + attributes := self.event_session.query(StateAttributes.attributes_id) + .filter(StateAttributes.hash == dbstate_attributes.hash) + .filter(StateAttributes.shared_attrs == shared_attrs) + .first() + ): + dbstate.attributes_id = attributes[0] + self._state_attributes_ids[shared_attrs] = attributes[0] + # No matching attributes found, save them in the DB + else: + dbstate.state_attributes = dbstate_attributes + self._pending_state_attributes[shared_attrs] = dbstate_attributes + self.event_session.add(dbstate_attributes) + + if old_state := self._old_states.pop(dbstate.entity_id, None): + if old_state.state_id: + dbstate.old_state_id = old_state.state_id + else: + dbstate.old_state = old_state + if event.data.get("new_state"): + self._old_states[dbstate.entity_id] = dbstate + self._pending_expunge.append(dbstate) + else: + dbstate.state = None + self.event_session.add(dbstate) + dbstate.event = dbevent # If they do not have a commit interval # than we commit right away @@ -1042,6 +1080,7 @@ class Recorder(threading.Thread): if dbstate in self.event_session: self.event_session.expunge(dbstate) self._pending_expunge = [] + self._pending_state_attributes = {} self.event_session.commit() # Expire is an expensive operation (frequently more expensive @@ -1062,6 +1101,8 @@ class Recorder(threading.Thread): def _close_event_session(self): """Close the event session.""" self._old_states = {} + self._state_attributes_ids = {} + self._pending_state_attributes = {} if not self.event_session: return diff --git a/homeassistant/components/recorder/history.py b/homeassistant/components/recorder/history.py index 8be60a60ac6..13574bc654f 100644 --- a/homeassistant/components/recorder/history.py +++ b/homeassistant/components/recorder/history.py @@ -13,7 +13,12 @@ from homeassistant.components import recorder from homeassistant.core import split_entity_id import homeassistant.util.dt as dt_util -from .models import LazyState, States, process_timestamp_to_utc_isoformat +from .models import ( + LazyState, + StateAttributes, + States, + process_timestamp_to_utc_isoformat, +) from .util import execute, session_scope # mypy: allow-untyped-defs, no-check-untyped-defs @@ -46,6 +51,7 @@ QUERY_STATES = [ States.attributes, States.last_changed, States.last_updated, + StateAttributes.shared_attrs, ] HISTORY_BAKERY = "recorder_history_bakery" @@ -114,6 +120,9 @@ def get_significant_states_with_session( if end_time is not None: baked_query += lambda q: q.filter(States.last_updated < bindparam("end_time")) + baked_query += lambda q: q.outerjoin( + StateAttributes, States.attributes_id == 
StateAttributes.attributes_id + ) baked_query += lambda q: q.order_by(States.entity_id, States.last_updated) states = execute( @@ -159,6 +168,9 @@ def state_changes_during_period(hass, start_time, end_time=None, entity_id=None) baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id")) entity_id = entity_id.lower() + baked_query += lambda q: q.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) baked_query += lambda q: q.order_by(States.entity_id, States.last_updated) states = execute( @@ -186,6 +198,9 @@ def get_last_state_changes(hass, number_of_states, entity_id): baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id")) entity_id = entity_id.lower() + baked_query += lambda q: q.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) baked_query += lambda q: q.order_by( States.entity_id, States.last_updated.desc() ) @@ -263,6 +278,8 @@ def _get_states_with_session( query = query.join( most_recent_state_ids, States.state_id == most_recent_state_ids.c.max_state_id, + ).outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) ) else: # We did not get an include-list of entities, query all states in the inner @@ -301,7 +318,9 @@ def _get_states_with_session( query = query.filter(~States.domain.in_(IGNORE_DOMAINS)) if filters: query = filters.apply(query) - + query = query.outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) return [LazyState(row) for row in execute(query)] @@ -315,6 +334,9 @@ def _get_single_entity_states_with_session(hass, session, utc_point_in_time, ent States.last_updated < bindparam("utc_point_in_time"), States.entity_id == bindparam("entity_id"), ) + baked_query += lambda q: q.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) baked_query += lambda q: q.order_by(States.last_updated.desc()) baked_query += lambda q: q.limit(1) diff --git a/homeassistant/components/recorder/manifest.json b/homeassistant/components/recorder/manifest.json index d4ac7fa91eb..fdadbfb328e 100644 --- a/homeassistant/components/recorder/manifest.json +++ b/homeassistant/components/recorder/manifest.json @@ -2,7 +2,7 @@ "domain": "recorder", "name": "Recorder", "documentation": "https://www.home-assistant.io/integrations/recorder", - "requirements": ["sqlalchemy==1.4.32"], + "requirements": ["sqlalchemy==1.4.32","fnvhash==0.1.0","lru-dict==1.1.7"], "codeowners": ["@home-assistant/core"], "quality_scale": "internal", "iot_class": "local_push" diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 48dca4d42ed..096ec380cf6 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -638,6 +638,9 @@ def _apply_update(instance, new_version, old_version): # noqa: C901 "statistics_short_term", "ix_statistics_short_term_statistic_id_start", ) + elif new_version == 25: + _add_columns(instance, "states", ["attributes_id INTEGER(20)"]) + _create_index(instance, "states", "ix_states_attributes_id") else: raise ValueError(f"No schema migration defined for version {new_version}") diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index b49189a9c3a..329af989568 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -6,7 +6,9 @@ import json import logging from typing import TypedDict, overload +from 
fnvhash import fnv1a_32
 from sqlalchemy import (
+    BigInteger,
     Boolean,
     Column,
     DateTime,
@@ -40,7 +42,7 @@ import homeassistant.util.dt as dt_util
 # pylint: disable=invalid-name
 Base = declarative_base()
 
-SCHEMA_VERSION = 24
+SCHEMA_VERSION = 25
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -48,6 +50,7 @@ DB_TIMEZONE = "+00:00"
 
 TABLE_EVENTS = "events"
 TABLE_STATES = "states"
+TABLE_STATE_ATTRIBUTES = "state_attributes"
 TABLE_RECORDER_RUNS = "recorder_runs"
 TABLE_SCHEMA_CHANGES = "schema_changes"
 TABLE_STATISTICS = "statistics"
@@ -66,6 +69,9 @@ ALL_TABLES = [
     TABLE_STATISTICS_SHORT_TERM,
 ]
 
+EMPTY_JSON_OBJECT = "{}"
+
+
 DATETIME_TYPE = DateTime(timezone=True).with_variant(
     mysql.DATETIME(timezone=True, fsp=6), "mysql"
 )
@@ -161,8 +167,12 @@ class States(Base):  # type: ignore[misc,valid-type]
     last_changed = Column(DATETIME_TYPE, default=dt_util.utcnow)
     last_updated = Column(DATETIME_TYPE, default=dt_util.utcnow, index=True)
     old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True)
+    attributes_id = Column(
+        Integer, ForeignKey("state_attributes.attributes_id"), index=True
+    )
     event = relationship("Events", uselist=False)
     old_state = relationship("States", remote_side=[state_id])
+    state_attributes = relationship("StateAttributes")
 
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
@@ -171,7 +181,7 @@ class States(Base):  # type: ignore[misc,valid-type]
             f"id={self.state_id}, domain='{self.domain}', entity_id='{self.entity_id}', "
             f"state='{self.state}', event_id='{self.event_id}', "
             f"last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}', "
-            f"old_state_id={self.old_state_id}"
+            f"old_state_id={self.old_state_id}, attributes_id={self.attributes_id}"
             f")>"
         )
 
@@ -182,20 +192,17 @@ class States(Base):  # type: ignore[misc,valid-type]
         state = event.data.get("new_state")
 
         dbstate = States(entity_id=entity_id)
+        dbstate.attributes = None
 
         # State got deleted
         if state is None:
             dbstate.state = ""
             dbstate.domain = split_entity_id(entity_id)[0]
-            dbstate.attributes = "{}"
             dbstate.last_changed = event.time_fired
             dbstate.last_updated = event.time_fired
         else:
             dbstate.domain = state.domain
             dbstate.state = state.state
-            dbstate.attributes = json.dumps(
-                dict(state.attributes), cls=JSONEncoder, separators=(",", ":")
-            )
             dbstate.last_changed = state.last_changed
             dbstate.last_updated = state.last_updated
 
@@ -207,7 +214,9 @@ class States(Base):  # type: ignore[misc,valid-type]
         return State(
             self.entity_id,
             self.state,
-            json.loads(self.attributes),
+            # Join the state_attributes table on attributes_id to get the attributes
+            # for newer states
+            json.loads(self.attributes) if self.attributes else {},
             process_timestamp(self.last_changed),
             process_timestamp(self.last_updated),
             # Join the events table on event_id to get the context instead
@@ -221,6 +230,53 @@ class States(Base):  # type: ignore[misc,valid-type]
         return None
 
 
+class StateAttributes(Base):  # type: ignore[misc,valid-type]
+    """State attribute change history."""
+
+    __table_args__ = (
+        {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
+    )
+    __tablename__ = TABLE_STATE_ATTRIBUTES
+    attributes_id = Column(Integer, Identity(), primary_key=True)
+    hash = Column(BigInteger, index=True)
+    # Note that this is not named attributes to avoid confusion with the states table
+    shared_attrs = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
+
+    def __repr__(self) -> str:
+        """Return string representation of instance for debugging."""
+        return (
+            f"<recorder.StateAttributes("
+            f"id={self.attributes_id}, hash='{self.hash}', attrs='{self.shared_attrs}'"
+            f")>"
+        )
+
+    
@staticmethod + def from_event(event): + """Create object from a state_changed event.""" + state = event.data.get("new_state") + dbstate = StateAttributes() + # State got deleted + if state is None: + dbstate.shared_attrs = "{}" + else: + dbstate.shared_attrs = json.dumps( + dict(state.attributes), + cls=JSONEncoder, + separators=(",", ":"), + ) + dbstate.hash = fnv1a_32(dbstate.shared_attrs.encode("utf-8")) + return dbstate + + def to_native(self): + """Convert to an HA state object.""" + try: + return json.loads(self.shared_attrs) + except ValueError: + # When json.loads fails + _LOGGER.exception("Error converting row to state attributes: %s", self) + return {} + + class StatisticResult(TypedDict): """Statistic result data class. @@ -492,12 +548,18 @@ class LazyState(State): @property # type: ignore[override] def attributes(self): """State attributes.""" - if not self._attributes: + if self._attributes is None: + source = self._row.shared_attrs or self._row.attributes + if source == EMPTY_JSON_OBJECT or source is None: + self._attributes = {} + return self._attributes try: - self._attributes = json.loads(self._row.attributes) + self._attributes = json.loads(source) except ValueError: # When json.loads fails - _LOGGER.exception("Error converting row to state: %s", self._row) + _LOGGER.exception( + "Error converting row to state attributes: %s", self._row + ) self._attributes = {} return self._attributes @@ -549,18 +611,22 @@ class LazyState(State): To be used for JSON serialization. """ - if self._last_changed: - last_changed_isoformat = self._last_changed.isoformat() - else: + if self._last_changed is None and self._last_updated is None: last_changed_isoformat = process_timestamp_to_utc_isoformat( self._row.last_changed ) - if self._last_updated: - last_updated_isoformat = self._last_updated.isoformat() + if self._row.last_changed == self._row.last_updated: + last_updated_isoformat = last_changed_isoformat + else: + last_updated_isoformat = process_timestamp_to_utc_isoformat( + self._row.last_updated + ) else: - last_updated_isoformat = process_timestamp_to_utc_isoformat( - self._row.last_updated - ) + last_changed_isoformat = self.last_changed.isoformat() + if self.last_changed == self.last_updated: + last_updated_isoformat = last_changed_isoformat + else: + last_updated_isoformat = self.last_updated.isoformat() return { "entity_id": self.entity_id, "state": self.state, diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index dd80fb15479..0109d68f0f5 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -10,8 +10,17 @@ from sqlalchemy import func from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import distinct +from homeassistant.const import EVENT_STATE_CHANGED + from .const import MAX_ROWS_TO_PURGE -from .models import Events, RecorderRuns, States, StatisticsRuns, StatisticsShortTerm +from .models import ( + Events, + RecorderRuns, + StateAttributes, + States, + StatisticsRuns, + StatisticsShortTerm, +) from .repack import repack_database from .util import retryable_database_job, session_scope @@ -37,7 +46,12 @@ def purge_old_data( with session_scope(session=instance.get_session()) as session: # type: ignore[misc] # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record event_ids = _select_event_ids_to_purge(session, purge_before) - state_ids = _select_state_ids_to_purge(session, purge_before, event_ids) + state_ids, attributes_ids = 
_select_state_and_attributes_ids_to_purge( + session, purge_before, event_ids + ) + attributes_ids = _remove_attributes_ids_used_by_newer_states( + session, purge_before, attributes_ids + ) statistics_runs = _select_statistics_runs_to_purge(session, purge_before) short_term_statistics = _select_short_term_statistics_to_purge( session, purge_before @@ -46,6 +60,9 @@ def purge_old_data( if state_ids: _purge_state_ids(instance, session, state_ids) + if attributes_ids: + _purge_attributes_ids(instance, session, attributes_ids) + if event_ids: _purge_event_ids(session, event_ids) @@ -82,20 +99,47 @@ def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list return [event.event_id for event in events] -def _select_state_ids_to_purge( +def _select_state_and_attributes_ids_to_purge( session: Session, purge_before: datetime, event_ids: list[int] -) -> set[int]: +) -> tuple[set[int], set[int]]: """Return a list of state ids to purge.""" if not event_ids: - return set() + return set(), set() states = ( - session.query(States.state_id) + session.query(States.state_id, States.attributes_id) .filter(States.last_updated < purge_before) .filter(States.event_id.in_(event_ids)) .all() ) _LOGGER.debug("Selected %s state ids to remove", len(states)) - return {state.state_id for state in states} + state_ids = set() + attributes_ids = set() + for state in states: + state_ids.add(state.state_id) + if state.attributes_id: + attributes_ids.add(state.attributes_id) + return state_ids, attributes_ids + + +def _remove_attributes_ids_used_by_newer_states( + session: Session, purge_before: datetime, attributes_ids: set[int] +) -> set[int]: + """Remove attributes ids that are still in use for states we are not purging yet.""" + if not attributes_ids: + return set() + keep_attributes_ids = { + state.attributes_id + for state in session.query(States.attributes_id) + .filter(States.last_updated >= purge_before) + .filter(States.attributes_id.in_(attributes_ids)) + .group_by(States.attributes_id) + } + to_remove = attributes_ids - keep_attributes_ids + _LOGGER.debug( + "Selected %s shared attributes to remove", + len(to_remove), + ) + return to_remove def _select_statistics_runs_to_purge( @@ -143,7 +187,9 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) disconnected_rows = ( session.query(States) .filter(States.old_state_id.in_(state_ids)) - .update({"old_state_id": None}, synchronize_session=False) + .update( + {"old_state_id": None, "attributes_id": None}, synchronize_session=False + ) ) _LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows) @@ -175,6 +221,44 @@ def _evict_purged_states_from_old_states_cache( old_states.pop(old_state_reversed[purged_state_id], None) +def _evict_purged_attributes_from_attributes_cache( + instance: Recorder, purged_attributes_ids: set[int] +) -> None: + """Evict purged attribute ids from the attribute ids cache.""" + # Make a map from attributes_id to the attributes json + state_attributes_ids = ( + instance._state_attributes_ids # pylint: disable=protected-access + ) + state_attributes_ids_reversed = { + attributes_id: attributes + for attributes, attributes_id in state_attributes_ids.items() + } + + # Evict any purged attributes from the state_attributes_ids cache + for purged_attribute_id in purged_attributes_ids.intersection( + state_attributes_ids_reversed + ): + state_attributes_ids.pop( + state_attributes_ids_reversed[purged_attribute_id], None + ) + + +def _purge_attributes_ids( + instance: Recorder, 
session: Session, attributes_ids: set[int]
+) -> None:
+    """Delete old attributes ids."""
+
+    deleted_rows = (
+        session.query(StateAttributes)
+        .filter(StateAttributes.attributes_id.in_(attributes_ids))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s attribute states", deleted_rows)
+
+    # Evict any entries in the state_attributes_ids cache referring to purged attributes
+    _evict_purged_attributes_from_attributes_cache(instance, attributes_ids)
+
+
 def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
     """Delete by run_id."""
     deleted_rows = (
@@ -248,26 +332,52 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
     return True
 
 
+def _remove_attributes_ids_used_by_other_entities(
+    session: Session, entities: list[str], attributes_ids: set[int]
+) -> set[int]:
+    """Remove attributes ids that are still in use for entity_ids we are not purging yet."""
+    if not attributes_ids:
+        return set()
+    keep_attributes_ids = {
+        state.attributes_id
+        for state in session.query(States.attributes_id)
+        .filter(States.entity_id.not_in(entities))
+        .filter(States.attributes_id.in_(attributes_ids))
+        .group_by(States.attributes_id)
+    }
+    to_remove = attributes_ids - keep_attributes_ids
+    _LOGGER.debug(
+        "Selected %s shared attributes to remove",
+        len(to_remove),
+    )
+    return to_remove
+
+
 def _purge_filtered_states(
     instance: Recorder, session: Session, excluded_entity_ids: list[str]
 ) -> None:
     """Remove filtered states and linked events."""
     state_ids: list[int]
+    attributes_ids: list[int]
     event_ids: list[int | None]
-    state_ids, event_ids = zip(
+    state_ids, attributes_ids, event_ids = zip(
         *(
-            session.query(States.state_id, States.event_id)
+            session.query(States.state_id, States.attributes_id, States.event_id)
             .filter(States.entity_id.in_(excluded_entity_ids))
             .limit(MAX_ROWS_TO_PURGE)
             .all()
         )
     )
     event_ids = [id_ for id_ in event_ids if id_ is not None]
+    attributes_ids_set = _remove_attributes_ids_used_by_other_entities(
+        session, excluded_entity_ids, {id_ for id_ in attributes_ids if id_ is not None}
+    )
     _LOGGER.debug(
         "Selected %s state_ids to remove that should be filtered", len(state_ids)
     )
     _purge_state_ids(instance, session, set(state_ids))
     _purge_event_ids(session, event_ids)  # type: ignore[arg-type]  # type of event_ids already narrowed to 'list[int]'
+    _purge_attributes_ids(instance, session, attributes_ids_set)
 
 
 def _purge_filtered_events(
@@ -290,6 +400,9 @@
     state_ids: set[int] = {state.state_id for state in states}
     _purge_state_ids(instance, session, state_ids)
     _purge_event_ids(session, event_ids)
+    if EVENT_STATE_CHANGED in excluded_event_types:
+        session.query(StateAttributes).delete(synchronize_session=False)
+        instance._state_attributes_ids = {}  # pylint: disable=protected-access
 
 
 @retryable_database_job("purge")
diff --git a/homeassistant/components/statistics/sensor.py b/homeassistant/components/statistics/sensor.py
index fb5daa97475..6d73f8ea6e7 100644
--- a/homeassistant/components/statistics/sensor.py
+++ b/homeassistant/components/statistics/sensor.py
@@ -12,7 +12,7 @@ from typing import Any, Literal, cast
 import voluptuous as vol
 
 from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
-from homeassistant.components.recorder.models import States
+from homeassistant.components.recorder.models import StateAttributes, States
 from homeassistant.components.recorder.util import execute, session_scope
 from homeassistant.components.sensor import (
     PLATFORM_SCHEMA,
@@ 
-482,9 +482,10 @@ class StatisticsSensor(SensorEntity):
         """
         _LOGGER.debug("%s: initializing values from the database", self.entity_id)
+        states = []
 
         with session_scope(hass=self.hass) as session:
-            query = session.query(States).filter(
+            query = session.query(States, StateAttributes).filter(
                 States.entity_id == self._source_entity_id.lower()
             )
 
@@ -499,10 +500,18 @@ class StatisticsSensor(SensorEntity):
             else:
                 _LOGGER.debug("%s: retrieving all records", self.entity_id)
 
+            query = query.outerjoin(
+                StateAttributes, States.attributes_id == StateAttributes.attributes_id
+            )
             query = query.order_by(States.last_updated.desc()).limit(
                 self._samples_max_buffer_size
             )
-            states = execute(query, to_native=True, validate_entity_ids=False)
+            if results := execute(query, to_native=False, validate_entity_ids=False):
+                for state, attributes in results:
+                    native = state.to_native()
+                    if not native.attributes:
+                        native.attributes = attributes.to_native()
+                    states.append(native)
 
         if states:
             for state in reversed(states):
diff --git a/homeassistant/package_constraints.txt b/homeassistant/package_constraints.txt
index 2a9af7ff1d5..d8d88133149 100644
--- a/homeassistant/package_constraints.txt
+++ b/homeassistant/package_constraints.txt
@@ -13,11 +13,13 @@ bcrypt==3.1.7
 certifi>=2021.5.30
 ciso8601==2.2.0
 cryptography==35.0.0
+fnvhash==0.1.0
 hass-nabucasa==0.54.0
 home-assistant-frontend==20220317.0
 httpx==0.22.0
 ifaddr==0.1.7
 jinja2==3.0.3
+lru-dict==1.1.7
 paho-mqtt==1.6.1
 pillow==9.0.1
 pip>=21.0,<22.1
diff --git a/requirements_all.txt b/requirements_all.txt
index b9193e46fd7..ba4ec4536e5 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -651,6 +651,7 @@ flipr-api==1.4.2
 flux_led==0.28.27
 
 # homeassistant.components.homekit
+# homeassistant.components.recorder
 fnvhash==0.1.0
 
 # homeassistant.components.foobot
@@ -952,6 +953,9 @@ logi_circle==0.2.2
 # homeassistant.components.london_underground
 london-tube-status==0.2
 
+# homeassistant.components.recorder
+lru-dict==1.1.7
+
 # homeassistant.components.luftdaten
 luftdaten==0.7.2
 
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index 5adb6e80ff7..da1d72eb5e2 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -443,6 +443,7 @@ flipr-api==1.4.2
 flux_led==0.28.27
 
 # homeassistant.components.homekit
+# homeassistant.components.recorder
 fnvhash==0.1.0
 
 # homeassistant.components.foobot
@@ -636,6 +637,9 @@ libsoundtouch==0.8
 # homeassistant.components.logi_circle
 logi_circle==0.2.2
 
+# homeassistant.components.recorder
+lru-dict==1.1.7
+
 # homeassistant.components.luftdaten
 luftdaten==0.7.2
 
diff --git a/tests/components/logbook/test_init.py b/tests/components/logbook/test_init.py
index fab1121542f..29dea015921 100644
--- a/tests/components/logbook/test_init.py
+++ b/tests/components/logbook/test_init.py
@@ -43,7 +43,11 @@ from tests.common import (
     async_init_recorder_component,
     mock_platform,
 )
-from tests.components.recorder.common import trigger_db_commit
+from tests.components.recorder.common import (
+    async_trigger_db_commit,
+    async_wait_recording_done_without_instance,
+    trigger_db_commit,
+)
 
 EMPTY_CONFIG = logbook.CONFIG_SCHEMA({logbook.DOMAIN: {}})
 
@@ -280,12 +284,14 @@ def create_state_changed_event_from_old_new(
             "attributes",
             "state_id",
             "old_state_id",
+            "shared_attrs",
         ],
     )
 
     row.event_type = EVENT_STATE_CHANGED
     row.event_data = "{}"
     row.attributes = attributes_json
+    row.shared_attrs = attributes_json
     row.time_fired = event_time_fired
     row.state = new_state and new_state.get("state")
     row.entity_id = entity_id
@@ 
-636,6 +642,44 @@ async def test_logbook_entity_filter_with_automations(hass, hass_client):
     assert json_dict[0]["entity_id"] == entity_id_second
 
 
+async def test_logbook_entity_no_longer_in_state_machine(hass, hass_client):
+    """Test the logbook view with an entity that has been removed from the state machine."""
+    await async_init_recorder_component(hass)
+    await async_setup_component(hass, "logbook", {})
+    await async_setup_component(hass, "automation", {})
+    await async_setup_component(hass, "script", {})
+
+    await async_wait_recording_done_without_instance(hass)
+
+    entity_id_test = "alarm_control_panel.area_001"
+    hass.states.async_set(
+        entity_id_test, STATE_OFF, {ATTR_FRIENDLY_NAME: "Alarm Control Panel"}
+    )
+    hass.states.async_set(
+        entity_id_test, STATE_ON, {ATTR_FRIENDLY_NAME: "Alarm Control Panel"}
+    )
+
+    async_trigger_db_commit(hass)
+    await async_wait_recording_done_without_instance(hass)
+
+    hass.states.async_remove(entity_id_test)
+
+    client = await hass_client()
+
+    # Today time 00:00:00
+    start = dt_util.utcnow().date()
+    start_date = datetime(start.year, start.month, start.day)
+
+    # Test today entries with filter by end_time
+    end_time = start + timedelta(hours=24)
+    response = await client.get(
+        f"/api/logbook/{start_date.isoformat()}?end_time={end_time}"
+    )
+    assert response.status == HTTPStatus.OK
+    json_dict = await response.json()
+    assert json_dict[0]["name"] == "Alarm Control Panel"
+
+
 async def test_filter_continuous_sensor_values(hass, hass_client):
     """Test remove continuous sensor events from logbook."""
     await async_init_recorder_component(hass)
diff --git a/tests/components/plant/test_init.py b/tests/components/plant/test_init.py
index 14e0f3668b0..602f368abcd 100644
--- a/tests/components/plant/test_init.py
+++ b/tests/components/plant/test_init.py
@@ -1,9 +1,6 @@
 """Unit tests for platform/plant.py."""
 from datetime import datetime, timedelta
 
-import pytest
-
-from homeassistant.components import recorder
 import homeassistant.components.plant as plant
 from homeassistant.const import (
     ATTR_UNIT_OF_MEASUREMENT,
@@ -12,12 +9,12 @@ from homeassistant.const import (
     STATE_OK,
     STATE_PROBLEM,
     STATE_UNAVAILABLE,
-    STATE_UNKNOWN,
 )
 from homeassistant.core import State
 from homeassistant.setup import async_setup_component
 
-from tests.common import init_recorder_component
+from tests.common import async_init_recorder_component
+from tests.components.recorder.common import async_wait_recording_done_without_instance
 
 GOOD_DATA = {
     "moisture": 50,
@@ -148,19 +145,13 @@ async def test_state_problem_if_unavailable(hass):
     assert state.attributes[plant.READING_MOISTURE] == STATE_UNAVAILABLE
 
 
-@pytest.mark.skipif(
-    plant.ENABLE_LOAD_HISTORY is False,
-    reason="tests for loading from DB are unstable, thus"
-    "this feature is turned of until tests become"
-    "stable",
-)
 async def test_load_from_db(hass):
     """Test bootstrapping the brightness history from the database.
 
     This test should only be executed if the loading of the history
     is enabled via plant.ENABLE_LOAD_HISTORY. 
""" - init_recorder_component(hass) + await async_init_recorder_component(hass) plant_name = "wise_plant" for value in [20, 30, 10]: @@ -169,7 +160,7 @@ async def test_load_from_db(hass): ) await hass.async_block_till_done() # wait for the recorder to really store the data - hass.data[recorder.DATA_INSTANCE].block_till_done() + await async_wait_recording_done_without_instance(hass) assert await async_setup_component( hass, plant.DOMAIN, {plant.DOMAIN: {plant_name: GOOD_CONFIG}} @@ -177,7 +168,7 @@ async def test_load_from_db(hass): await hass.async_block_till_done() state = hass.states.get(f"plant.{plant_name}") - assert state.state == STATE_UNKNOWN + assert state.state == STATE_PROBLEM max_brightness = state.attributes.get(plant.ATTR_MAX_BRIGHTNESS_HISTORY) assert max_brightness == 30 diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index ab05c6ec05b..ef1ffd1133f 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -31,6 +31,7 @@ from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.components.recorder.models import ( Events, RecorderRuns, + StateAttributes, States, StatisticsRuns, process_timestamp, @@ -166,10 +167,13 @@ async def test_saving_state( await async_wait_recording_done(hass, instance) with session_scope(hass=hass) as session: - db_states = list(session.query(States)) + db_states = [] + for db_state, db_state_attributes in session.query(States, StateAttributes): + db_states.append(db_state) + state = db_state.to_native() + state.attributes = db_state_attributes.to_native() assert len(db_states) == 1 assert db_states[0].event_id > 0 - state = db_states[0].to_native() assert state == _state_empty_context(hass, entity_id) @@ -400,7 +404,14 @@ def _add_entities(hass, entity_ids): wait_recording_done(hass) with session_scope(hass=hass) as session: - return [st.to_native() for st in session.query(States)] + states = [] + for state, state_attributes in session.query(States, StateAttributes).outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ): + native_state = state.to_native() + native_state.attributes = state_attributes.to_native() + states.append(native_state) + return states def _add_events(hass, events): diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py index 9f32f1c5746..c3de4161ea1 100644 --- a/tests/components/recorder/test_models.py +++ b/tests/components/recorder/test_models.py @@ -1,5 +1,6 @@ """The tests for the Recorder component.""" -from datetime import datetime +from datetime import datetime, timedelta +from unittest.mock import PropertyMock import pytest from sqlalchemy import create_engine @@ -8,7 +9,9 @@ from sqlalchemy.orm import scoped_session, sessionmaker from homeassistant.components.recorder.models import ( Base, Events, + LazyState, RecorderRuns, + StateAttributes, States, process_timestamp, process_timestamp_to_utc_isoformat, @@ -16,8 +19,7 @@ from homeassistant.components.recorder.models import ( from homeassistant.const import EVENT_STATE_CHANGED import homeassistant.core as ha from homeassistant.exceptions import InvalidEntityFormatError -from homeassistant.util import dt -import homeassistant.util.dt as dt_util +from homeassistant.util import dt, dt as dt_util def test_from_event_to_db_event(): @@ -40,6 +42,27 @@ def test_from_event_to_db_state(): assert state == States.from_event(event).to_native() +def test_from_event_to_db_state_attributes(): + 
"""Test converting event to db state attributes.""" + attrs = {"this_attr": True} + state = ha.State("sensor.temperature", "18", attrs) + event = ha.Event( + EVENT_STATE_CHANGED, + {"entity_id": "sensor.temperature", "old_state": None, "new_state": state}, + context=state.context, + ) + assert StateAttributes.from_event(event).to_native() == attrs + + +def test_handling_broken_json_state_attributes(caplog): + """Test we handle broken json in state attributes.""" + state_attributes = StateAttributes( + attributes_id=444, hash=1234, shared_attrs="{NOT_PARSE}" + ) + assert state_attributes.to_native() == {} + assert "Error converting row to state attributes" in caplog.text + + def test_from_event_to_delete_state(): """Test converting deleting state event to db state.""" event = ha.Event( @@ -215,3 +238,97 @@ async def test_event_to_db_model(): native = Events.from_event(event, event_data="{}").to_native() event.data = {} assert native == event + + +async def test_lazy_state_handles_include_json(caplog): + """Test that the LazyState class handles invalid json.""" + row = PropertyMock( + entity_id="sensor.invalid", + shared_attrs="{INVALID_JSON}", + ) + assert LazyState(row).attributes == {} + assert "Error converting row to state attributes" in caplog.text + + +async def test_lazy_state_prefers_shared_attrs_over_attrs(caplog): + """Test that the LazyState prefers shared_attrs over attributes.""" + row = PropertyMock( + entity_id="sensor.invalid", + shared_attrs='{"shared":true}', + attributes='{"shared":false}', + ) + assert LazyState(row).attributes == {"shared": True} + + +async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog): + """Test that the LazyState handles different last_updated and last_changed.""" + now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC) + row = PropertyMock( + entity_id="sensor.valid", + state="off", + shared_attrs='{"shared":true}', + last_updated=now, + last_changed=now - timedelta(seconds=60), + ) + lstate = LazyState(row) + assert lstate.as_dict() == { + "attributes": {"shared": True}, + "entity_id": "sensor.valid", + "last_changed": "2021-06-12T03:03:01.000323+00:00", + "last_updated": "2021-06-12T03:04:01.000323+00:00", + "state": "off", + } + assert lstate.last_updated == row.last_updated + assert lstate.last_changed == row.last_changed + assert lstate.as_dict() == { + "attributes": {"shared": True}, + "entity_id": "sensor.valid", + "last_changed": "2021-06-12T03:03:01.000323+00:00", + "last_updated": "2021-06-12T03:04:01.000323+00:00", + "state": "off", + } + + +async def test_lazy_state_handles_same_last_updated_and_last_changed(caplog): + """Test that the LazyState handles same last_updated and last_changed.""" + now = datetime(2021, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC) + row = PropertyMock( + entity_id="sensor.valid", + state="off", + shared_attrs='{"shared":true}', + last_updated=now, + last_changed=now, + ) + lstate = LazyState(row) + assert lstate.as_dict() == { + "attributes": {"shared": True}, + "entity_id": "sensor.valid", + "last_changed": "2021-06-12T03:04:01.000323+00:00", + "last_updated": "2021-06-12T03:04:01.000323+00:00", + "state": "off", + } + assert lstate.last_updated == row.last_updated + assert lstate.last_changed == row.last_changed + assert lstate.as_dict() == { + "attributes": {"shared": True}, + "entity_id": "sensor.valid", + "last_changed": "2021-06-12T03:04:01.000323+00:00", + "last_updated": "2021-06-12T03:04:01.000323+00:00", + "state": "off", + } + lstate.last_updated = datetime(2020, 6, 
12, 3, 4, 1, 323, tzinfo=dt_util.UTC) + assert lstate.as_dict() == { + "attributes": {"shared": True}, + "entity_id": "sensor.valid", + "last_changed": "2021-06-12T03:04:01.000323+00:00", + "last_updated": "2020-06-12T03:04:01.000323+00:00", + "state": "off", + } + lstate.last_changed = datetime(2020, 6, 12, 3, 4, 1, 323, tzinfo=dt_util.UTC) + assert lstate.as_dict() == { + "attributes": {"shared": True}, + "entity_id": "sensor.valid", + "last_changed": "2020-06-12T03:04:01.000323+00:00", + "last_updated": "2020-06-12T03:04:01.000323+00:00", + "state": "off", + } diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 77b5ad3d191..1bdb391541e 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -13,13 +13,14 @@ from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE from homeassistant.components.recorder.models import ( Events, RecorderRuns, + StateAttributes, States, StatisticsRuns, StatisticsShortTerm, ) from homeassistant.components.recorder.purge import purge_old_data from homeassistant.components.recorder.util import session_scope -from homeassistant.const import EVENT_STATE_CHANGED +from homeassistant.const import EVENT_STATE_CHANGED, STATE_ON from homeassistant.core import HomeAssistant from homeassistant.helpers.typing import ConfigType from homeassistant.util import dt as dt_util @@ -44,9 +45,12 @@ async def test_purge_old_states( # make sure we start with 6 states with session_scope(hass=hass) as session: states = session.query(States) + state_attributes = session.query(StateAttributes) + assert states.count() == 6 assert states[0].old_state_id is None assert states[-1].old_state_id == states[-2].state_id + assert state_attributes.count() == 3 events = session.query(Events).filter(Events.event_type == "state_changed") assert events.count() == 6 @@ -58,6 +62,8 @@ async def test_purge_old_states( finished = purge_old_data(instance, purge_before, repack=False) assert not finished assert states.count() == 2 + assert state_attributes.count() == 1 + assert "test.recorder2" in instance._old_states states_after_purge = session.query(States) @@ -67,6 +73,8 @@ async def test_purge_old_states( finished = purge_old_data(instance, purge_before, repack=False) assert finished assert states.count() == 2 + assert state_attributes.count() == 1 + assert "test.recorder2" in instance._old_states # run purge_old_data again @@ -74,6 +82,8 @@ async def test_purge_old_states( finished = purge_old_data(instance, purge_before, repack=False) assert not finished assert states.count() == 0 + assert state_attributes.count() == 0 + assert "test.recorder2" not in instance._old_states # Add some more states @@ -90,6 +100,9 @@ async def test_purge_old_states( assert events.count() == 6 assert "test.recorder2" in instance._old_states + state_attributes = session.query(StateAttributes) + assert state_attributes.count() == 3 + async def test_purge_old_states_encouters_database_corruption( hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT @@ -368,6 +381,14 @@ async def test_purge_edge_case( last_changed=timestamp, last_updated=timestamp, event_id=1001, + attributes_id=1002, + ) + ) + session.add( + StateAttributes( + shared_attrs="{}", + hash=1234, + attributes_id=1002, ) ) @@ -382,6 +403,9 @@ async def test_purge_edge_case( states = session.query(States) assert states.count() == 1 + state_attributes = session.query(StateAttributes) + assert state_attributes.count() == 1 + events = 
session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE") assert events.count() == 1 @@ -426,6 +450,14 @@ async def test_purge_cutoff_date( last_changed=timestamp_keep, last_updated=timestamp_keep, event_id=1000, + attributes_id=1000, + ) + ) + session.add( + StateAttributes( + shared_attrs="{}", + hash=1234, + attributes_id=1000, ) ) for row in range(1, rows): @@ -447,6 +479,14 @@ async def test_purge_cutoff_date( last_changed=timestamp_purge, last_updated=timestamp_purge, event_id=1000 + row, + attributes_id=1000 + row, + ) + ) + session.add( + StateAttributes( + shared_attrs="{}", + hash=1234, + attributes_id=1000 + row, ) ) @@ -462,9 +502,18 @@ async def test_purge_cutoff_date( with session_scope(hass=hass) as session: states = session.query(States) + state_attributes = session.query(StateAttributes) events = session.query(Events) assert states.filter(States.state == "purge").count() == rows - 1 assert states.filter(States.state == "keep").count() == 1 + assert ( + state_attributes.outerjoin( + States, StateAttributes.attributes_id == States.attributes_id + ) + .filter(States.state == "keep") + .count() + == 1 + ) assert events.filter(Events.event_type == "PURGE").count() == rows - 1 assert events.filter(Events.event_type == "KEEP").count() == 1 @@ -474,12 +523,47 @@ async def test_purge_cutoff_date( await async_wait_purge_done(hass, instance) states = session.query(States) + state_attributes = session.query(StateAttributes) events = session.query(Events) assert states.filter(States.state == "purge").count() == 0 + assert ( + state_attributes.outerjoin( + States, StateAttributes.attributes_id == States.attributes_id + ) + .filter(States.state == "purge") + .count() + == 0 + ) assert states.filter(States.state == "keep").count() == 1 + assert ( + state_attributes.outerjoin( + States, StateAttributes.attributes_id == States.attributes_id + ) + .filter(States.state == "keep") + .count() + == 1 + ) assert events.filter(Events.event_type == "PURGE").count() == 0 assert events.filter(Events.event_type == "KEEP").count() == 1 + # Make sure we can purge everything + instance.queue.put( + PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False) + ) + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + assert states.count() == 0 + assert state_attributes.count() == 0 + + # Make sure we can purge everything when the db is already empty + instance.queue.put( + PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False) + ) + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + assert states.count() == 0 + assert state_attributes.count() == 0 + async def test_purge_filtered_states( hass: HomeAssistant, @@ -527,6 +611,12 @@ async def test_purge_filtered_states( ) # Add states with linked old_state_ids that need to be handled timestamp = dt_util.utcnow() - timedelta(days=0) + state_attrs = StateAttributes( + hash=0, + shared_attrs=json.dumps( + {"sensor.linked_old_state_id": "sensor.linked_old_state_id"} + ), + ) state_1 = States( entity_id="sensor.linked_old_state_id", domain="sensor", @@ -535,6 +625,7 @@ async def test_purge_filtered_states( last_changed=timestamp, last_updated=timestamp, old_state_id=1, + state_attributes=state_attrs, ) timestamp = dt_util.utcnow() - timedelta(days=4) state_2 = States( @@ -545,6 +636,7 @@ async def test_purge_filtered_states( last_changed=timestamp, last_updated=timestamp, old_state_id=2, + state_attributes=state_attrs, ) state_3 = States( 
entity_id="sensor.linked_old_state_id", @@ -554,8 +646,9 @@ async def test_purge_filtered_states( last_changed=timestamp, last_updated=timestamp, old_state_id=62, # keep + state_attributes=state_attrs, ) - session.add_all((state_1, state_2, state_3)) + session.add_all((state_attrs, state_1, state_2, state_3)) # Add event that should be keeped session.add( Events( @@ -617,8 +710,154 @@ async def test_purge_filtered_states( assert states_sensor_excluded.count() == 0 assert session.query(States).get(72).old_state_id is None + assert session.query(States).get(72).attributes_id is None assert session.query(States).get(73).old_state_id is None - assert session.query(States).get(74).old_state_id == 62 # should have been kept + assert session.query(States).get(73).attributes_id is None + + final_keep_state = session.query(States).get(74) + assert final_keep_state.old_state_id == 62 # should have been kept + assert final_keep_state.attributes_id == 71 + + assert session.query(StateAttributes).count() == 11 + + # Do it again to make sure nothing changes + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + final_keep_state = session.query(States).get(74) + assert final_keep_state.old_state_id == 62 # should have been kept + assert final_keep_state.attributes_id == 71 + + assert session.query(StateAttributes).count() == 11 + + # Finally make sure we can delete them all except for the ones missing an event_id + service_data = {"keep_days": 0} + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + remaining = list(session.query(States)) + for state in remaining: + assert state.event_id is None + assert len(remaining) == 3 + assert session.query(StateAttributes).count() == 1 + + +async def test_purge_filtered_states_to_empty( + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, +): + """Test filtered states are purged all the way to an empty db.""" + config: ConfigType = {"exclude": {"entities": ["sensor.excluded"]}} + instance = await async_setup_recorder_instance(hass, config) + assert instance.entity_filter("sensor.excluded") is False + + def _add_db_entries(hass: HomeAssistant) -> None: + with recorder.session_scope(hass=hass) as session: + # Add states and state_changed events that should be purged + for days in range(1, 4): + timestamp = dt_util.utcnow() - timedelta(days=days) + for event_id in range(1000, 1020): + _add_state_and_state_changed_event( + session, + "sensor.excluded", + "purgeme", + timestamp, + event_id * days, + ) + + service_data = {"keep_days": 10} + _add_db_entries(hass) + + with session_scope(hass=hass) as session: + states = session.query(States) + state_attributes = session.query(StateAttributes) + assert states.count() == 60 + assert state_attributes.count() == 60 + + # Test with 'apply_filter' = True + service_data["apply_filter"] = True + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + assert states.count() == 0 + assert state_attributes.count() == 0 + + # Do it again to make sure nothing changes + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await 
async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + + +async def test_purge_without_state_attributes_filtered_states_to_empty( + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, +): + """Test filtered legacy states without state attributes are purged all the way to an empty db.""" + config: ConfigType = {"exclude": {"entities": ["sensor.old_format"]}} + instance = await async_setup_recorder_instance(hass, config) + assert instance.entity_filter("sensor.old_format") is False + + def _add_db_entries(hass: HomeAssistant) -> None: + with recorder.session_scope(hass=hass) as session: + # Add states and state_changed events that should be purged + # in the legacy format + timestamp = dt_util.utcnow() - timedelta(days=5) + event_id = 1021 + session.add( + States( + entity_id="sensor.old_format", + domain="sensor", + state=STATE_ON, + attributes=json.dumps({"old": "not_using_state_attributes"}), + last_changed=timestamp, + last_updated=timestamp, + event_id=event_id, + state_attributes=None, + ) + ) + session.add( + Events( + event_id=event_id, + event_type=EVENT_STATE_CHANGED, + event_data="{}", + origin="LOCAL", + time_fired=timestamp, + ) + ) + + service_data = {"keep_days": 10} + _add_db_entries(hass) + + with session_scope(hass=hass) as session: + states = session.query(States) + state_attributes = session.query(StateAttributes) + assert states.count() == 1 + assert state_attributes.count() == 0 + + # Test with 'apply_filter' = True + service_data["apply_filter"] = True + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + assert states.count() == 0 + assert state_attributes.count() == 0 + + # Do it again to make sure nothing changes + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, service_data + ) + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) async def test_purge_filtered_events( @@ -923,7 +1162,7 @@ async def _add_test_states(hass: HomeAssistant, instance: recorder.Recorder): utcnow = dt_util.utcnow() five_days_ago = utcnow - timedelta(days=5) eleven_days_ago = utcnow - timedelta(days=11) - attributes = {"test_attr": 5, "test_attr_10": "nice"} + base_attributes = {"test_attr": 5, "test_attr_10": "nice"} async def set_state(entity_id, state, **kwargs): """Set the state.""" @@ -935,12 +1174,15 @@ async def _add_test_states(hass: HomeAssistant, instance: recorder.Recorder): if event_id < 2: timestamp = eleven_days_ago state = f"autopurgeme_{event_id}" + attributes = {"autopurgeme": True, **base_attributes} elif event_id < 4: timestamp = five_days_ago state = f"purgeme_{event_id}" + attributes = {"purgeme": True, **base_attributes} else: timestamp = utcnow state = f"dontpurgeme_{event_id}" + attributes = {"dontpurgeme": True, **base_attributes} with patch( "homeassistant.components.recorder.dt_util.utcnow", return_value=timestamp @@ -1069,15 +1311,20 @@ def _add_state_and_state_changed_event( event_id: int, ) -> None: """Add state and state_changed event to database for testing.""" + state_attrs = StateAttributes( + hash=event_id, shared_attrs=json.dumps({entity_id: entity_id}) + ) + session.add(state_attrs) session.add( States( entity_id=entity_id, domain="sensor", state=state, - attributes="{}", + attributes=None, last_changed=timestamp, last_updated=timestamp, event_id=event_id, + 
state_attributes=state_attrs, ) ) session.add(
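
# ---------------------------------------------------------------------------
# Editor's note: illustrative sketch, not part of the patch. The patch
# deduplicates state attributes by serializing them once to JSON
# ("shared_attrs"), indexing the row with a 32-bit FNV-1a hash, and resolving
# each new state against (1) attributes pending in the open commit, (2) an
# in-memory LRU of known attributes_ids, and (3) a database lookup narrowed by
# hash and confirmed by exact JSON match. The toy class below models that
# lookup order in plain Python; every name in it is an assumption for
# illustration, not the recorder's real API.

import json

from fnvhash import fnv1a_32  # dependency added by this patch
from lru import LRU  # from lru-dict, also added by this patch


class AttributeDeduplicator:
    """Toy model of the recorder's shared-attributes lookup order."""

    def __init__(self, cache_size: int = 2048) -> None:
        self._ids = LRU(cache_size)  # shared_attrs JSON -> attributes_id
        self._pending: dict[str, int] = {}  # rows queued in the open commit
        self._db: dict[int, list[tuple[int, str]]] = {}  # hash -> [(id, json)]
        self._next_id = 1

    def attributes_id_for(self, attributes: dict) -> int:
        """Return the attributes_id for this payload, inserting a row if new."""
        shared_attrs = json.dumps(attributes, separators=(",", ":"))
        # 1. Matching attributes already queued in the pending commit
        if (attr_id := self._pending.get(shared_attrs)) is not None:
            return attr_id
        # 2. Matching attributes_id found in the LRU cache
        if (attr_id := self._ids.get(shared_attrs)) is not None:
            return attr_id
        # 3. Database lookup: narrow by hash, then confirm by exact JSON
        #    comparison, since a 32-bit hash can collide
        attrs_hash = fnv1a_32(shared_attrs.encode("utf-8"))
        for attr_id, stored in self._db.get(attrs_hash, []):
            if stored == shared_attrs:
                self._ids[shared_attrs] = attr_id
                return attr_id
        # 4. No match anywhere: store a new shared row
        attr_id = self._next_id
        self._next_id += 1
        self._db.setdefault(attrs_hash, []).append((attr_id, shared_attrs))
        self._pending[shared_attrs] = attr_id
        return attr_id

    def commit(self) -> None:
        """Clear the pending map after a commit.

        Promoting pending ids into the LRU here is a simplification; in the
        recorder, the cache fills from database lookup hits.
        """
        for shared_attrs, attr_id in self._pending.items():
            self._ids[shared_attrs] = attr_id
        self._pending.clear()


# Two states with identical attributes resolve to one stored row:
dedup = AttributeDeduplicator()
first = dedup.attributes_id_for({"unit_of_measurement": "°C", "battery": 95})
second = dedup.attributes_id_for({"unit_of_measurement": "°C", "battery": 95})
assert first == second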