Load pending state attributes and event data ids at startup (#88444)

* Load pending state attributes and event data ids at startup

Since we queue all events to be processed after startup,
we can get a thundering herd of queries to prime the
LRUs of event data and state attributes ids. Since we
know we are about to process a chunk of events, we can
fetch all the ids in two queries.
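
A minimal, self-contained sketch of the idea (all names here are illustrative; the real implementation is Recorder._pre_process_startup_tasks plus the two query helpers added in the diff below):

from itertools import islice

SQLITE_MAX_BIND_VARS = 998  # mirrors the renamed constant in const.py below


def chunked(iterable, chunk_size):
    """Yield lists of at most chunk_size items (itertools-recipe style)."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


def prime_id_cache(hashes, bulk_lookup, cache):
    """Fill cache (shared payload -> row id) with one IN(...) query per chunk.

    bulk_lookup stands in for executing get_shared_attributes() or
    get_shared_event_datas() against the event session; every hash becomes
    one bind parameter, hence the SQLITE_MAX_BIND_VARS chunking.
    """
    for hash_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
        for id_, shared in bulk_lookup(hash_chunk):
            cache[shared] = id_


# Toy usage with a dict standing in for the database table:
fake_table = {101: (1, '{"a": 1}'), 202: (2, '{"b": 2}')}
cache: dict[str, int] = {}
prime_id_cache(
    list(fake_table),
    lambda chunk: [fake_table[h] for h in chunk if h in fake_table],
    cache,
)
assert cache == {'{"a": 1}': 1, '{"b": 2}': 2}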

* lru

* fix hang

* Fix recorder LRU being destroyed if event session is reopened

We would clear the LRU in _close_event_session, but
it was never replaced with an LRU again, so it would
leak memory if the event session was reopened.
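
The _close_event_session hunk itself is not part of the excerpts below, so the following is only a sketch of the leak being described, with a plain dict standing in for the recorder's size-bounded LRU; the "fixed" variant shows one plausible shape of the fix (clear the existing cache in place instead of rebinding it):

class SketchRecorder:
    """Illustrative only; the attribute name follows the recorder code."""

    def __init__(self) -> None:
        # In the real recorder this is a size-bounded LRU, not a plain dict.
        self._state_attributes_ids: dict[str, int] = {}

    def _close_event_session_leaky(self) -> None:
        # Rebinding discards the bounded cache; its replacement has no size
        # limit, so it grows without bound once the session is reopened.
        self._state_attributes_ids = {}

    def _close_event_session_fixed(self) -> None:
        # Clearing in place keeps the same bounded cache instance.
        self._state_attributes_ids.clear()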

* cleanup
J. Nick Koston 2023-02-19 20:26:38 -06:00 committed by GitHub
parent c4f92f5ad4
commit c2b770bcb9
9 changed files with 190 additions and 74 deletions

View File

@ -27,7 +27,7 @@ MAX_QUEUE_BACKLOG = 65000
# in https://github.com/sqlite/sqlite/commit/efdba1a8b3c6c967e7fae9c1989c40d420ce64cc
# We can increase this back to 1000 once most
# have upgraded their sqlite version
MAX_ROWS_TO_PURGE = 998
SQLITE_MAX_BIND_VARS = 998
DB_WORKER_PREFIX = "DbWorker"
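
For context on the 998 value: a statement that carries more bind parameters than the SQLite build allows fails outright, which is why every IN(...) query below is chunked. A hedged, standalone illustration with the stdlib sqlite3 module (40000 placeholders exceeds even the modern default limit of 32766):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
params = list(range(40000))  # more bind variables than most builds allow
placeholders = ",".join("?" * len(params))
try:
    conn.execute(f"SELECT x FROM t WHERE x IN ({placeholders})", params)
except sqlite3.OperationalError as err:
    # Typically "too many SQL variables" when the build's limit is exceeded.
    print(err)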

View File

@ -52,6 +52,7 @@ from .const import (
MAX_QUEUE_BACKLOG,
MYSQLDB_PYMYSQL_URL_PREFIX,
MYSQLDB_URL_PREFIX,
SQLITE_MAX_BIND_VARS,
SQLITE_URL_PREFIX,
SupportedDialect,
)
@ -75,7 +76,12 @@ from .models import (
process_timestamp,
)
from .pool import POOL_SIZE, MutexPool, RecorderPool
from .queries import find_shared_attributes_id, find_shared_data_id
from .queries import (
find_shared_attributes_id,
find_shared_data_id,
get_shared_attributes,
get_shared_event_datas,
)
from .run_history import RunHistory
from .tasks import (
AdjustLRUSizeTask,
@ -98,6 +104,7 @@ from .tasks import (
)
from .util import (
build_mysqldb_conv,
chunked,
dburl_to_path,
end_incomplete_runs,
is_second_sunday,
@ -681,22 +688,93 @@ class Recorder(threading.Thread):
self._adjust_lru_size()
self.hass.add_job(self._async_set_recorder_ready_migration_done)
self._run_event_loop()
self._shutdown()
def _run_event_loop(self) -> None:
"""Run the event loop for the recorder."""
# Use a session for the event read loop
# with a commit every time the event time
# has changed. This reduces the disk io.
queue_ = self._queue
startup_tasks: list[RecorderTask] = []
while not queue_.empty() and (task := queue_.get_nowait()):
startup_tasks.append(task)
self._pre_process_startup_tasks(startup_tasks)
for task in startup_tasks:
self._guarded_process_one_task_or_recover(task)
self.stop_requested = False
while not self.stop_requested:
task = self._queue.get()
_LOGGER.debug("Processing task: %s", task)
try:
self._process_one_task_or_recover(task)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error while processing event %s: %s", task, err)
self._guarded_process_one_task_or_recover(queue_.get())
self._shutdown()
def _pre_process_startup_tasks(self, startup_tasks: list[RecorderTask]) -> None:
"""Pre process startup tasks."""
# Prime all the state_attributes and event_data caches
# before we start processing events
state_change_events: list[Event] = []
non_state_change_events: list[Event] = []
for task in startup_tasks:
if isinstance(task, EventTask):
event_ = task.event
if event_.event_type == EVENT_STATE_CHANGED:
state_change_events.append(event_)
else:
non_state_change_events.append(event_)
self._pre_process_state_change_events(state_change_events)
self._pre_process_non_state_change_events(non_state_change_events)
def _pre_process_state_change_events(self, events: list[Event]) -> None:
"""Load startup state attributes from the database.
Since the _state_attributes_ids cache is empty at startup
we restore it from the database to avoid having to look up
the attributes in the database for every state change
until it is primed.
"""
assert self.event_session is not None
if hashes := [
StateAttributes.hash_shared_attrs_bytes(shared_attrs_bytes)
for event in events
if (
shared_attrs_bytes := self._serialize_state_attributes_from_event(event)
)
]:
with self.event_session.no_autoflush:
for hash_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
for id_, shared_attrs in self.event_session.execute(
get_shared_attributes(hash_chunk)
).fetchall():
self._state_attributes_ids[shared_attrs] = id_
def _pre_process_non_state_change_events(self, events: list[Event]) -> None:
"""Load startup event attributes from the database.
Since the _event_data_ids cache is empty at startup
we restore it from the database to avoid having to look up
the data in the database for every event until it is primed.
"""
assert self.event_session is not None
if hashes := [
EventData.hash_shared_data_bytes(shared_event_bytes)
for event in events
if (shared_event_bytes := self._serialize_event_data_from_event(event))
]:
with self.event_session.no_autoflush:
for hash_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
for id_, shared_data in self.event_session.execute(
get_shared_event_datas(hash_chunk)
).fetchall():
self._event_data_ids[shared_data] = id_
def _guarded_process_one_task_or_recover(self, task: RecorderTask) -> None:
"""Process a task, guarding against exceptions to ensure the loop does not collapse."""
_LOGGER.debug("Processing task: %s", task)
try:
self._process_one_task_or_recover(task)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error while processing event %s: %s", task, err)
def _process_one_task_or_recover(self, task: RecorderTask) -> None:
"""Process an event, reconnect, or recover a malformed database."""
@ -854,6 +932,14 @@ class Recorder(threading.Thread):
return cast(int, data_id[0])
return None
def _serialize_event_data_from_event(self, event: Event) -> bytes | None:
"""Serialize event data."""
try:
return EventData.shared_data_bytes_from_event(event, self.dialect_name)
except JSON_ENCODE_EXCEPTIONS as ex:
_LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex)
return None
def _process_non_state_changed_event_into_session(self, event: Event) -> None:
"""Process any event into the session except state changed."""
assert self.event_session is not None
@ -861,15 +947,8 @@ class Recorder(threading.Thread):
if not event.data:
self.event_session.add(dbevent)
return
try:
shared_data_bytes = EventData.shared_data_bytes_from_event(
event, self.dialect_name
)
except JSON_ENCODE_EXCEPTIONS as ex:
_LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex)
if not (shared_data_bytes := self._serialize_event_data_from_event(event)):
return
shared_data = shared_data_bytes.decode("utf-8")
# Matching attributes found in the pending commit
if pending_event_data := self._pending_event_data.get(shared_data):
@ -892,12 +971,10 @@ class Recorder(threading.Thread):
self.event_session.add(dbevent)
def _process_state_changed_event_into_session(self, event: Event) -> None:
"""Process a state_changed event into the session."""
assert self.event_session is not None
def _serialize_state_attributes_from_event(self, event: Event) -> bytes | None:
"""Serialize state changed event data."""
try:
dbstate = States.from_event(event)
shared_attrs_bytes = StateAttributes.shared_attrs_bytes_from_event(
return StateAttributes.shared_attrs_bytes_from_event(
event,
self._entity_sources,
self._exclude_attributes_by_domain,
@ -909,6 +986,15 @@ class Recorder(threading.Thread):
event.data.get("new_state"),
ex,
)
return None
def _process_state_changed_event_into_session(self, event: Event) -> None:
"""Process a state_changed event into the session."""
assert self.event_session is not None
dbstate = States.from_event(event)
if not (
shared_attrs_bytes := self._serialize_state_attributes_from_event(event)
):
return
shared_attrs = shared_attrs_bytes.decode("utf-8")
@ -1256,6 +1342,7 @@ class Recorder(threading.Thread):
def _shutdown(self) -> None:
"""Save end time for current run."""
_LOGGER.debug("Shutting down recorder")
self.hass.add_job(self._async_stop_listeners)
self._stop_executor()
try:

View File

@ -3,6 +3,7 @@ from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timedelta
from functools import lru_cache
import logging
import time
from typing import Any, cast
@ -284,6 +285,7 @@ class EventData(Base):
return json_bytes(event.data)
@staticmethod
@lru_cache
def hash_shared_data_bytes(shared_data_bytes: bytes) -> int:
"""Return the hash of json encoded shared data."""
return cast(int, fnv1a_32(shared_data_bytes))
@ -492,6 +494,7 @@ class StateAttributes(Base):
return bytes_result
@staticmethod
@lru_cache(maxsize=2048)
def hash_shared_attrs_bytes(shared_attrs_bytes: bytes) -> int:
"""Return the hash of json encoded shared attributes."""
return cast(int, fnv1a_32(shared_attrs_bytes))

View File

@ -3,10 +3,9 @@ from __future__ import annotations
from collections.abc import Callable, Iterable
from datetime import datetime
from functools import partial
from itertools import islice, zip_longest
from itertools import zip_longest
import logging
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session
@ -15,7 +14,7 @@ from sqlalchemy.sql.expression import distinct
from homeassistant.const import EVENT_STATE_CHANGED
import homeassistant.util.dt as dt_util
from .const import MAX_ROWS_TO_PURGE
from .const import SQLITE_MAX_BIND_VARS
from .db_schema import Events, StateAttributes, States
from .models import DatabaseEngine
from .queries import (
@ -40,7 +39,7 @@ from .queries import (
find_statistics_runs_to_purge,
)
from .repack import repack_database
from .util import retryable_database_job, session_scope
from .util import chunked, retryable_database_job, session_scope
if TYPE_CHECKING:
from . import Recorder
@ -52,22 +51,6 @@ DEFAULT_STATES_BATCHES_PER_PURGE = 20 # We expect ~95% de-dupe rate
DEFAULT_EVENTS_BATCHES_PER_PURGE = 15 # We expect ~92% de-dupe rate
def take(take_num: int, iterable: Iterable) -> list[Any]:
"""Return first n items of the iterable as a list.
From itertools recipes
"""
return list(islice(iterable, take_num))
def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
"""Break *iterable* into lists of length *n*.
From more-itertools
"""
return iter(partial(take, chunked_num, iter(iterable)), [])
@retryable_database_job("purge")
def purge_old_data(
instance: Recorder,
@ -86,7 +69,7 @@ def purge_old_data(
purge_before.isoformat(sep=" ", timespec="seconds"),
)
with session_scope(session=instance.get_session()) as session:
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
# Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states or events record
has_more_to_purge = False
if _purging_legacy_format(session):
_LOGGER.debug(
@ -174,7 +157,7 @@ def _purge_states_and_attributes_ids(
# There are more states relative to attributes_ids so
# we purge enough state_ids to try to generate a full
# size batch of attributes_ids that will be around the size
# MAX_ROWS_TO_PURGE
# SQLITE_MAX_BIND_VARS
attributes_ids_batch: set[int] = set()
for _ in range(states_batch_size):
state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
@ -208,7 +191,7 @@ def _purge_events_and_data_ids(
# There are more events relative to data_ids so
# we purge enough event_ids to try to generate a full
# size batch of data_ids that will be around the size
# MAX_ROWS_TO_PURGE
# SQLITE_MAX_BIND_VARS
data_ids_batch: set[int] = set()
for _ in range(events_batch_size):
event_ids, data_ids = _select_event_data_ids_to_purge(session, purge_before)
@ -310,7 +293,7 @@ def _select_unused_attributes_ids(
#
# We used to generate a query based on how many attribute_ids to find but
# that meant sqlalchemy Transparent SQL Compilation Caching was working against
# us by cached up to MAX_ROWS_TO_PURGE different statements which could be
# us by cached up to SQLITE_MAX_BIND_VARS different statements which could be
# up to 500MB for large database due to the complexity of the ORM objects.
#
# We now break the query into groups of 100 and use a lambda_stmt to ensure
@ -525,8 +508,8 @@ def _evict_purged_attributes_from_attributes_cache(
def _purge_batch_attributes_ids(
instance: Recorder, session: Session, attributes_ids: set[int]
) -> None:
"""Delete old attributes ids in batches of MAX_ROWS_TO_PURGE."""
for attributes_ids_chunk in chunked(attributes_ids, MAX_ROWS_TO_PURGE):
"""Delete old attributes ids in batches of SQLITE_MAX_BIND_VARS."""
for attributes_ids_chunk in chunked(attributes_ids, SQLITE_MAX_BIND_VARS):
deleted_rows = session.execute(
delete_states_attributes_rows(attributes_ids_chunk)
)
@ -539,8 +522,8 @@ def _purge_batch_attributes_ids(
def _purge_batch_data_ids(
instance: Recorder, session: Session, data_ids: set[int]
) -> None:
"""Delete old event data ids in batches of MAX_ROWS_TO_PURGE."""
for data_ids_chunk in chunked(data_ids, MAX_ROWS_TO_PURGE):
"""Delete old event data ids in batches of SQLITE_MAX_BIND_VARS."""
for data_ids_chunk in chunked(data_ids, SQLITE_MAX_BIND_VARS):
deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
_LOGGER.debug("Deleted %s data events", deleted_rows)
@ -624,7 +607,7 @@ def _purge_filtered_states(
*(
session.query(States.state_id, States.attributes_id, States.event_id)
.filter(States.entity_id.in_(excluded_entity_ids))
.limit(MAX_ROWS_TO_PURGE)
.limit(SQLITE_MAX_BIND_VARS)
.all()
)
)
@ -650,7 +633,7 @@ def _purge_filtered_events(
*(
session.query(Events.event_id, Events.data_id)
.filter(Events.event_type.in_(excluded_event_types))
.limit(MAX_ROWS_TO_PURGE)
.limit(SQLITE_MAX_BIND_VARS)
.all()
)
)
@ -685,7 +668,7 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool])
]
_LOGGER.debug("Purging entity data for %s", selected_entity_ids)
if len(selected_entity_ids) > 0:
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states
# Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states
# or events record.
_purge_filtered_states(
instance, session, selected_entity_ids, database_engine

View File

@ -8,7 +8,7 @@ from sqlalchemy import delete, distinct, func, lambda_stmt, select, union_all, u
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import Select
from .const import MAX_ROWS_TO_PURGE
from .const import SQLITE_MAX_BIND_VARS
from .db_schema import (
EventData,
Events,
@ -20,6 +20,24 @@ from .db_schema import (
)
def get_shared_attributes(hashes: list[int]) -> StatementLambdaElement:
"""Load shared attributes from the database."""
return lambda_stmt(
lambda: select(
StateAttributes.attributes_id, StateAttributes.shared_attrs
).where(StateAttributes.hash.in_(hashes))
)
def get_shared_event_datas(hashes: list[int]) -> StatementLambdaElement:
"""Load shared event data from the database."""
return lambda_stmt(
lambda: select(EventData.data_id, EventData.shared_data).where(
EventData.hash.in_(hashes)
)
)
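
A hedged, self-contained sketch of how these statement builders are consumed (an in-memory SQLite database and a trimmed-down StateAttributes model are re-declared here so the snippet runs on its own; the real caller is the Recorder._pre_process_state_change_events hunk above). Using lambda_stmt keeps the compiled statement cache small even though the IN(...) list changes on every call, in line with the caching comment in the purge.py hunk:

from sqlalchemy import Column, Integer, Text, create_engine, lambda_stmt, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class StateAttributes(Base):  # trimmed: only the columns the query touches
    __tablename__ = "state_attributes"
    attributes_id = Column(Integer, primary_key=True)
    hash = Column(Integer, index=True)
    shared_attrs = Column(Text)


def get_shared_attributes(hashes):
    """Same shape as the helper above, against the trimmed model."""
    return lambda_stmt(
        lambda: select(
            StateAttributes.attributes_id, StateAttributes.shared_attrs
        ).where(StateAttributes.hash.in_(hashes))
    )


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(StateAttributes(attributes_id=1, hash=123, shared_attrs='{"a": 1}'))
    session.commit()
    cache = {
        shared_attrs: attributes_id
        for attributes_id, shared_attrs in session.execute(
            get_shared_attributes([123])
        ).fetchall()
    }
    assert cache == {'{"a": 1}': 1}
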
def find_shared_attributes_id(
data_hash: int, shared_attrs: str
) -> StatementLambdaElement:
@ -587,7 +605,7 @@ def find_events_to_purge(purge_before: float) -> StatementLambdaElement:
return lambda_stmt(
lambda: select(Events.event_id, Events.data_id)
.filter(Events.time_fired_ts < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.limit(SQLITE_MAX_BIND_VARS)
)
@ -596,7 +614,7 @@ def find_states_to_purge(purge_before: float) -> StatementLambdaElement:
return lambda_stmt(
lambda: select(States.state_id, States.attributes_id)
.filter(States.last_updated_ts < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.limit(SQLITE_MAX_BIND_VARS)
)
@ -608,7 +626,7 @@ def find_short_term_statistics_to_purge(
return lambda_stmt(
lambda: select(StatisticsShortTerm.id)
.filter(StatisticsShortTerm.start_ts < purge_before_ts)
.limit(MAX_ROWS_TO_PURGE)
.limit(SQLITE_MAX_BIND_VARS)
)
@ -619,7 +637,7 @@ def find_statistics_runs_to_purge(
return lambda_stmt(
lambda: select(StatisticsRuns.run_id)
.filter(StatisticsRuns.start < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.limit(SQLITE_MAX_BIND_VARS)
)
@ -640,7 +658,7 @@ def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
)
.outerjoin(States, Events.event_id == States.event_id)
.filter(Events.time_fired_ts < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.limit(SQLITE_MAX_BIND_VARS)
)

View File

@ -56,7 +56,7 @@ from .const import (
DOMAIN,
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
MAX_ROWS_TO_PURGE,
SQLITE_MAX_BIND_VARS,
SupportedDialect,
)
from .db_schema import (
@ -441,7 +441,7 @@ def _find_duplicates(
)
.filter(subquery.c.is_duplicate == 1)
.order_by(table.metadata_id, table.start, table.id.desc())
.limit(1000 * MAX_ROWS_TO_PURGE)
.limit(1000 * SQLITE_MAX_BIND_VARS)
)
duplicates = execute(query)
original_as_dict = {}
@ -505,10 +505,10 @@ def _delete_duplicates_from_table(
if not duplicate_ids:
break
all_non_identical_duplicates.extend(non_identical_duplicates)
for i in range(0, len(duplicate_ids), MAX_ROWS_TO_PURGE):
for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
deleted_rows = (
session.query(table)
.filter(table.id.in_(duplicate_ids[i : i + MAX_ROWS_TO_PURGE]))
.filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]))
.delete(synchronize_session=False)
)
total_deleted_rows += deleted_rows
@ -584,7 +584,7 @@ def _find_statistics_meta_duplicates(session: Session) -> list[int]:
)
.filter(subquery.c.is_duplicate == 1)
.order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc())
.limit(1000 * MAX_ROWS_TO_PURGE)
.limit(1000 * SQLITE_MAX_BIND_VARS)
)
duplicates = execute(query)
statistic_id = None
@ -609,10 +609,12 @@ def _delete_statistics_meta_duplicates(session: Session) -> int:
duplicate_ids = _find_statistics_meta_duplicates(session)
if not duplicate_ids:
break
for i in range(0, len(duplicate_ids), MAX_ROWS_TO_PURGE):
for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
deleted_rows = (
session.query(StatisticsMeta)
.filter(StatisticsMeta.id.in_(duplicate_ids[i : i + MAX_ROWS_TO_PURGE]))
.filter(
StatisticsMeta.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS])
)
.delete(synchronize_session=False)
)
total_deleted_rows += deleted_rows

View File

@ -1,10 +1,12 @@
"""SQLAlchemy util functions."""
from __future__ import annotations
from collections.abc import Callable, Generator, Sequence
from collections.abc import Callable, Generator, Iterable, Sequence
from contextlib import contextmanager
from datetime import date, datetime, timedelta
import functools
from functools import partial
from itertools import islice
import logging
import os
import time
@ -761,3 +763,19 @@ def resolve_period(
end_time += offset
return (start_time, end_time)
def take(take_num: int, iterable: Iterable) -> list[Any]:
"""Return first n items of the iterable as a list.
From itertools recipes
"""
return list(islice(iterable, take_num))
def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
"""Break *iterable* into lists of length *n*.
From more-itertools
"""
return iter(partial(take, chunked_num, iter(iterable)), [])
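
A short usage note for the two helpers above (assuming they land in homeassistant.components.recorder.util, as the imports in the core hunk suggest; the values are illustrative):

from homeassistant.components.recorder.util import chunked, take

assert take(2, range(5)) == [0, 1]
# chunked() yields fixed-size lists until the source is exhausted; the last
# chunk may be shorter, which is fine when building IN(...) clauses.
assert list(chunked(range(5), 2)) == [[0, 1], [2, 3], [4]]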

View File

@ -374,6 +374,8 @@ async def test_schema_migrate(
"homeassistant.components.recorder.Recorder._process_state_changed_event_into_session",
), patch(
"homeassistant.components.recorder.Recorder._process_non_state_changed_event_into_session",
), patch(
"homeassistant.components.recorder.Recorder._pre_process_startup_tasks",
):
recorder_helper.async_initialize_recorder(hass)
hass.async_create_task(

View File

@ -9,7 +9,10 @@ from sqlalchemy.exc import DatabaseError, OperationalError
from sqlalchemy.orm.session import Session
from homeassistant.components import recorder
from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE, SupportedDialect
from homeassistant.components.recorder.const import (
SQLITE_MAX_BIND_VARS,
SupportedDialect,
)
from homeassistant.components.recorder.db_schema import (
EventData,
Events,
@ -591,7 +594,7 @@ async def test_purge_cutoff_date(
service_data = {"keep_days": 2}
# Force multiple purge batches to be run
rows = MAX_ROWS_TO_PURGE + 1
rows = SQLITE_MAX_BIND_VARS + 1
cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
await _add_db_entries(hass, cutoff, rows)
@ -1548,11 +1551,11 @@ async def test_purge_many_old_events(
"""Test deleting old events."""
instance = await async_setup_recorder_instance(hass)
await _add_test_events(hass, MAX_ROWS_TO_PURGE)
await _add_test_events(hass, SQLITE_MAX_BIND_VARS)
with session_scope(hass=hass) as session:
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
assert events.count() == MAX_ROWS_TO_PURGE * 6
assert events.count() == SQLITE_MAX_BIND_VARS * 6
purge_before = dt_util.utcnow() - timedelta(days=4)
@ -1565,7 +1568,7 @@ async def test_purge_many_old_events(
events_batch_size=3,
)
assert not finished
assert events.count() == MAX_ROWS_TO_PURGE * 3
assert events.count() == SQLITE_MAX_BIND_VARS * 3
# we should only have 2 groups of events left
finished = purge_old_data(
@ -1576,7 +1579,7 @@ async def test_purge_many_old_events(
events_batch_size=3,
)
assert finished
assert events.count() == MAX_ROWS_TO_PURGE * 2
assert events.count() == SQLITE_MAX_BIND_VARS * 2
# we should now purge everything
finished = purge_old_data(