Fix migration when encountering a NULL entity_id/event_type (#90542)
* Fix migration when encountering a NULL entity_id/event_type reported in #beta on discord * simplifypull/90556/head
parent
6b0c98045e
commit
a2efe2445a
|
@ -1445,12 +1445,15 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
|
|||
with session_scope(session=session_maker()) as session:
|
||||
if events := session.execute(find_event_type_to_migrate()).all():
|
||||
event_types = {event_type for _, event_type in events}
|
||||
if None in event_types:
|
||||
# event_type should never be None but we need to be defensive
|
||||
# so we don't fail the migration because of a bad state
|
||||
event_types.remove(None)
|
||||
event_types.add(_EMPTY_EVENT_TYPE)
|
||||
|
||||
event_type_to_id = event_type_manager.get_many(event_types, session)
|
||||
if missing_event_types := {
|
||||
# We should never see see None for the event_Type in the events table
|
||||
# but we need to be defensive so we don't fail the migration
|
||||
# because of a bad event
|
||||
_EMPTY_EVENT_TYPE if event_type is None else event_type
|
||||
event_type
|
||||
for event_type, event_id in event_type_to_id.items()
|
||||
if event_id is None
|
||||
}:
|
||||
|
@ -1476,7 +1479,9 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
|
|||
{
|
||||
"event_id": event_id,
|
||||
"event_type": None,
|
||||
"event_type_id": event_type_to_id[event_type],
|
||||
"event_type_id": event_type_to_id[
|
||||
_EMPTY_EVENT_TYPE if event_type is None else event_type
|
||||
],
|
||||
}
|
||||
for event_id, event_type in events
|
||||
],
|
||||
|
@ -1508,14 +1513,17 @@ def migrate_entity_ids(instance: Recorder) -> bool:
|
|||
with session_scope(session=instance.get_session()) as session:
|
||||
if states := session.execute(find_entity_ids_to_migrate()).all():
|
||||
entity_ids = {entity_id for _, entity_id in states}
|
||||
if None in entity_ids:
|
||||
# entity_id should never be None but we need to be defensive
|
||||
# so we don't fail the migration because of a bad state
|
||||
entity_ids.remove(None)
|
||||
entity_ids.add(_EMPTY_ENTITY_ID)
|
||||
|
||||
entity_id_to_metadata_id = states_meta_manager.get_many(
|
||||
entity_ids, session, True
|
||||
)
|
||||
if missing_entity_ids := {
|
||||
# We should never see _EMPTY_ENTITY_ID in the states table
|
||||
# but we need to be defensive so we don't fail the migration
|
||||
# because of a bad state
|
||||
_EMPTY_ENTITY_ID if entity_id is None else entity_id
|
||||
entity_id
|
||||
for entity_id, metadata_id in entity_id_to_metadata_id.items()
|
||||
if metadata_id is None
|
||||
}:
|
||||
|
@ -1543,7 +1551,9 @@ def migrate_entity_ids(instance: Recorder) -> bool:
|
|||
# the history queries still need to work while the
|
||||
# migration is in progress and we will do this in
|
||||
# post_migrate_entity_ids
|
||||
"metadata_id": entity_id_to_metadata_id[entity_id],
|
||||
"metadata_id": entity_id_to_metadata_id[
|
||||
_EMPTY_ENTITY_ID if entity_id is None else entity_id
|
||||
],
|
||||
}
|
||||
for state_id, entity_id in states
|
||||
],
|
||||
|
|
|
@ -998,7 +998,7 @@ async def test_migrate_entity_ids(
|
|||
instance = await async_setup_recorder_instance(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
def _insert_events():
|
||||
def _insert_states():
|
||||
with session_scope(hass=hass) as session:
|
||||
session.add_all(
|
||||
(
|
||||
|
@ -1020,7 +1020,7 @@ async def test_migrate_entity_ids(
|
|||
)
|
||||
)
|
||||
|
||||
await instance.async_add_executor_job(_insert_events)
|
||||
await instance.async_add_executor_job(_insert_states)
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
# This is a threadsafe way to add a task to the recorder
|
||||
|
@ -1106,3 +1106,149 @@ async def test_post_migrate_entity_ids(
|
|||
assert states_by_state["one_1"] is None
|
||||
assert states_by_state["two_2"] is None
|
||||
assert states_by_state["two_1"] is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
|
||||
async def test_migrate_null_entity_ids(
|
||||
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test we can migrate entity_ids to the StatesMeta table."""
|
||||
instance = await async_setup_recorder_instance(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
def _insert_states():
|
||||
with session_scope(hass=hass) as session:
|
||||
session.add(
|
||||
States(
|
||||
entity_id="sensor.one",
|
||||
state="one_1",
|
||||
last_updated_ts=1.452529,
|
||||
),
|
||||
)
|
||||
session.add_all(
|
||||
States(
|
||||
entity_id=None,
|
||||
state="empty",
|
||||
last_updated_ts=time + 1.452529,
|
||||
)
|
||||
for time in range(1000)
|
||||
)
|
||||
session.add(
|
||||
States(
|
||||
entity_id="sensor.one",
|
||||
state="one_1",
|
||||
last_updated_ts=2.452529,
|
||||
),
|
||||
)
|
||||
|
||||
await instance.async_add_executor_job(_insert_states)
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
# This is a threadsafe way to add a task to the recorder
|
||||
instance.queue_task(EntityIDMigrationTask())
|
||||
await async_recorder_block_till_done(hass)
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
def _fetch_migrated_states():
|
||||
with session_scope(hass=hass) as session:
|
||||
states = (
|
||||
session.query(
|
||||
States.state,
|
||||
States.metadata_id,
|
||||
States.last_updated_ts,
|
||||
StatesMeta.entity_id,
|
||||
)
|
||||
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
|
||||
.all()
|
||||
)
|
||||
assert len(states) == 1002
|
||||
result = {}
|
||||
for state in states:
|
||||
result.setdefault(state.entity_id, []).append(
|
||||
{
|
||||
"state_id": state.entity_id,
|
||||
"last_updated_ts": state.last_updated_ts,
|
||||
"state": state.state,
|
||||
}
|
||||
)
|
||||
return result
|
||||
|
||||
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
|
||||
assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000
|
||||
assert len(states_by_entity_id["sensor.one"]) == 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
|
||||
async def test_migrate_null_event_type_ids(
|
||||
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Test we can migrate event_types to the EventTypes table when the event_type is NULL."""
|
||||
instance = await async_setup_recorder_instance(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
def _insert_events():
|
||||
with session_scope(hass=hass) as session:
|
||||
session.add(
|
||||
Events(
|
||||
event_type="event_type_one",
|
||||
origin_idx=0,
|
||||
time_fired_ts=1.452529,
|
||||
),
|
||||
)
|
||||
session.add_all(
|
||||
Events(
|
||||
event_type=None,
|
||||
origin_idx=0,
|
||||
time_fired_ts=time + 1.452529,
|
||||
)
|
||||
for time in range(1000)
|
||||
)
|
||||
session.add(
|
||||
Events(
|
||||
event_type="event_type_one",
|
||||
origin_idx=0,
|
||||
time_fired_ts=2.452529,
|
||||
),
|
||||
)
|
||||
|
||||
await instance.async_add_executor_job(_insert_events)
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
# This is a threadsafe way to add a task to the recorder
|
||||
|
||||
instance.queue_task(EventTypeIDMigrationTask())
|
||||
await async_recorder_block_till_done(hass)
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
def _fetch_migrated_events():
|
||||
with session_scope(hass=hass) as session:
|
||||
events = (
|
||||
session.query(Events.event_id, Events.time_fired, EventTypes.event_type)
|
||||
.filter(
|
||||
Events.event_type_id.in_(
|
||||
select_event_type_ids(
|
||||
(
|
||||
"event_type_one",
|
||||
migration._EMPTY_EVENT_TYPE,
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
.outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
|
||||
.all()
|
||||
)
|
||||
assert len(events) == 1002
|
||||
result = {}
|
||||
for event in events:
|
||||
result.setdefault(event.event_type, []).append(
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"time_fired": event.time_fired,
|
||||
"event_type": event.event_type,
|
||||
}
|
||||
)
|
||||
return result
|
||||
|
||||
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
|
||||
assert len(events_by_type["event_type_one"]) == 2
|
||||
assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000
|
||||
|
|
Loading…
Reference in New Issue