Update nest sdm camera to refresh in background (#42865)
Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>pull/43069/head
parent
9f4480a634
commit
94db07ca8c
|
@ -1,16 +1,19 @@
|
||||||
"""Support for Google Nest SDM Cameras."""
|
"""Support for Google Nest SDM Cameras."""
|
||||||
|
|
||||||
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from google_nest_sdm.camera_traits import CameraImageTrait, CameraLiveStreamTrait
|
from google_nest_sdm.camera_traits import CameraImageTrait, CameraLiveStreamTrait
|
||||||
from google_nest_sdm.device import Device
|
from google_nest_sdm.device import Device
|
||||||
from haffmpeg.tools import IMAGE_JPEG
|
from haffmpeg.tools import IMAGE_JPEG
|
||||||
|
import requests
|
||||||
|
|
||||||
from homeassistant.components.camera import SUPPORT_STREAM, Camera
|
from homeassistant.components.camera import SUPPORT_STREAM, Camera
|
||||||
from homeassistant.components.ffmpeg import async_get_image
|
from homeassistant.components.ffmpeg import async_get_image
|
||||||
from homeassistant.config_entries import ConfigEntry
|
from homeassistant.config_entries import ConfigEntry
|
||||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||||
|
from homeassistant.helpers.event import async_track_point_in_utc_time
|
||||||
from homeassistant.helpers.typing import HomeAssistantType
|
from homeassistant.helpers.typing import HomeAssistantType
|
||||||
from homeassistant.util.dt import utcnow
|
from homeassistant.util.dt import utcnow
|
||||||
|
|
||||||
|
@ -19,6 +22,9 @@ from .device_info import DeviceInfo
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Used to schedule an alarm to refresh the stream before expiration
|
||||||
|
STREAM_EXPIRATION_BUFFER = datetime.timedelta(seconds=30)
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_sdm_entry(
|
async def async_setup_sdm_entry(
|
||||||
hass: HomeAssistantType, entry: ConfigEntry, async_add_entities
|
hass: HomeAssistantType, entry: ConfigEntry, async_add_entities
|
||||||
|
@ -49,6 +55,7 @@ class NestCamera(Camera):
|
||||||
self._device = device
|
self._device = device
|
||||||
self._device_info = DeviceInfo(device)
|
self._device_info = DeviceInfo(device)
|
||||||
self._stream = None
|
self._stream = None
|
||||||
|
self._stream_refresh_unsub = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def should_poll(self) -> bool:
|
def should_poll(self) -> bool:
|
||||||
|
@ -93,21 +100,50 @@ class NestCamera(Camera):
|
||||||
if CameraLiveStreamTrait.NAME not in self._device.traits:
|
if CameraLiveStreamTrait.NAME not in self._device.traits:
|
||||||
return None
|
return None
|
||||||
trait = self._device.traits[CameraLiveStreamTrait.NAME]
|
trait = self._device.traits[CameraLiveStreamTrait.NAME]
|
||||||
now = utcnow()
|
|
||||||
if not self._stream:
|
if not self._stream:
|
||||||
logging.debug("Fetching stream url")
|
_LOGGER.debug("Fetching stream url")
|
||||||
self._stream = await trait.generate_rtsp_stream()
|
self._stream = await trait.generate_rtsp_stream()
|
||||||
elif self._stream.expires_at < now:
|
self._schedule_stream_refresh()
|
||||||
logging.debug("Stream expired, extending stream")
|
if self._stream.expires_at < utcnow():
|
||||||
new_stream = await self._stream.extend_rtsp_stream()
|
_LOGGER.warning("Stream already expired")
|
||||||
self._stream = new_stream
|
|
||||||
return self._stream.rtsp_stream_url
|
return self._stream.rtsp_stream_url
|
||||||
|
|
||||||
|
def _schedule_stream_refresh(self):
|
||||||
|
"""Schedules an alarm to refresh the stream url before expiration."""
|
||||||
|
_LOGGER.debug("New stream url expires at %s", self._stream.expires_at)
|
||||||
|
refresh_time = self._stream.expires_at - STREAM_EXPIRATION_BUFFER
|
||||||
|
# Schedule an alarm to extend the stream
|
||||||
|
if self._stream_refresh_unsub is not None:
|
||||||
|
self._stream_refresh_unsub()
|
||||||
|
|
||||||
|
self._stream_refresh_unsub = async_track_point_in_utc_time(
|
||||||
|
self.hass,
|
||||||
|
self._handle_stream_refresh,
|
||||||
|
refresh_time,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_stream_refresh(self, now):
|
||||||
|
"""Alarm that fires to check if the stream should be refreshed."""
|
||||||
|
if not self._stream:
|
||||||
|
return
|
||||||
|
_LOGGER.debug("Extending stream url")
|
||||||
|
self._stream_refresh_unsub = None
|
||||||
|
try:
|
||||||
|
self._stream = await self._stream.extend_rtsp_stream()
|
||||||
|
except requests.HTTPError as err:
|
||||||
|
_LOGGER.debug("Failed to extend stream: %s", err)
|
||||||
|
# Next attempt to catch a url will get a new one
|
||||||
|
self._stream = None
|
||||||
|
return
|
||||||
|
self._schedule_stream_refresh()
|
||||||
|
|
||||||
async def async_will_remove_from_hass(self):
|
async def async_will_remove_from_hass(self):
|
||||||
"""Invalidates the RTSP token when unloaded."""
|
"""Invalidates the RTSP token when unloaded."""
|
||||||
if self._stream:
|
if self._stream:
|
||||||
logging.debug("Invalidating stream")
|
_LOGGER.debug("Invalidating stream")
|
||||||
await self._stream.stop_rtsp_stream()
|
await self._stream.stop_rtsp_stream()
|
||||||
|
if self._stream_refresh_unsub:
|
||||||
|
self._stream_refresh_unsub()
|
||||||
|
|
||||||
async def async_added_to_hass(self):
|
async def async_added_to_hass(self):
|
||||||
"""Run when entity is added to register update signal handler."""
|
"""Run when entity is added to register update signal handler."""
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
"documentation": "https://www.home-assistant.io/integrations/nest",
|
"documentation": "https://www.home-assistant.io/integrations/nest",
|
||||||
"requirements": [
|
"requirements": [
|
||||||
"python-nest==4.1.0",
|
"python-nest==4.1.0",
|
||||||
"google-nest-sdm==0.1.12"
|
"google-nest-sdm==0.1.13"
|
||||||
],
|
],
|
||||||
"codeowners": [
|
"codeowners": [
|
||||||
"@awarecan",
|
"@awarecan",
|
||||||
|
|
|
@ -687,7 +687,7 @@ google-cloud-pubsub==2.1.0
|
||||||
google-cloud-texttospeech==0.4.0
|
google-cloud-texttospeech==0.4.0
|
||||||
|
|
||||||
# homeassistant.components.nest
|
# homeassistant.components.nest
|
||||||
google-nest-sdm==0.1.12
|
google-nest-sdm==0.1.13
|
||||||
|
|
||||||
# homeassistant.components.google_travel_time
|
# homeassistant.components.google_travel_time
|
||||||
googlemaps==2.5.1
|
googlemaps==2.5.1
|
||||||
|
|
|
@ -355,7 +355,7 @@ google-api-python-client==1.6.4
|
||||||
google-cloud-pubsub==2.1.0
|
google-cloud-pubsub==2.1.0
|
||||||
|
|
||||||
# homeassistant.components.nest
|
# homeassistant.components.nest
|
||||||
google-nest-sdm==0.1.12
|
google-nest-sdm==0.1.13
|
||||||
|
|
||||||
# homeassistant.components.gree
|
# homeassistant.components.gree
|
||||||
greeclimate==0.9.5
|
greeclimate==0.9.5
|
||||||
|
|
|
@ -10,6 +10,7 @@ from typing import List
|
||||||
|
|
||||||
from google_nest_sdm.auth import AbstractAuth
|
from google_nest_sdm.auth import AbstractAuth
|
||||||
from google_nest_sdm.device import Device
|
from google_nest_sdm.device import Device
|
||||||
|
from requests import HTTPError
|
||||||
|
|
||||||
from homeassistant.components import camera
|
from homeassistant.components import camera
|
||||||
from homeassistant.components.camera import STATE_IDLE
|
from homeassistant.components.camera import STATE_IDLE
|
||||||
|
@ -18,6 +19,7 @@ from homeassistant.util.dt import utcnow
|
||||||
from .common import async_setup_sdm_platform
|
from .common import async_setup_sdm_platform
|
||||||
|
|
||||||
from tests.async_mock import patch
|
from tests.async_mock import patch
|
||||||
|
from tests.common import async_fire_time_changed
|
||||||
|
|
||||||
PLATFORM = "camera"
|
PLATFORM = "camera"
|
||||||
CAMERA_DEVICE_TYPE = "sdm.devices.types.CAMERA"
|
CAMERA_DEVICE_TYPE = "sdm.devices.types.CAMERA"
|
||||||
|
@ -42,16 +44,20 @@ DOMAIN = "nest"
|
||||||
class FakeResponse:
|
class FakeResponse:
|
||||||
"""A fake web response used for returning results of commands."""
|
"""A fake web response used for returning results of commands."""
|
||||||
|
|
||||||
def __init__(self, json):
|
def __init__(self, json=None, error=None):
|
||||||
"""Initialize the FakeResponse."""
|
"""Initialize the FakeResponse."""
|
||||||
self._json = json
|
self._json = json
|
||||||
|
self._error = error
|
||||||
|
|
||||||
def raise_for_status(self):
|
def raise_for_status(self):
|
||||||
"""Mimics a successful response status."""
|
"""Mimics a successful response status."""
|
||||||
|
if self._error:
|
||||||
|
raise self._error
|
||||||
pass
|
pass
|
||||||
|
|
||||||
async def json(self):
|
async def json(self):
|
||||||
"""Return a dict with the response."""
|
"""Return a dict with the response."""
|
||||||
|
assert self._json
|
||||||
return self._json
|
return self._json
|
||||||
|
|
||||||
|
|
||||||
|
@ -91,6 +97,13 @@ async def async_setup_camera(hass, traits={}, auth=None):
|
||||||
return await async_setup_sdm_platform(hass, PLATFORM, devices)
|
return await async_setup_sdm_platform(hass, PLATFORM, devices)
|
||||||
|
|
||||||
|
|
||||||
|
async def fire_alarm(hass, point_in_time):
|
||||||
|
"""Fire an alarm and wait for callbacks to run."""
|
||||||
|
with patch("homeassistant.util.dt.utcnow", return_value=point_in_time):
|
||||||
|
async_fire_time_changed(hass, point_in_time)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
|
||||||
async def test_no_devices(hass):
|
async def test_no_devices(hass):
|
||||||
"""Test configuration that returns no devices."""
|
"""Test configuration that returns no devices."""
|
||||||
await async_setup_camera(hass)
|
await async_setup_camera(hass)
|
||||||
|
@ -169,30 +182,40 @@ async def test_camera_stream(hass, aiohttp_client):
|
||||||
async def test_refresh_expired_stream_token(hass, aiohttp_client):
|
async def test_refresh_expired_stream_token(hass, aiohttp_client):
|
||||||
"""Test a camera stream expiration and refresh."""
|
"""Test a camera stream expiration and refresh."""
|
||||||
now = utcnow()
|
now = utcnow()
|
||||||
past = now - datetime.timedelta(seconds=100)
|
stream_1_expiration = now + datetime.timedelta(seconds=90)
|
||||||
future = now + datetime.timedelta(seconds=100)
|
stream_2_expiration = now + datetime.timedelta(seconds=180)
|
||||||
|
stream_3_expiration = now + datetime.timedelta(seconds=360)
|
||||||
responses = [
|
responses = [
|
||||||
|
# Stream URL #1
|
||||||
FakeResponse(
|
FakeResponse(
|
||||||
{
|
{
|
||||||
"results": {
|
"results": {
|
||||||
"streamUrls": {
|
"streamUrls": {
|
||||||
"rtspUrl": "rtsp://some/url?auth=g.0.streamingToken"
|
"rtspUrl": "rtsp://some/url?auth=g.1.streamingToken"
|
||||||
},
|
},
|
||||||
"streamExtensionToken": "g.1.extensionToken",
|
"streamExtensionToken": "g.1.extensionToken",
|
||||||
"streamToken": "g.0.streamingToken",
|
"streamToken": "g.1.streamingToken",
|
||||||
"expiresAt": past.isoformat(timespec="seconds"),
|
"expiresAt": stream_1_expiration.isoformat(timespec="seconds"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
# Stream URL #2
|
||||||
FakeResponse(
|
FakeResponse(
|
||||||
{
|
{
|
||||||
"results": {
|
"results": {
|
||||||
"streamUrls": {
|
"streamExtensionToken": "g.2.extensionToken",
|
||||||
"rtspUrl": "rtsp://some/url?auth=g.2.streamingToken"
|
|
||||||
},
|
|
||||||
"streamExtensionToken": "g.3.extensionToken",
|
|
||||||
"streamToken": "g.2.streamingToken",
|
"streamToken": "g.2.streamingToken",
|
||||||
"expiresAt": future.isoformat(timespec="seconds"),
|
"expiresAt": stream_2_expiration.isoformat(timespec="seconds"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
# Stream URL #3
|
||||||
|
FakeResponse(
|
||||||
|
{
|
||||||
|
"results": {
|
||||||
|
"streamExtensionToken": "g.3.extensionToken",
|
||||||
|
"streamToken": "g.3.streamingToken",
|
||||||
|
"expiresAt": stream_3_expiration.isoformat(timespec="seconds"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -209,16 +232,32 @@ async def test_refresh_expired_stream_token(hass, aiohttp_client):
|
||||||
assert cam.state == STATE_IDLE
|
assert cam.state == STATE_IDLE
|
||||||
|
|
||||||
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
||||||
assert stream_source == "rtsp://some/url?auth=g.0.streamingToken"
|
assert stream_source == "rtsp://some/url?auth=g.1.streamingToken"
|
||||||
|
|
||||||
# On second fetch, notice the stream is expired and fetch again
|
# Fire alarm before stream_1_expiration. The stream url is not refreshed
|
||||||
|
next_update = now + datetime.timedelta(seconds=25)
|
||||||
|
await fire_alarm(hass, next_update)
|
||||||
|
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
||||||
|
assert stream_source == "rtsp://some/url?auth=g.1.streamingToken"
|
||||||
|
|
||||||
|
# Alarm is near stream_1_expiration which causes the stream extension
|
||||||
|
next_update = now + datetime.timedelta(seconds=65)
|
||||||
|
await fire_alarm(hass, next_update)
|
||||||
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
||||||
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
|
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
|
||||||
|
|
||||||
# Stream is not expired; Same url returned
|
# Next alarm is well before stream_2_expiration, no change
|
||||||
|
next_update = now + datetime.timedelta(seconds=100)
|
||||||
|
await fire_alarm(hass, next_update)
|
||||||
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
||||||
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
|
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
|
||||||
|
|
||||||
|
# Alarm is near stream_2_expiration, causing it to be extended
|
||||||
|
next_update = now + datetime.timedelta(seconds=155)
|
||||||
|
await fire_alarm(hass, next_update)
|
||||||
|
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
||||||
|
assert stream_source == "rtsp://some/url?auth=g.3.streamingToken"
|
||||||
|
|
||||||
|
|
||||||
async def test_camera_removed(hass, aiohttp_client):
|
async def test_camera_removed(hass, aiohttp_client):
|
||||||
"""Test case where entities are removed and stream tokens expired."""
|
"""Test case where entities are removed and stream tokens expired."""
|
||||||
|
@ -256,3 +295,61 @@ async def test_camera_removed(hass, aiohttp_client):
|
||||||
for config_entry in hass.config_entries.async_entries(DOMAIN):
|
for config_entry in hass.config_entries.async_entries(DOMAIN):
|
||||||
await hass.config_entries.async_remove(config_entry.entry_id)
|
await hass.config_entries.async_remove(config_entry.entry_id)
|
||||||
assert len(hass.states.async_all()) == 0
|
assert len(hass.states.async_all()) == 0
|
||||||
|
|
||||||
|
|
||||||
|
async def test_refresh_expired_stream_failure(hass, aiohttp_client):
|
||||||
|
"""Tests a failure when refreshing the stream."""
|
||||||
|
now = utcnow()
|
||||||
|
stream_1_expiration = now + datetime.timedelta(seconds=90)
|
||||||
|
stream_2_expiration = now + datetime.timedelta(seconds=180)
|
||||||
|
responses = [
|
||||||
|
FakeResponse(
|
||||||
|
{
|
||||||
|
"results": {
|
||||||
|
"streamUrls": {
|
||||||
|
"rtspUrl": "rtsp://some/url?auth=g.1.streamingToken"
|
||||||
|
},
|
||||||
|
"streamExtensionToken": "g.1.extensionToken",
|
||||||
|
"streamToken": "g.1.streamingToken",
|
||||||
|
"expiresAt": stream_1_expiration.isoformat(timespec="seconds"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
# Extending the stream fails
|
||||||
|
FakeResponse(error=HTTPError(response="Some Error")),
|
||||||
|
# Next attempt to get a stream fetches a new url
|
||||||
|
FakeResponse(
|
||||||
|
{
|
||||||
|
"results": {
|
||||||
|
"streamUrls": {
|
||||||
|
"rtspUrl": "rtsp://some/url?auth=g.2.streamingToken"
|
||||||
|
},
|
||||||
|
"streamExtensionToken": "g.2.extensionToken",
|
||||||
|
"streamToken": "g.2.streamingToken",
|
||||||
|
"expiresAt": stream_2_expiration.isoformat(timespec="seconds"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
]
|
||||||
|
await async_setup_camera(
|
||||||
|
hass,
|
||||||
|
DEVICE_TRAITS,
|
||||||
|
auth=FakeAuth(responses),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(hass.states.async_all()) == 1
|
||||||
|
cam = hass.states.get("camera.my_camera")
|
||||||
|
assert cam is not None
|
||||||
|
assert cam.state == STATE_IDLE
|
||||||
|
|
||||||
|
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
||||||
|
assert stream_source == "rtsp://some/url?auth=g.1.streamingToken"
|
||||||
|
|
||||||
|
# Fire alarm when stream is nearing expiration, causing it to be extended.
|
||||||
|
# The stream expires.
|
||||||
|
next_update = now + datetime.timedelta(seconds=65)
|
||||||
|
await fire_alarm(hass, next_update)
|
||||||
|
|
||||||
|
# The stream is entirely refreshed
|
||||||
|
stream_source = await camera.async_get_stream_source(hass, "camera.my_camera")
|
||||||
|
assert stream_source == "rtsp://some/url?auth=g.2.streamingToken"
|
||||||
|
|
Loading…
Reference in New Issue