From e30940ef2adc2a2f4b006e85092f53bcbcb68bfb Mon Sep 17 00:00:00 2001
From: "J. Nick Koston"
Date: Tue, 3 May 2022 15:56:22 -0500
Subject: [PATCH] Move processing of recorder service call arguments into
 services.py (#71260)

---
 homeassistant/components/recorder/__init__.py |   2 +-
 homeassistant/components/recorder/core.py     | 170 ++++++++----------
 homeassistant/components/recorder/services.py |  18 +-
 homeassistant/components/recorder/tasks.py    |  12 +-
 .../components/recorder/websocket_api.py      |   2 +-
 tests/components/recorder/test_init.py        |   2 +-
 tests/components/recorder/test_purge.py       |   6 +-
 7 files changed, 104 insertions(+), 108 deletions(-)

diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index 00f12710c18..ca1ecd8c71a 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -179,4 +179,4 @@ async def _process_recorder_platform(
 ) -> None:
     """Process a recorder platform."""
     instance: Recorder = hass.data[DATA_INSTANCE]
-    instance.queue.put(AddRecorderPlatformTask(domain, platform))
+    instance.queue_task(AddRecorderPlatformTask(domain, platform))
diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index b96eee2c67f..7f07a4483cb 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -28,7 +28,6 @@ from homeassistant.const import (
     MATCH_ALL,
 )
 from homeassistant.core import CALLBACK_TYPE, CoreState, Event, HomeAssistant, callback
-from homeassistant.helpers.entityfilter import generate_filter
 from homeassistant.helpers.event import (
     async_track_time_change,
     async_track_time_interval,
@@ -38,9 +37,6 @@ import homeassistant.util.dt as dt_util
 
 from . import migration, statistics
 from .const import (
-    ATTR_APPLY_FILTER,
-    ATTR_KEEP_DAYS,
-    ATTR_REPACK,
     DB_WORKER_PREFIX,
     KEEPALIVE_TIME,
     MAX_QUEUE_BACKLOG,
@@ -70,7 +66,6 @@ from .tasks import (
     ExternalStatisticsTask,
     KeepAliveTask,
     PerodicCleanupTask,
-    PurgeEntitiesTask,
     PurgeTask,
     RecorderTask,
     StatisticsTask,
@@ -112,6 +107,7 @@ SHUTDOWN_TASK = object()
 
 COMMIT_TASK = CommitTask()
 KEEP_ALIVE_TASK = KeepAliveTask()
+WAIT_TASK = WaitTask()
 
 DB_LOCK_TIMEOUT = 30
 DB_LOCK_QUEUE_CHECK_TIMEOUT = 1
@@ -152,7 +148,7 @@ class Recorder(threading.Thread):
         self.keep_days = keep_days
         self._hass_started: asyncio.Future[object] = asyncio.Future()
         self.commit_interval = commit_interval
-        self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue()
+        self._queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue()
         self.db_url = uri
         self.db_max_retries = db_max_retries
         self.db_retry_wait = db_retry_wait
@@ -175,21 +171,42 @@ class Recorder(threading.Thread):
         self.event_session: Session | None = None
         self.get_session: Callable[[], Session] | None = None
         self._completed_first_database_setup: bool | None = None
-        self._event_listener: CALLBACK_TYPE | None = None
         self.async_migration_event = asyncio.Event()
         self.migration_in_progress = False
-        self._queue_watcher: CALLBACK_TYPE | None = None
         self._db_supports_row_number = True
         self._database_lock_task: DatabaseLockTask | None = None
         self._db_executor: DBInterruptibleThreadPoolExecutor | None = None
         self._exclude_attributes_by_domain = exclude_attributes_by_domain
+        self._event_listener: CALLBACK_TYPE | None = None
+        self._queue_watcher: CALLBACK_TYPE | None = None
         self._keep_alive_listener: CALLBACK_TYPE | None = None
         self._commit_listener: CALLBACK_TYPE | None = None
         self._periodic_listener: CALLBACK_TYPE | None = None
         self._nightly_listener: CALLBACK_TYPE | None = None
         self.enabled = True
 
+    @property
+    def backlog(self) -> int:
+        """Return the number of items in the recorder backlog."""
+        return self._queue.qsize()
+
+    @property
+    def _using_file_sqlite(self) -> bool:
+        """Short version to check if we are using sqlite3 as a file."""
+        return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
+            SQLITE_URL_PREFIX
+        )
+
+    @property
+    def recording(self) -> bool:
+        """Return if the recorder is recording."""
+        return self._event_listener is not None
+
+    def queue_task(self, task: RecorderTask) -> None:
+        """Add a task to the recorder queue."""
+        self._queue.put(task)
+
     def set_enable(self, enable: bool) -> None:
         """Enable or disable recording events and states."""
         self.enabled = enable
@@ -222,7 +239,7 @@
     def _async_keep_alive(self, now: datetime) -> None:
         """Queue a keep alive."""
         if self._event_listener:
-            self.queue.put(KEEP_ALIVE_TASK)
+            self.queue_task(KEEP_ALIVE_TASK)
 
     @callback
     def _async_commit(self, now: datetime) -> None:
@@ -232,7 +249,7 @@
             and not self._database_lock_task
             and self._event_session_has_pending_writes()
         ):
-            self.queue.put(COMMIT_TASK)
+            self.queue_task(COMMIT_TASK)
 
     @callback
     def async_add_executor_job(
@@ -253,7 +270,7 @@
 
         The queue grows during migration or if something really goes wrong.
         """
-        size = self.queue.qsize()
+        size = self.backlog
         _LOGGER.debug("Recorder queue size is: %s", size)
         if size <= MAX_QUEUE_BACKLOG:
             return
@@ -314,73 +331,52 @@
         # Unknown what it is.
         return True
 
-    def do_adhoc_purge(self, **kwargs: Any) -> None:
-        """Trigger an adhoc purge retaining keep_days worth of data."""
-        keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
-        repack = cast(bool, kwargs[ATTR_REPACK])
-        apply_filter = cast(bool, kwargs[ATTR_APPLY_FILTER])
-
-        purge_before = dt_util.utcnow() - timedelta(days=keep_days)
-        self.queue.put(PurgeTask(purge_before, repack, apply_filter))
-
-    def do_adhoc_purge_entities(
-        self, entity_ids: set[str], domains: list[str], entity_globs: list[str]
-    ) -> None:
-        """Trigger an adhoc purge of requested entities."""
-        entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs)
-        self.queue.put(PurgeEntitiesTask(entity_filter))
-
     def do_adhoc_statistics(self, **kwargs: Any) -> None:
         """Trigger an adhoc statistics run."""
         if not (start := kwargs.get("start")):
             start = statistics.get_start_time()
-        self.queue.put(StatisticsTask(start))
+        self.queue_task(StatisticsTask(start))
+
+    def _empty_queue(self, event: Event) -> None:
+        """Empty the queue if it's still present at final write."""
+
+        # If the queue is full of events to be processed because
+        # the database is so broken that every event results in a retry
+        # we will never be able to get through the events to shut down in time.
+        #
+        # We drain all the events in the queue and then insert
+        # an empty one to ensure the next thing the recorder sees
+        # is a request to shut down.
+        while True:
+            try:
+                self._queue.get_nowait()
+            except queue.Empty:
+                break
+        self.queue_task(StopTask())
+
+    async def _async_shutdown(self, event: Event) -> None:
+        """Shut down the Recorder."""
+        if not self._hass_started.done():
+            self._hass_started.set_result(SHUTDOWN_TASK)
+        self.queue_task(StopTask())
+        self._async_stop_listeners()
+        await self.hass.async_add_executor_job(self.join)
+
+    @callback
+    def _async_hass_started(self, event: Event) -> None:
+        """Notify that hass has started."""
+        self._hass_started.set_result(None)
 
     @callback
     def async_register(self) -> None:
         """Post connection initialize."""
-
-        def _empty_queue(event: Event) -> None:
-            """Empty the queue if it's still present at final write."""
-
-            # If the queue is full of events to be processed because
-            # the database is so broken that every event results in a retry
-            # we will never be able to get through the events to shut down in time.
-            #
-            # We drain all the events in the queue and then insert
-            # an empty one to ensure the next thing the recorder sees
-            # is a request to shut down.
-            while True:
-                try:
-                    self.queue.get_nowait()
-                except queue.Empty:
-                    break
-            self.queue.put(StopTask())
-
-        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue)
-
-        async def _async_shutdown(event: Event) -> None:
-            """Shut down the Recorder."""
-            if not self._hass_started.done():
-                self._hass_started.set_result(SHUTDOWN_TASK)
-            self.queue.put(StopTask())
-            self._async_stop_listeners()
-            await self.hass.async_add_executor_job(self.join)
-
-        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown)
-
+        bus = self.hass.bus
+        bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._empty_queue)
+        bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._async_shutdown)
         if self.hass.state == CoreState.running:
             self._hass_started.set_result(None)
             return
-
-        @callback
-        def _async_hass_started(event: Event) -> None:
-            """Notify that hass has started."""
-            self._hass_started.set_result(None)
-
-        self.hass.bus.async_listen_once(
-            EVENT_HOMEASSISTANT_STARTED, _async_hass_started
-        )
+        bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self._async_hass_started)
 
     @callback
     def async_connection_failed(self) -> None:
@@ -414,9 +410,9 @@
             # until after the database is vacuumed
             repack = self.auto_repack and is_second_sunday(now)
             purge_before = dt_util.utcnow() - timedelta(days=self.keep_days)
-            self.queue.put(PurgeTask(purge_before, repack=repack, apply_filter=False))
+            self.queue_task(PurgeTask(purge_before, repack=repack, apply_filter=False))
         else:
-            self.queue.put(PerodicCleanupTask())
+            self.queue_task(PerodicCleanupTask())
 
     @callback
     def async_periodic_statistics(self, now: datetime) -> None:
@@ -425,33 +421,33 @@
         Short term statistics run every 5 minutes
         """
         start = statistics.get_start_time()
-        self.queue.put(StatisticsTask(start))
+        self.queue_task(StatisticsTask(start))
 
     @callback
     def async_adjust_statistics(
         self, statistic_id: str, start_time: datetime, sum_adjustment: float
     ) -> None:
         """Adjust statistics."""
-        self.queue.put(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment))
+        self.queue_task(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment))
 
     @callback
     def async_clear_statistics(self, statistic_ids: list[str]) -> None:
         """Clear statistics for a list of statistic_ids."""
-        self.queue.put(ClearStatisticsTask(statistic_ids))
+        self.queue_task(ClearStatisticsTask(statistic_ids))
 
     @callback
     def async_update_statistics_metadata(
         self, statistic_id: str, unit_of_measurement: str | None
     ) -> None:
         """Update statistics metadata for a statistic_id."""
-        self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))
+        self.queue_task(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))
 
     @callback
     def async_external_statistics(
         self, metadata: StatisticMetaData, stats: Iterable[StatisticData]
     ) -> None:
         """Schedule external statistics."""
-        self.queue.put(ExternalStatisticsTask(metadata, stats))
+        self.queue_task(ExternalStatisticsTask(metadata, stats))
 
     @callback
     def using_sqlite(self) -> bool:
@@ -553,7 +549,7 @@
         # has changed. This reduces the disk io.
         self.stop_requested = False
         while not self.stop_requested:
-            task = self.queue.get()
+            task = self._queue.get()
             _LOGGER.debug("Processing task: %s", task)
             try:
                 self._process_one_task_or_recover(task)
@@ -643,7 +639,7 @@
         # Notify that lock is being held, wait until database can be used again.
         self.hass.add_job(_async_set_database_locked, task)
         while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
-            if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9:
+            if self.backlog > MAX_QUEUE_BACKLOG * 0.9:
                 _LOGGER.warning(
                     "Database queue backlog reached more than 90% of maximum queue "
                     "length while waiting for backup to finish; recorder will now "
@@ -654,7 +650,7 @@
                 break
         _LOGGER.info(
             "Database queue backlog reached %d entries during backup",
-            self.queue.qsize(),
+            self.backlog,
         )
 
     def _process_one_event(self, event: Event) -> None:
@@ -908,7 +904,7 @@
     @callback
     def event_listener(self, event: Event) -> None:
         """Listen for new events and put them in the process queue."""
-        self.queue.put(EventTask(event))
+        self.queue_task(EventTask(event))
 
     def block_till_done(self) -> None:
         """Block till all events processed.
@@ -923,7 +919,7 @@
         is in the database.
         """
         self._queue_watch.clear()
-        self.queue.put(WaitTask())
+        self.queue_task(WAIT_TASK)
         self._queue_watch.wait()
 
     async def lock_database(self) -> bool:
@@ -940,7 +936,7 @@
 
         database_locked = asyncio.Event()
         task = DatabaseLockTask(database_locked, threading.Event(), False)
-        self.queue.put(task)
+        self.queue_task(task)
         try:
             await asyncio.wait_for(database_locked.wait(), timeout=DB_LOCK_TIMEOUT)
         except asyncio.TimeoutError as err:
@@ -1013,13 +1009,6 @@
         self.get_session = scoped_session(sessionmaker(bind=self.engine, future=True))
         _LOGGER.debug("Connected to recorder database")
 
-    @property
-    def _using_file_sqlite(self) -> bool:
-        """Short version to check if we are using sqlite3 as a file."""
-        return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
-            SQLITE_URL_PREFIX
-        )
-
     def _close_connection(self) -> None:
         """Close the connection."""
         assert self.engine is not None
@@ -1053,7 +1042,7 @@
         while start < last_period:
             end = start + timedelta(minutes=5)
             _LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
-            self.queue.put(StatisticsTask(start))
+            self.queue_task(StatisticsTask(start))
             start = end
 
     def _end_session(self) -> None:
@@ -1075,8 +1064,3 @@
         self._stop_executor()
         self._end_session()
         self._close_connection()
-
-    @property
-    def recording(self) -> bool:
-        """Return if the recorder is recording."""
-        return self._event_listener is not None
diff --git a/homeassistant/components/recorder/services.py b/homeassistant/components/recorder/services.py
index 3da2c63a27c..14337290c9b 100644
--- a/homeassistant/components/recorder/services.py
+++ b/homeassistant/components/recorder/services.py
@@ -1,21 +1,26 @@
 """Support for recorder services."""
 from __future__ import annotations
 
+from datetime import timedelta
+from typing import cast
+
 import voluptuous as vol
 
 from homeassistant.core import HomeAssistant, ServiceCall, callback
 import homeassistant.helpers.config_validation as cv
+from homeassistant.helpers.entityfilter import generate_filter
 from homeassistant.helpers.service import async_extract_entity_ids
+import homeassistant.util.dt as dt_util
 
 from .const import ATTR_APPLY_FILTER, ATTR_KEEP_DAYS, ATTR_REPACK, DOMAIN
 from .core import Recorder
+from .tasks import PurgeEntitiesTask, PurgeTask
 
 SERVICE_PURGE = "purge"
 SERVICE_PURGE_ENTITIES = "purge_entities"
 SERVICE_ENABLE = "enable"
 SERVICE_DISABLE = "disable"
 
-
 SERVICE_PURGE_SCHEMA = vol.Schema(
     {
         vol.Optional(ATTR_KEEP_DAYS): cv.positive_int,
@@ -44,7 +49,12 @@ def _async_register_purge_service(hass: HomeAssistant, instance: Recorder) -> None:
     async def async_handle_purge_service(service: ServiceCall) -> None:
         """Handle calls to the purge service."""
-        instance.do_adhoc_purge(**service.data)
+        kwargs = service.data
+        keep_days = kwargs.get(ATTR_KEEP_DAYS, instance.keep_days)
+        repack = cast(bool, kwargs[ATTR_REPACK])
+        apply_filter = cast(bool, kwargs[ATTR_APPLY_FILTER])
+        purge_before = dt_util.utcnow() - timedelta(days=keep_days)
+        instance.queue_task(PurgeTask(purge_before, repack, apply_filter))
 
     hass.services.async_register(
         DOMAIN, SERVICE_PURGE, async_handle_purge_service, schema=SERVICE_PURGE_SCHEMA
     )
@@ -60,8 +70,8 @@
         entity_ids = await async_extract_entity_ids(hass, service)
         domains = service.data.get(ATTR_DOMAINS, [])
         entity_globs = service.data.get(ATTR_ENTITY_GLOBS, [])
-
-        instance.do_adhoc_purge_entities(entity_ids, domains, entity_globs)
+        entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs)
+        instance.queue_task(PurgeEntitiesTask(entity_filter))
 
     hass.services.async_register(
         DOMAIN,
diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py
index fdffa63bcd3..bed49e36f16 100644
--- a/homeassistant/components/recorder/tasks.py
+++ b/homeassistant/components/recorder/tasks.py
@@ -78,7 +78,9 @@ class PurgeTask(RecorderTask):
             periodic_db_cleanups(instance)
             return
         # Schedule a new purge task if this one didn't finish
-        instance.queue.put(PurgeTask(self.purge_before, self.repack, self.apply_filter))
+        instance.queue_task(
+            PurgeTask(self.purge_before, self.repack, self.apply_filter)
+        )
 
 
 @dataclass
@@ -92,7 +94,7 @@ class PurgeEntitiesTask(RecorderTask):
         if purge.purge_entity_data(instance, self.entity_filter):
             return
         # Schedule a new purge task if this one didn't finish
-        instance.queue.put(PurgeEntitiesTask(self.entity_filter))
+        instance.queue_task(PurgeEntitiesTask(self.entity_filter))
 
 
 @dataclass
@@ -115,7 +117,7 @@ class StatisticsTask(RecorderTask):
         if statistics.compile_statistics(instance, self.start):
             return
         # Schedule a new statistics task if this one didn't finish
-        instance.queue.put(StatisticsTask(self.start))
+        instance.queue_task(StatisticsTask(self.start))
 
 
 @dataclass
@@ -130,7 +132,7 @@ class ExternalStatisticsTask(RecorderTask):
         if statistics.add_external_statistics(instance, self.metadata, self.statistics):
             return
         # Schedule a new statistics task if this one didn't finish
-        instance.queue.put(ExternalStatisticsTask(self.metadata, self.statistics))
+        instance.queue_task(ExternalStatisticsTask(self.metadata, self.statistics))
 
 
 @dataclass
@@ -151,7 +153,7 @@ class AdjustStatisticsTask(RecorderTask):
         ):
             return
         # Schedule a new adjust statistics task if this one didn't finish
-        instance.queue.put(
+        instance.queue_task(
             AdjustStatisticsTask(
                 self.statistic_id, self.start_time, self.sum_adjustment
             )
diff --git a/homeassistant/components/recorder/websocket_api.py b/homeassistant/components/recorder/websocket_api.py
index 585641665af..a851d2681f4 100644
--- a/homeassistant/components/recorder/websocket_api.py
+++ b/homeassistant/components/recorder/websocket_api.py
@@ -148,7 +148,7 @@ def ws_info(
     """Return status of the recorder."""
     instance: Recorder = hass.data[DATA_INSTANCE]
 
-    backlog = instance.queue.qsize() if instance and instance.queue else None
+    backlog = instance.backlog if instance else None
     migration_in_progress = async_migration_in_progress(hass)
     recording = instance.recording if instance else False
     thread_alive = instance.is_alive() if instance else False
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 1e8ff89f20d..9bfca76394b 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -1395,7 +1395,7 @@ async def test_database_lock_timeout(hass, recorder_mock):
             self.event.wait()
 
     block_task = BlockQueue()
-    instance.queue.put(block_task)
+    instance.queue_task(block_task)
     with patch.object(recorder.core, "DB_LOCK_TIMEOUT", 0.1):
         try:
             with pytest.raises(TimeoutError):
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
index 7c3a2adcdc1..2d711f17cf3 100644
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -557,7 +557,7 @@ async def test_purge_cutoff_date(
         assert events.filter(Events.event_type == "PURGE").count() == rows - 1
         assert events.filter(Events.event_type == "KEEP").count() == 1
 
-    instance.queue.put(PurgeTask(cutoff, repack=False, apply_filter=False))
+    instance.queue_task(PurgeTask(cutoff, repack=False, apply_filter=False))
     await hass.async_block_till_done()
     await async_recorder_block_till_done(hass)
     await async_wait_purge_done(hass)
@@ -588,7 +588,7 @@ async def test_purge_cutoff_date(
         assert events.filter(Events.event_type == "KEEP").count() == 1
 
     # Make sure we can purge everything
-    instance.queue.put(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
+    instance.queue_task(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
     await async_recorder_block_till_done(hass)
     await async_wait_purge_done(hass)
@@ -599,7 +599,7 @@
     assert state_attributes.count() == 0
 
     # Make sure we can purge everything when the db is already empty
-    instance.queue.put(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
+    instance.queue_task(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
    await async_recorder_block_till_done(hass)
    await async_wait_purge_done(hass)
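
Note on the pattern: the change above is mechanical. Every external instance.queue.put(...) call becomes instance.queue_task(...), and reads of queue.qsize() go through the new backlog property, which is what allows the queue attribute to be renamed to the private _queue. A minimal, self-contained sketch of that encapsulation follows; Worker and Task are hypothetical stand-ins for illustration, not Home Assistant classes.

# Toy sketch of the encapsulation used in this patch: the queue becomes
# private and all access goes through queue_task() and the backlog
# property. Worker/Task are stand-ins, not Home Assistant code.
from __future__ import annotations

from dataclasses import dataclass
from queue import SimpleQueue


@dataclass
class Task:
    """Stand-in for RecorderTask."""

    name: str


class Worker:
    """Stand-in for Recorder; owns a private task queue."""

    def __init__(self) -> None:
        # Previously a public attribute (self.queue); now private.
        self._queue: SimpleQueue[Task] = SimpleQueue()

    @property
    def backlog(self) -> int:
        """Queue depth, replacing callers doing instance.queue.qsize()."""
        return self._queue.qsize()

    def queue_task(self, task: Task) -> None:
        """Single enqueue entry point, replacing instance.queue.put(...)."""
        self._queue.put(task)


worker = Worker()
worker.queue_task(Task("purge"))  # callers never touch the queue object
assert worker.backlog == 1

The Recorder in this patch follows the same shape, and queue_task() is also used internally by tasks that reschedule themselves when they do not finish.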