Shutdown database engine before waiting for executor shutdown (#117339)

* Close database connection before stopping the executor

related issue #117004

* Close database connection before waiting for database executor to finish

related issue #117004

* fix test

* fix

* drop unused arg

* Revert "drop unused arg"

This reverts commit 8a9fe6a24a.

* docstring

* comment
pull/124433/head
J. Nick Koston 2024-08-22 09:47:22 -05:00 committed by GitHub
parent 890c6e97fd
commit 6f66f37fc7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 21 deletions

View File

@ -367,13 +367,6 @@ class Recorder(threading.Thread):
"""Add an executor job from within the event loop."""
return self.hass.loop.run_in_executor(self._db_executor, target, *args)
def _stop_executor(self) -> None:
"""Stop the executor."""
if self._db_executor is None:
return
self._db_executor.shutdown()
self._db_executor = None
@callback
def _async_check_queue(self, *_: Any) -> None:
"""Periodic check of the queue size to ensure we do not exhaust memory.
@ -1501,5 +1494,13 @@ class Recorder(threading.Thread):
try:
self._end_session()
finally:
self._stop_executor()
if self._db_executor:
# We shutdown the executor without forcefully
# joining the threads until after we have tried
# to cleanly close the connection.
self._db_executor.shutdown(join_threads_or_timeout=False)
self._close_connection()
if self._db_executor:
# After the connection is closed, we can join the threads
# or forcefully shutdown the threads if they take too long.
self._db_executor.join_threads_or_timeout()

View File

@ -63,10 +63,18 @@ def join_or_interrupt_threads(
class InterruptibleThreadPoolExecutor(ThreadPoolExecutor):
"""A ThreadPoolExecutor instance that will not deadlock on shutdown."""
def shutdown(self, *args: Any, **kwargs: Any) -> None:
"""Shutdown with interrupt support added."""
def shutdown(
self, *args: Any, join_threads_or_timeout: bool = True, **kwargs: Any
) -> None:
"""Shutdown with interrupt support added.
By default shutdown will wait for threads to finish up
to the timeout before forcefully stopping them. This can
be disabled by setting `join_threads_or_timeout` to False.
"""
super().shutdown(wait=False, cancel_futures=True)
self.join_threads_or_timeout()
if join_threads_or_timeout:
self.join_threads_or_timeout()
def join_threads_or_timeout(self) -> None:
"""Join threads or timeout."""

View File

@ -166,11 +166,10 @@ async def test_shutdown_before_startup_finishes(
await hass.async_block_till_done()
await hass.async_stop()
def _run_information_with_session():
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
return run_information_with_session(session)
run_info = await instance.async_add_executor_job(_run_information_with_session)
# The database executor is shutdown so we must run the
# query in the main thread for testing
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
run_info = run_information_with_session(session)
assert run_info.run_id == 1
assert run_info.start is not None
@ -216,8 +215,7 @@ async def test_shutdown_closes_connections(
instance = recorder.get_instance(hass)
await instance.async_db_ready
await hass.async_block_till_done()
pool = instance.engine.pool
pool.shutdown = Mock()
pool = instance.engine
def _ensure_connected():
with session_scope(hass=hass, read_only=True) as session:
@ -225,10 +223,11 @@ async def test_shutdown_closes_connections(
await instance.async_add_executor_job(_ensure_connected)
hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
await hass.async_block_till_done()
with patch.object(pool, "dispose", wraps=pool.dispose) as dispose:
hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
await hass.async_block_till_done()
assert len(pool.shutdown.mock_calls) == 1
assert len(dispose.mock_calls) == 1
with pytest.raises(RuntimeError):
assert instance.get_session()