Improve nest camera stream expiration to be defensive against errors (#130265)

pull/130713/head
Allen Porter 2024-11-10 03:01:59 -08:00 committed by Franck Nijhof
parent 83baa1a788
commit d408b7ac62
No known key found for this signature in database
GPG Key ID: D62583BA8AB11CA3
2 changed files with 144 additions and 76 deletions

View File

@ -2,9 +2,9 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC
import asyncio import asyncio
from collections.abc import Callable from collections.abc import Awaitable, Callable
import datetime import datetime
import functools import functools
import logging import logging
@ -46,6 +46,11 @@ PLACEHOLDER = Path(__file__).parent / "placeholder.png"
# Used to schedule an alarm to refresh the stream before expiration # Used to schedule an alarm to refresh the stream before expiration
STREAM_EXPIRATION_BUFFER = datetime.timedelta(seconds=30) STREAM_EXPIRATION_BUFFER = datetime.timedelta(seconds=30)
# Refresh streams with a bounded interval and backoff on failure
MIN_REFRESH_BACKOFF_INTERVAL = datetime.timedelta(minutes=1)
MAX_REFRESH_BACKOFF_INTERVAL = datetime.timedelta(minutes=10)
BACKOFF_MULTIPLIER = 1.5
async def async_setup_entry( async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
@ -67,6 +72,68 @@ async def async_setup_entry(
async_add_entities(entities) async_add_entities(entities)
class StreamRefresh:
"""Class that will refresh an expiring stream.
This class will schedule an alarm for the next expiration time of a stream.
When the alarm fires, it runs the provided `refresh_cb` to extend the
lifetime of the stream and return a new expiration time.
A simple backoff will be applied when the refresh callback fails.
"""
def __init__(
self,
hass: HomeAssistant,
expires_at: datetime.datetime,
refresh_cb: Callable[[], Awaitable[datetime.datetime | None]],
) -> None:
"""Initialize StreamRefresh."""
self._hass = hass
self._unsub: Callable[[], None] | None = None
self._min_refresh_interval = MIN_REFRESH_BACKOFF_INTERVAL
self._refresh_cb = refresh_cb
self._schedule_stream_refresh(expires_at - STREAM_EXPIRATION_BUFFER)
def unsub(self) -> None:
"""Invalidates the stream."""
if self._unsub:
self._unsub()
async def _handle_refresh(self, _: datetime.datetime) -> None:
"""Alarm that fires to check if the stream should be refreshed."""
self._unsub = None
try:
expires_at = await self._refresh_cb()
except ApiException as err:
_LOGGER.debug("Failed to refresh stream: %s", err)
# Increase backoff until the max backoff interval is reached
self._min_refresh_interval = min(
self._min_refresh_interval * BACKOFF_MULTIPLIER,
MAX_REFRESH_BACKOFF_INTERVAL,
)
refresh_time = utcnow() + self._min_refresh_interval
else:
if expires_at is None:
return
self._min_refresh_interval = MIN_REFRESH_BACKOFF_INTERVAL # Reset backoff
# Defend against invalid stream expiration time in the past
refresh_time = max(
expires_at - STREAM_EXPIRATION_BUFFER,
utcnow() + self._min_refresh_interval,
)
self._schedule_stream_refresh(refresh_time)
def _schedule_stream_refresh(self, refresh_time: datetime.datetime) -> None:
"""Schedules an alarm to refresh any streams before expiration."""
_LOGGER.debug("Scheduling stream refresh for %s", refresh_time)
self._unsub = async_track_point_in_utc_time(
self._hass,
self._handle_refresh,
refresh_time,
)
class NestCameraBaseEntity(Camera, ABC): class NestCameraBaseEntity(Camera, ABC):
"""Devices that support cameras.""" """Devices that support cameras."""
@ -86,41 +153,6 @@ class NestCameraBaseEntity(Camera, ABC):
self.stream_options[CONF_EXTRA_PART_WAIT_TIME] = 3 self.stream_options[CONF_EXTRA_PART_WAIT_TIME] = 3
# The API "name" field is a unique device identifier. # The API "name" field is a unique device identifier.
self._attr_unique_id = f"{self._device.name}-camera" self._attr_unique_id = f"{self._device.name}-camera"
self._stream_refresh_unsub: Callable[[], None] | None = None
@abstractmethod
def _stream_expires_at(self) -> datetime.datetime | None:
"""Next time when a stream expires."""
@abstractmethod
async def _async_refresh_stream(self) -> None:
"""Refresh any stream to extend expiration time."""
def _schedule_stream_refresh(self) -> None:
"""Schedules an alarm to refresh any streams before expiration."""
if self._stream_refresh_unsub is not None:
self._stream_refresh_unsub()
expiration_time = self._stream_expires_at()
if not expiration_time:
return
refresh_time = expiration_time - STREAM_EXPIRATION_BUFFER
_LOGGER.debug("Scheduled next stream refresh for %s", refresh_time)
self._stream_refresh_unsub = async_track_point_in_utc_time(
self.hass,
self._handle_stream_refresh,
refresh_time,
)
async def _handle_stream_refresh(self, _: datetime.datetime) -> None:
"""Alarm that fires to check if the stream should be refreshed."""
_LOGGER.debug("Examining streams to refresh")
self._stream_refresh_unsub = None
try:
await self._async_refresh_stream()
finally:
self._schedule_stream_refresh()
async def async_added_to_hass(self) -> None: async def async_added_to_hass(self) -> None:
"""Run when entity is added to register update signal handler.""" """Run when entity is added to register update signal handler."""
@ -128,12 +160,6 @@ class NestCameraBaseEntity(Camera, ABC):
self._device.add_update_listener(self.async_write_ha_state) self._device.add_update_listener(self.async_write_ha_state)
) )
async def async_will_remove_from_hass(self) -> None:
"""Invalidates the RTSP token when unloaded."""
await super().async_will_remove_from_hass()
if self._stream_refresh_unsub:
self._stream_refresh_unsub()
class NestRTSPEntity(NestCameraBaseEntity): class NestRTSPEntity(NestCameraBaseEntity):
"""Nest cameras that use RTSP.""" """Nest cameras that use RTSP."""
@ -146,6 +172,7 @@ class NestRTSPEntity(NestCameraBaseEntity):
super().__init__(device) super().__init__(device)
self._create_stream_url_lock = asyncio.Lock() self._create_stream_url_lock = asyncio.Lock()
self._rtsp_live_stream_trait = device.traits[CameraLiveStreamTrait.NAME] self._rtsp_live_stream_trait = device.traits[CameraLiveStreamTrait.NAME]
self._refresh_unsub: Callable[[], None] | None = None
@property @property
def use_stream_for_stills(self) -> bool: def use_stream_for_stills(self) -> bool:
@ -173,20 +200,21 @@ class NestRTSPEntity(NestCameraBaseEntity):
) )
except ApiException as err: except ApiException as err:
raise HomeAssistantError(f"Nest API error: {err}") from err raise HomeAssistantError(f"Nest API error: {err}") from err
self._schedule_stream_refresh() refresh = StreamRefresh(
self.hass,
self._rtsp_stream.expires_at,
self._async_refresh_stream,
)
self._refresh_unsub = refresh.unsub
assert self._rtsp_stream assert self._rtsp_stream
if self._rtsp_stream.expires_at < utcnow(): if self._rtsp_stream.expires_at < utcnow():
_LOGGER.warning("Stream already expired") _LOGGER.warning("Stream already expired")
return self._rtsp_stream.rtsp_stream_url return self._rtsp_stream.rtsp_stream_url
def _stream_expires_at(self) -> datetime.datetime | None: async def _async_refresh_stream(self) -> datetime.datetime | None:
"""Next time when a stream expires."""
return self._rtsp_stream.expires_at if self._rtsp_stream else None
async def _async_refresh_stream(self) -> None:
"""Refresh stream to extend expiration time.""" """Refresh stream to extend expiration time."""
if not self._rtsp_stream: if not self._rtsp_stream:
return return None
_LOGGER.debug("Extending RTSP stream") _LOGGER.debug("Extending RTSP stream")
try: try:
self._rtsp_stream = await self._rtsp_stream.extend_rtsp_stream() self._rtsp_stream = await self._rtsp_stream.extend_rtsp_stream()
@ -197,14 +225,17 @@ class NestRTSPEntity(NestCameraBaseEntity):
if self.stream: if self.stream:
await self.stream.stop() await self.stream.stop()
self.stream = None self.stream = None
return return None
# Update the stream worker with the latest valid url # Update the stream worker with the latest valid url
if self.stream: if self.stream:
self.stream.update_source(self._rtsp_stream.rtsp_stream_url) self.stream.update_source(self._rtsp_stream.rtsp_stream_url)
return self._rtsp_stream.expires_at
async def async_will_remove_from_hass(self) -> None: async def async_will_remove_from_hass(self) -> None:
"""Invalidates the RTSP token when unloaded.""" """Invalidates the RTSP token when unloaded."""
await super().async_will_remove_from_hass() await super().async_will_remove_from_hass()
if self._refresh_unsub is not None:
self._refresh_unsub()
if self._rtsp_stream: if self._rtsp_stream:
try: try:
await self._rtsp_stream.stop_stream() await self._rtsp_stream.stop_stream()
@ -220,37 +251,23 @@ class NestWebRTCEntity(NestCameraBaseEntity):
"""Initialize the camera.""" """Initialize the camera."""
super().__init__(device) super().__init__(device)
self._webrtc_sessions: dict[str, WebRtcStream] = {} self._webrtc_sessions: dict[str, WebRtcStream] = {}
self._refresh_unsub: dict[str, Callable[[], None]] = {}
@property @property
def frontend_stream_type(self) -> StreamType | None: def frontend_stream_type(self) -> StreamType | None:
"""Return the type of stream supported by this camera.""" """Return the type of stream supported by this camera."""
return StreamType.WEB_RTC return StreamType.WEB_RTC
def _stream_expires_at(self) -> datetime.datetime | None: async def _async_refresh_stream(self, session_id: str) -> datetime.datetime | None:
"""Next time when a stream expires."""
if not self._webrtc_sessions:
return None
return min(stream.expires_at for stream in self._webrtc_sessions.values())
async def _async_refresh_stream(self) -> None:
"""Refresh stream to extend expiration time.""" """Refresh stream to extend expiration time."""
now = utcnow() if not (webrtc_stream := self._webrtc_sessions.get(session_id)):
for session_id, webrtc_stream in list(self._webrtc_sessions.items()): return None
if session_id not in self._webrtc_sessions:
continue
if now < (webrtc_stream.expires_at - STREAM_EXPIRATION_BUFFER):
_LOGGER.debug(
"Stream does not yet expire: %s", webrtc_stream.expires_at
)
continue
_LOGGER.debug("Extending WebRTC stream %s", webrtc_stream.media_session_id) _LOGGER.debug("Extending WebRTC stream %s", webrtc_stream.media_session_id)
try:
webrtc_stream = await webrtc_stream.extend_stream() webrtc_stream = await webrtc_stream.extend_stream()
except ApiException as err:
_LOGGER.debug("Failed to extend stream: %s", err)
else:
if session_id in self._webrtc_sessions: if session_id in self._webrtc_sessions:
self._webrtc_sessions[session_id] = webrtc_stream self._webrtc_sessions[session_id] = webrtc_stream
return webrtc_stream.expires_at
return None
async def async_camera_image( async def async_camera_image(
self, width: int | None = None, height: int | None = None self, width: int | None = None, height: int | None = None
@ -278,7 +295,12 @@ class NestWebRTCEntity(NestCameraBaseEntity):
) )
self._webrtc_sessions[session_id] = stream self._webrtc_sessions[session_id] = stream
send_message(WebRTCAnswer(stream.answer_sdp)) send_message(WebRTCAnswer(stream.answer_sdp))
self._schedule_stream_refresh() refresh = StreamRefresh(
self.hass,
stream.expires_at,
functools.partial(self._async_refresh_stream, session_id),
)
self._refresh_unsub[session_id] = refresh.unsub
@callback @callback
def close_webrtc_session(self, session_id: str) -> None: def close_webrtc_session(self, session_id: str) -> None:
@ -287,6 +309,8 @@ class NestWebRTCEntity(NestCameraBaseEntity):
_LOGGER.debug( _LOGGER.debug(
"Closing WebRTC session %s, %s", session_id, stream.media_session_id "Closing WebRTC session %s, %s", session_id, stream.media_session_id
) )
unsub = self._refresh_unsub.pop(session_id)
unsub()
async def stop_stream() -> None: async def stop_stream() -> None:
try: try:

