diff --git a/homeassistant/components/nest/camera.py b/homeassistant/components/nest/camera.py index 2bee54df3dd..4cb88e63641 100644 --- a/homeassistant/components/nest/camera.py +++ b/homeassistant/components/nest/camera.py @@ -2,9 +2,9 @@ from __future__ import annotations -from abc import ABC, abstractmethod +from abc import ABC import asyncio -from collections.abc import Callable +from collections.abc import Awaitable, Callable import datetime import functools import logging @@ -46,6 +46,11 @@ PLACEHOLDER = Path(__file__).parent / "placeholder.png" # Used to schedule an alarm to refresh the stream before expiration STREAM_EXPIRATION_BUFFER = datetime.timedelta(seconds=30) +# Refresh streams with a bounded interval and backoff on failure +MIN_REFRESH_BACKOFF_INTERVAL = datetime.timedelta(minutes=1) +MAX_REFRESH_BACKOFF_INTERVAL = datetime.timedelta(minutes=10) +BACKOFF_MULTIPLIER = 1.5 + async def async_setup_entry( hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback @@ -67,6 +72,68 @@ async def async_setup_entry( async_add_entities(entities) +class StreamRefresh: + """Class that will refresh an expiring stream. + + This class will schedule an alarm for the next expiration time of a stream. + When the alarm fires, it runs the provided `refresh_cb` to extend the + lifetime of the stream and return a new expiration time. + + A simple backoff will be applied when the refresh callback fails. + """ + + def __init__( + self, + hass: HomeAssistant, + expires_at: datetime.datetime, + refresh_cb: Callable[[], Awaitable[datetime.datetime | None]], + ) -> None: + """Initialize StreamRefresh.""" + self._hass = hass + self._unsub: Callable[[], None] | None = None + self._min_refresh_interval = MIN_REFRESH_BACKOFF_INTERVAL + self._refresh_cb = refresh_cb + self._schedule_stream_refresh(expires_at - STREAM_EXPIRATION_BUFFER) + + def unsub(self) -> None: + """Invalidates the stream.""" + if self._unsub: + self._unsub() + + async def _handle_refresh(self, _: datetime.datetime) -> None: + """Alarm that fires to check if the stream should be refreshed.""" + self._unsub = None + try: + expires_at = await self._refresh_cb() + except ApiException as err: + _LOGGER.debug("Failed to refresh stream: %s", err) + # Increase backoff until the max backoff interval is reached + self._min_refresh_interval = min( + self._min_refresh_interval * BACKOFF_MULTIPLIER, + MAX_REFRESH_BACKOFF_INTERVAL, + ) + refresh_time = utcnow() + self._min_refresh_interval + else: + if expires_at is None: + return + self._min_refresh_interval = MIN_REFRESH_BACKOFF_INTERVAL # Reset backoff + # Defend against invalid stream expiration time in the past + refresh_time = max( + expires_at - STREAM_EXPIRATION_BUFFER, + utcnow() + self._min_refresh_interval, + ) + self._schedule_stream_refresh(refresh_time) + + def _schedule_stream_refresh(self, refresh_time: datetime.datetime) -> None: + """Schedules an alarm to refresh any streams before expiration.""" + _LOGGER.debug("Scheduling stream refresh for %s", refresh_time) + self._unsub = async_track_point_in_utc_time( + self._hass, + self._handle_refresh, + refresh_time, + ) + + class NestCameraBaseEntity(Camera, ABC): """Devices that support cameras.""" @@ -86,41 +153,6 @@ class NestCameraBaseEntity(Camera, ABC): self.stream_options[CONF_EXTRA_PART_WAIT_TIME] = 3 # The API "name" field is a unique device identifier. self._attr_unique_id = f"{self._device.name}-camera" - self._stream_refresh_unsub: Callable[[], None] | None = None - - @abstractmethod - def _stream_expires_at(self) -> datetime.datetime | None: - """Next time when a stream expires.""" - - @abstractmethod - async def _async_refresh_stream(self) -> None: - """Refresh any stream to extend expiration time.""" - - def _schedule_stream_refresh(self) -> None: - """Schedules an alarm to refresh any streams before expiration.""" - if self._stream_refresh_unsub is not None: - self._stream_refresh_unsub() - - expiration_time = self._stream_expires_at() - if not expiration_time: - return - refresh_time = expiration_time - STREAM_EXPIRATION_BUFFER - _LOGGER.debug("Scheduled next stream refresh for %s", refresh_time) - - self._stream_refresh_unsub = async_track_point_in_utc_time( - self.hass, - self._handle_stream_refresh, - refresh_time, - ) - - async def _handle_stream_refresh(self, _: datetime.datetime) -> None: - """Alarm that fires to check if the stream should be refreshed.""" - _LOGGER.debug("Examining streams to refresh") - self._stream_refresh_unsub = None - try: - await self._async_refresh_stream() - finally: - self._schedule_stream_refresh() async def async_added_to_hass(self) -> None: """Run when entity is added to register update signal handler.""" @@ -128,12 +160,6 @@ class NestCameraBaseEntity(Camera, ABC): self._device.add_update_listener(self.async_write_ha_state) ) - async def async_will_remove_from_hass(self) -> None: - """Invalidates the RTSP token when unloaded.""" - await super().async_will_remove_from_hass() - if self._stream_refresh_unsub: - self._stream_refresh_unsub() - class NestRTSPEntity(NestCameraBaseEntity): """Nest cameras that use RTSP.""" @@ -146,6 +172,7 @@ class NestRTSPEntity(NestCameraBaseEntity): super().__init__(device) self._create_stream_url_lock = asyncio.Lock() self._rtsp_live_stream_trait = device.traits[CameraLiveStreamTrait.NAME] + self._refresh_unsub: Callable[[], None] | None = None @property def use_stream_for_stills(self) -> bool: @@ -173,20 +200,21 @@ class NestRTSPEntity(NestCameraBaseEntity): ) except ApiException as err: raise HomeAssistantError(f"Nest API error: {err}") from err - self._schedule_stream_refresh() + refresh = StreamRefresh( + self.hass, + self._rtsp_stream.expires_at, + self._async_refresh_stream, + ) + self._refresh_unsub = refresh.unsub assert self._rtsp_stream if self._rtsp_stream.expires_at < utcnow(): _LOGGER.warning("Stream already expired") return self._rtsp_stream.rtsp_stream_url - def _stream_expires_at(self) -> datetime.datetime | None: - """Next time when a stream expires.""" - return self._rtsp_stream.expires_at if self._rtsp_stream else None - - async def _async_refresh_stream(self) -> None: + async def _async_refresh_stream(self) -> datetime.datetime | None: """Refresh stream to extend expiration time.""" if not self._rtsp_stream: - return + return None _LOGGER.debug("Extending RTSP stream") try: self._rtsp_stream = await self._rtsp_stream.extend_rtsp_stream() @@ -197,14 +225,17 @@ class NestRTSPEntity(NestCameraBaseEntity): if self.stream: await self.stream.stop() self.stream = None - return + return None # Update the stream worker with the latest valid url if self.stream: self.stream.update_source(self._rtsp_stream.rtsp_stream_url) + return self._rtsp_stream.expires_at async def async_will_remove_from_hass(self) -> None: """Invalidates the RTSP token when unloaded.""" await super().async_will_remove_from_hass() + if self._refresh_unsub is not None: + self._refresh_unsub() if self._rtsp_stream: try: await self._rtsp_stream.stop_stream() @@ -220,37 +251,23 @@ class NestWebRTCEntity(NestCameraBaseEntity): """Initialize the camera.""" super().__init__(device) self._webrtc_sessions: dict[str, WebRtcStream] = {} + self._refresh_unsub: dict[str, Callable[[], None]] = {} @property def frontend_stream_type(self) -> StreamType | None: """Return the type of stream supported by this camera.""" return StreamType.WEB_RTC - def _stream_expires_at(self) -> datetime.datetime | None: - """Next time when a stream expires.""" - if not self._webrtc_sessions: - return None - return min(stream.expires_at for stream in self._webrtc_sessions.values()) - - async def _async_refresh_stream(self) -> None: + async def _async_refresh_stream(self, session_id: str) -> datetime.datetime | None: """Refresh stream to extend expiration time.""" - now = utcnow() - for session_id, webrtc_stream in list(self._webrtc_sessions.items()): - if session_id not in self._webrtc_sessions: - continue - if now < (webrtc_stream.expires_at - STREAM_EXPIRATION_BUFFER): - _LOGGER.debug( - "Stream does not yet expire: %s", webrtc_stream.expires_at - ) - continue - _LOGGER.debug("Extending WebRTC stream %s", webrtc_stream.media_session_id) - try: - webrtc_stream = await webrtc_stream.extend_stream() - except ApiException as err: - _LOGGER.debug("Failed to extend stream: %s", err) - else: - if session_id in self._webrtc_sessions: - self._webrtc_sessions[session_id] = webrtc_stream + if not (webrtc_stream := self._webrtc_sessions.get(session_id)): + return None + _LOGGER.debug("Extending WebRTC stream %s", webrtc_stream.media_session_id) + webrtc_stream = await webrtc_stream.extend_stream() + if session_id in self._webrtc_sessions: + self._webrtc_sessions[session_id] = webrtc_stream + return webrtc_stream.expires_at + return None async def async_camera_image( self, width: int | None = None, height: int | None = None @@ -278,7 +295,12 @@ class NestWebRTCEntity(NestCameraBaseEntity): ) self._webrtc_sessions[session_id] = stream send_message(WebRTCAnswer(stream.answer_sdp)) - self._schedule_stream_refresh() + refresh = StreamRefresh( + self.hass, + stream.expires_at, + functools.partial(self._async_refresh_stream, session_id), + ) + self._refresh_unsub[session_id] = refresh.unsub @callback def close_webrtc_session(self, session_id: str) -> None: @@ -287,6 +309,8 @@ class NestWebRTCEntity(NestCameraBaseEntity): _LOGGER.debug( "Closing WebRTC session %s, %s", session_id, stream.media_session_id ) + unsub = self._refresh_unsub.pop(session_id) + unsub() async def stop_stream() -> None: try: diff --git a/tests/components/nest/test_camera.py b/tests/components/nest/test_camera.py index 500dbc0f46f..029879f1413 100644 --- a/tests/components/nest/test_camera.py +++ b/tests/components/nest/test_camera.py @@ -483,6 +483,50 @@ async def test_stream_response_already_expired( assert stream_source == "rtsp://some/url?auth=g.2.streamingToken" +async def test_extending_stream_already_expired( + hass: HomeAssistant, + auth: FakeAuth, + setup_platform: PlatformSetup, + camera_device: None, +) -> None: + """Test a API response when extending the stream returns an expired stream url.""" + now = utcnow() + stream_1_expiration = now + datetime.timedelta(seconds=180) + stream_2_expiration = now + datetime.timedelta(seconds=30) # Will be in the past + stream_3_expiration = now + datetime.timedelta(seconds=600) + auth.responses = [ + make_stream_url_response(stream_1_expiration, token_num=1), + make_stream_url_response(stream_2_expiration, token_num=2), + make_stream_url_response(stream_3_expiration, token_num=3), + ] + await setup_platform() + + assert len(hass.states.async_all()) == 1 + cam = hass.states.get("camera.my_camera") + assert cam is not None + assert cam.state == CameraState.STREAMING + + # The stream is expired, but we return it anyway + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.1.streamingToken" + + # Jump to when the stream will be refreshed + await fire_alarm(hass, now + datetime.timedelta(seconds=160)) + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.2.streamingToken" + + # The stream will have expired in the past, but 1 minute min refresh interval is applied. + # The stream token is not updated. + await fire_alarm(hass, now + datetime.timedelta(seconds=170)) + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.2.streamingToken" + + # Now go past the min update interval and the stream is refreshed + await fire_alarm(hass, now + datetime.timedelta(seconds=225)) + stream_source = await camera.async_get_stream_source(hass, "camera.my_camera") + assert stream_source == "rtsp://some/url?auth=g.3.streamingToken" + + async def test_camera_removed( hass: HomeAssistant, auth: FakeAuth,