Continue recording events until final write (#91260)

pull/84934/head^2
J. Nick Koston 2023-04-16 09:22:47 -10:00 committed by GitHub
parent 00191ace6c
commit fdc6cf3472
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 10 deletions

View File

@ -24,8 +24,8 @@ from sqlalchemy.orm.session import Session
from homeassistant.components import persistent_notification
from homeassistant.const import (
ATTR_ENTITY_ID,
EVENT_HOMEASSISTANT_CLOSE,
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_HOMEASSISTANT_STOP,
EVENT_STATE_CHANGED,
MATCH_ALL,
)
@ -404,9 +404,8 @@ class Recorder(threading.Thread):
# Unknown what it is.
return True
@callback
def _async_empty_queue(self, event: Event) -> None:
"""Empty the queue if its still present at final write."""
async def _async_close(self, event: Event) -> None:
"""Empty the queue if its still present at close."""
# If the queue is full of events to be processed because
# the database is so broken that every event results in a retry
@ -421,9 +420,10 @@ class Recorder(threading.Thread):
except queue.Empty:
break
self.queue_task(StopTask())
await self.hass.async_add_executor_job(self.join)
async def _async_shutdown(self, event: Event) -> None:
"""Shut down the Recorder."""
"""Shut down the Recorder at final write."""
if not self._hass_started.done():
self._hass_started.set_result(SHUTDOWN_TASK)
self.queue_task(StopTask())
@ -439,8 +439,8 @@ class Recorder(threading.Thread):
def async_register(self) -> None:
"""Post connection initialize."""
bus = self.hass.bus
bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._async_empty_queue)
bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._async_shutdown)
bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, self._async_close)
bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._async_shutdown)
async_at_started(self.hass, self._async_hass_started)
@callback

View File

@ -59,6 +59,7 @@ from homeassistant.components.recorder.services import (
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import (
EVENT_COMPONENT_LOADED,
EVENT_HOMEASSISTANT_CLOSE,
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_HOMEASSISTANT_STARTED,
EVENT_HOMEASSISTANT_STOP,
@ -134,7 +135,7 @@ async def test_shutdown_before_startup_finishes(
session = await hass.async_add_executor_job(instance.get_session)
with patch.object(instance, "engine"):
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
await hass.async_block_till_done()
await hass.async_stop()
@ -193,7 +194,7 @@ async def test_shutdown_closes_connections(
await instance.async_add_executor_job(_ensure_connected)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
await hass.async_block_till_done()
assert len(pool.shutdown.mock_calls) == 1
@ -460,8 +461,8 @@ async def test_force_shutdown_with_queue_of_writes_that_generate_exceptions(
hass.states.async_set(entity_id, "on", attributes)
hass.states.async_set(entity_id, "off", attributes)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
hass.bus.async_fire(EVENT_HOMEASSISTANT_CLOSE)
await hass.async_block_till_done()
assert "Error executing query" in caplog.text
@ -2327,3 +2328,49 @@ async def test_clean_shutdown_when_schema_migration_fails(hass: HomeAssistant) -
instance = recorder.get_instance(hass)
await hass.async_stop()
assert instance.engine is None
async def test_events_are_recorded_until_final_write(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
) -> None:
"""Test that events are recorded until the final write."""
instance = await async_setup_recorder_instance(hass, {})
await hass.async_block_till_done()
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
hass.bus.async_fire("fake_event")
await async_wait_recording_done(hass)
def get_events() -> list[Event]:
events: list[Event] = []
with session_scope(hass=hass, read_only=True) as session:
for select_event, event_types in (
session.query(Events, EventTypes)
.filter(
Events.event_type_id.in_(
select_event_type_ids(("fake_event", "after_final_write"))
)
)
.outerjoin(
EventTypes, (Events.event_type_id == EventTypes.event_type_id)
)
):
select_event = cast(Events, select_event)
event_types = cast(EventTypes, event_types)
native_event = select_event.to_native()
native_event.event_type = event_types.event_type
events.append(native_event)
return events
events = await instance.async_add_executor_job(get_events)
assert len(events) == 1
db_event = events[0]
assert db_event.event_type == "fake_event"
hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
await hass.async_block_till_done()
assert not instance.engine