View File

@ -483,6 +483,50 @@ async def test_stream_response_already_expired(
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken" assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
async def test_extending_stream_already_expired(
hass: HomeAssistant,
auth: FakeAuth,
setup_platform: PlatformSetup,
camera_device: None,
) -> None:
"""Test a API response when extending the stream returns an expired stream url."""
now = utcnow()
stream_1_expiration = now + datetime.timedelta(seconds=180)
stream_2_expiration = now + datetime.timedelta(seconds=30) # Will be in the past
stream_3_expiration = now + datetime.timedelta(seconds=600)
auth.responses = [
make_stream_url_response(stream_1_expiration, token_num=1),
make_stream_url_response(stream_2_expiration, token_num=2),
make_stream_url_response(stream_3_expiration, token_num=3),
]
await setup_platform()
assert len(hass.states.async_all()) == 1
cam = hass.states.get("camera.my_camera")
assert cam is not None
assert cam.state == CameraState.STREAMING
# The stream is expired, but we return it anyway
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
assert stream_source == "rtsp://some/url?auth=g.1.streamingToken"
# Jump to when the stream will be refreshed
await fire_alarm(hass, now + datetime.timedelta(seconds=160))
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
# The stream will have expired in the past, but 1 minute min refresh interval is applied.
# The stream token is not updated.
await fire_alarm(hass, now + datetime.timedelta(seconds=170))
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
# Now go past the min update interval and the stream is refreshed
await fire_alarm(hass, now + datetime.timedelta(seconds=225))
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
assert stream_source == "rtsp://some/url?auth=g.3.streamingToken"
async def test_camera_removed( async def test_camera_removed(
hass: HomeAssistant, hass: HomeAssistant,
auth: FakeAuth, auth: FakeAuth,