diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index a307864467a..0858b3f93e2 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -24,8 +24,8 @@ from sqlalchemy.orm.session import Session from homeassistant.components import persistent_notification from homeassistant.const import ( ATTR_ENTITY_ID, + EVENT_HOMEASSISTANT_CLOSE, EVENT_HOMEASSISTANT_FINAL_WRITE, - EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, MATCH_ALL, ) @@ -404,9 +404,8 @@ class Recorder(threading.Thread): # Unknown what it is. return True - @callback - def _async_empty_queue(self, event: Event) -> None: - """Empty the queue if its still present at final write.""" + async def _async_close(self, event: Event) -> None: + """Empty the queue if its still present at close.""" # If the queue is full of events to be processed because # the database is so broken that every event results in a retry @@ -421,9 +420,10 @@ class Recorder(threading.Thread): except queue.Empty: break self.queue_task(StopTask()) + await self.hass.async_add_executor_job(self.join) async def _async_shutdown(self, event: Event) -> None: - """Shut down the Recorder.""" + """Shut down the Recorder at final write.""" if not self._hass_started.done(): self._hass_started.set_result(SHUTDOWN_TASK) self.queue_task(StopTask()) @@ -439,8 +439,8 @@ class Recorder(threading.Thread): def async_register(self) -> None: """Post connection initialize.""" bus = self.hass.bus - bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._async_empty_queue) - bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._async_shutdown) + bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, self._async_close) + bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._async_shutdown) async_at_started(self.hass, self._async_hass_started) @callback diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 7b7893a069f..476d7dff41a 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -59,6 +59,7 @@ from homeassistant.components.recorder.services import ( from homeassistant.components.recorder.util import session_scope from homeassistant.const import ( EVENT_COMPONENT_LOADED, + EVENT_HOMEASSISTANT_CLOSE, EVENT_HOMEASSISTANT_FINAL_WRITE, EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, @@ -134,7 +135,7 @@ async def test_shutdown_before_startup_finishes( session = await hass.async_add_executor_job(instance.get_session) with patch.object(instance, "engine"): - hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) await hass.async_block_till_done() await hass.async_stop() @@ -193,7 +194,7 @@ async def test_shutdown_closes_connections( await instance.async_add_executor_job(_ensure_connected) - hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) await hass.async_block_till_done() assert len(pool.shutdown.mock_calls) == 1 @@ -460,8 +461,8 @@ async def test_force_shutdown_with_queue_of_writes_that_generate_exceptions( hass.states.async_set(entity_id, "on", attributes) hass.states.async_set(entity_id, "off", attributes) - hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) + hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE) await hass.async_block_till_done() assert "Error executing query" in caplog.text @@ -2327,3 +2328,49 @@ async def test_clean_shutdown_when_schema_migration_fails(hass: HomeAssistant) - instance = recorder.get_instance(hass) await hass.async_stop() assert instance.engine is None + + +async def test_events_are_recorded_until_final_write( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, +) -> None: + """Test that events are recorded until the final write.""" + instance = await async_setup_recorder_instance(hass, {}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + await hass.async_block_till_done() + hass.bus.async_fire("fake_event") + await async_wait_recording_done(hass) + + def get_events() -> list[Event]: + events: list[Event] = [] + with session_scope(hass=hass, read_only=True) as session: + for select_event, event_types in ( + session.query(Events, EventTypes) + .filter( + Events.event_type_id.in_( + select_event_type_ids(("fake_event", "after_final_write")) + ) + ) + .outerjoin( + EventTypes, (Events.event_type_id == EventTypes.event_type_id) + ) + ): + select_event = cast(Events, select_event) + event_types = cast(EventTypes, event_types) + + native_event = select_event.to_native() + native_event.event_type = event_types.event_type + events.append(native_event) + + return events + + events = await instance.async_add_executor_job(get_events) + assert len(events) == 1 + db_event = events[0] + assert db_event.event_type == "fake_event" + + hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) + await hass.async_block_till_done() + + assert not instance.engine