Guard against invalid ULIDs in contexts while recording events (#90889)
parent
19f71b3bb9
commit
96c5e845e5
|
@ -3,23 +3,36 @@ from __future__ import annotations
|
|||
|
||||
from contextlib import suppress
|
||||
from functools import lru_cache
|
||||
import logging
|
||||
from uuid import UUID
|
||||
|
||||
from homeassistant.util.ulid import bytes_to_ulid, ulid_to_bytes
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def ulid_to_bytes_or_none(ulid: str | None) -> bytes | None:
|
||||
"""Convert an ulid to bytes."""
|
||||
if ulid is None:
|
||||
return None
|
||||
return ulid_to_bytes(ulid)
|
||||
try:
|
||||
return ulid_to_bytes(ulid)
|
||||
except ValueError as ex:
|
||||
_LOGGER.error("Error converting ulid %s to bytes: %s", ulid, ex, exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
def bytes_to_ulid_or_none(_bytes: bytes | None) -> str | None:
|
||||
"""Convert bytes to a ulid."""
|
||||
if _bytes is None:
|
||||
return None
|
||||
return bytes_to_ulid(_bytes)
|
||||
try:
|
||||
return bytes_to_ulid(_bytes)
|
||||
except ValueError as ex:
|
||||
_LOGGER.error(
|
||||
"Error converting bytes %s to ulid: %s", _bytes, ex, exc_info=True
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
@lru_cache(maxsize=16)
|
||||
|
|
|
@ -66,7 +66,7 @@ from homeassistant.const import (
|
|||
STATE_LOCKED,
|
||||
STATE_UNLOCKED,
|
||||
)
|
||||
from homeassistant.core import CoreState, Event, HomeAssistant, callback
|
||||
from homeassistant.core import Context, CoreState, Event, HomeAssistant, callback
|
||||
from homeassistant.helpers import entity_registry as er, recorder as recorder_helper
|
||||
from homeassistant.setup import async_setup_component, setup_component
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
@ -854,6 +854,31 @@ def test_saving_event_with_oversized_data(
|
|||
assert json_loads(events["test_event_too_big"]) == {}
|
||||
|
||||
|
||||
def test_saving_event_invalid_context_ulid(
|
||||
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""Test we handle invalid manually injected context ids."""
|
||||
hass = hass_recorder()
|
||||
event_data = {"test_attr": 5, "test_attr_10": "nice"}
|
||||
hass.bus.fire("test_event", event_data, context=Context(id="invalid"))
|
||||
wait_recording_done(hass)
|
||||
events = {}
|
||||
|
||||
with session_scope(hass=hass) as session:
|
||||
for _, data, event_type in (
|
||||
session.query(Events.event_id, EventData.shared_data, EventTypes.event_type)
|
||||
.outerjoin(EventData, Events.data_id == EventData.data_id)
|
||||
.outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
|
||||
.where(EventTypes.event_type.in_(["test_event"]))
|
||||
):
|
||||
events[event_type] = data
|
||||
|
||||
assert "invalid" in caplog.text
|
||||
|
||||
assert len(events) == 1
|
||||
assert json_loads(events["test_event"]) == event_data
|
||||
|
||||
|
||||
def test_recorder_setup_failure(hass: HomeAssistant) -> None:
|
||||
"""Test some exceptions."""
|
||||
recorder_helper.async_initialize_recorder(hass)
|
||||
|
|
|
@ -14,9 +14,11 @@ from homeassistant.components.recorder.db_schema import (
|
|||
)
|
||||
from homeassistant.components.recorder.models import (
|
||||
LazyState,
|
||||
bytes_to_ulid_or_none,
|
||||
process_datetime_to_timestamp,
|
||||
process_timestamp,
|
||||
process_timestamp_to_utc_isoformat,
|
||||
ulid_to_bytes_or_none,
|
||||
)
|
||||
from homeassistant.const import EVENT_STATE_CHANGED
|
||||
import homeassistant.core as ha
|
||||
|
@ -415,3 +417,27 @@ async def test_process_datetime_to_timestamp_mirrors_utc_isoformat_behavior(
|
|||
process_datetime_to_timestamp(datetime_hst_timezone)
|
||||
== dt_util.parse_datetime("2016-07-09T21:00:00+00:00").timestamp()
|
||||
)
|
||||
|
||||
|
||||
def test_ulid_to_bytes_or_none(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test ulid_to_bytes_or_none."""
|
||||
|
||||
assert (
|
||||
ulid_to_bytes_or_none("01EYQZJXZ5Z1Z1Z1Z1Z1Z1Z1Z1")
|
||||
== b"\x01w\xaf\xf9w\xe5\xf8~\x1f\x87\xe1\xf8~\x1f\x87\xe1"
|
||||
)
|
||||
assert ulid_to_bytes_or_none("invalid") is None
|
||||
assert "invalid" in caplog.text
|
||||
assert ulid_to_bytes_or_none(None) is None
|
||||
|
||||
|
||||
def test_bytes_to_ulid_or_none(caplog: pytest.LogCaptureFixture) -> None:
|
||||
"""Test bytes_to_ulid_or_none."""
|
||||
|
||||
assert (
|
||||
bytes_to_ulid_or_none(b"\x01w\xaf\xf9w\xe5\xf8~\x1f\x87\xe1\xf8~\x1f\x87\xe1")
|
||||
== "01EYQZJXZ5Z1Z1Z1Z1Z1Z1Z1Z1"
|
||||
)
|
||||
assert bytes_to_ulid_or_none(b"invalid") is None
|
||||
assert "invalid" in caplog.text
|
||||
assert bytes_to_ulid_or_none(None) is None
|
||||
|
|
Loading…
Reference in New Issue