core/homeassistant/components/nest/camera_sdm.py

129 lines
4.2 KiB
Python

"""Support for Google Nest SDM Cameras."""
import logging
from typing import Optional
from google_nest_sdm.camera_traits import CameraImageTrait, CameraLiveStreamTrait
from google_nest_sdm.device import Device
from haffmpeg.tools import IMAGE_JPEG
from homeassistant.components.camera import SUPPORT_STREAM, Camera
from homeassistant.components.ffmpeg import async_get_image
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util.dt import utcnow
from .const import DOMAIN, SIGNAL_NEST_UPDATE
from .device_info import DeviceInfo
_LOGGER = logging.getLogger(__name__)
async def async_setup_sdm_entry(
hass: HomeAssistantType, entry: ConfigEntry, async_add_entities
) -> None:
"""Set up the cameras."""
subscriber = hass.data[DOMAIN][entry.entry_id]
device_manager = await subscriber.async_get_device_manager()
# Fetch initial data so we have data when entities subscribe.
entities = []
for device in device_manager.devices.values():
if (
CameraImageTrait.NAME in device.traits
or CameraLiveStreamTrait.NAME in device.traits
):
entities.append(NestCamera(device))
async_add_entities(entities)
class NestCamera(Camera):
"""Devices that support cameras."""
def __init__(self, device: Device):
"""Initialize the camera."""
super().__init__()
self._device = device
self._device_info = DeviceInfo(device)
self._stream = None
@property
def should_poll(self) -> bool:
"""Disable polling since entities have state pushed via pubsub."""
return False
@property
def unique_id(self) -> Optional[str]:
"""Return a unique ID."""
# The API "name" field is a unique device identifier.
return f"{self._device.name}-camera"
@property
def name(self):
"""Return the name of the camera."""
return self._device_info.device_name
@property
def device_info(self):
"""Return device specific attributes."""
return self._device_info.device_info
@property
def brand(self):
"""Return the camera brand."""
return self._device_info.device_brand
@property
def model(self):
"""Return the camera model."""
return self._device_info.device_model
@property
def supported_features(self):
"""Flag supported features."""
if CameraLiveStreamTrait.NAME in self._device.traits:
return SUPPORT_STREAM
return 0
async def stream_source(self):
"""Return the source of the stream."""
if CameraLiveStreamTrait.NAME not in self._device.traits:
return None
trait = self._device.traits[CameraLiveStreamTrait.NAME]
now = utcnow()
if not self._stream:
logging.debug("Fetching stream url")
self._stream = await trait.generate_rtsp_stream()
elif self._stream.expires_at < now:
logging.debug("Stream expired, extending stream")
new_stream = await self._stream.extend_rtsp_stream()
self._stream = new_stream
return self._stream.rtsp_stream_url
async def async_will_remove_from_hass(self):
"""Invalidates the RTSP token when unloaded."""
if self._stream:
logging.debug("Invalidating stream")
await self._stream.stop_rtsp_stream()
async def async_added_to_hass(self):
"""Run when entity is added to register update signal handler."""
# Event messages trigger the SIGNAL_NEST_UPDATE, which is intercepted
# here to re-fresh the signals from _device. Unregister this callback
# when the entity is removed.
self.async_on_remove(
async_dispatcher_connect(
self.hass, SIGNAL_NEST_UPDATE, self.async_write_ha_state
)
)
async def async_camera_image(self):
"""Return bytes of camera image."""
stream_url = await self.stream_source()
if not stream_url:
return None
return await async_get_image(self.hass, stream_url, output_format=IMAGE_JPEG)