"""Support for Amcrest IP cameras.""" import asyncio from datetime import timedelta import logging from urllib3.exceptions import HTTPError from amcrest import AmcrestError import voluptuous as vol from homeassistant.components.camera import ( Camera, CAMERA_SERVICE_SCHEMA, SUPPORT_ON_OFF, SUPPORT_STREAM) from homeassistant.components.ffmpeg import DATA_FFMPEG from homeassistant.const import ( CONF_NAME, STATE_ON, STATE_OFF) from homeassistant.helpers.aiohttp_client import ( async_aiohttp_proxy_stream, async_aiohttp_proxy_web, async_get_clientsession) from homeassistant.helpers.dispatcher import async_dispatcher_connect from .const import ( CAMERA_WEB_SESSION_TIMEOUT, CAMERAS, DATA_AMCREST, DEVICES, SERVICE_UPDATE) from .helpers import log_update_error, service_signal _LOGGER = logging.getLogger(__name__) SCAN_INTERVAL = timedelta(seconds=15) STREAM_SOURCE_LIST = [ 'snapshot', 'mjpeg', 'rtsp', ] _SRV_EN_REC = 'enable_recording' _SRV_DS_REC = 'disable_recording' _SRV_EN_AUD = 'enable_audio' _SRV_DS_AUD = 'disable_audio' _SRV_EN_MOT_REC = 'enable_motion_recording' _SRV_DS_MOT_REC = 'disable_motion_recording' _SRV_GOTO = 'goto_preset' _SRV_CBW = 'set_color_bw' _SRV_TOUR_ON = 'start_tour' _SRV_TOUR_OFF = 'stop_tour' _ATTR_PRESET = 'preset' _ATTR_COLOR_BW = 'color_bw' _CBW_COLOR = 'color' _CBW_AUTO = 'auto' _CBW_BW = 'bw' _CBW = [_CBW_COLOR, _CBW_AUTO, _CBW_BW] _SRV_GOTO_SCHEMA = CAMERA_SERVICE_SCHEMA.extend({ vol.Required(_ATTR_PRESET): vol.All(vol.Coerce(int), vol.Range(min=1)), }) _SRV_CBW_SCHEMA = CAMERA_SERVICE_SCHEMA.extend({ vol.Required(_ATTR_COLOR_BW): vol.In(_CBW), }) CAMERA_SERVICES = { _SRV_EN_REC: (CAMERA_SERVICE_SCHEMA, 'async_enable_recording', ()), _SRV_DS_REC: (CAMERA_SERVICE_SCHEMA, 'async_disable_recording', ()), _SRV_EN_AUD: (CAMERA_SERVICE_SCHEMA, 'async_enable_audio', ()), _SRV_DS_AUD: (CAMERA_SERVICE_SCHEMA, 'async_disable_audio', ()), _SRV_EN_MOT_REC: ( CAMERA_SERVICE_SCHEMA, 'async_enable_motion_recording', ()), _SRV_DS_MOT_REC: ( CAMERA_SERVICE_SCHEMA, 'async_disable_motion_recording', ()), _SRV_GOTO: (_SRV_GOTO_SCHEMA, 'async_goto_preset', (_ATTR_PRESET,)), _SRV_CBW: (_SRV_CBW_SCHEMA, 'async_set_color_bw', (_ATTR_COLOR_BW,)), _SRV_TOUR_ON: (CAMERA_SERVICE_SCHEMA, 'async_start_tour', ()), _SRV_TOUR_OFF: (CAMERA_SERVICE_SCHEMA, 'async_stop_tour', ()), } _BOOL_TO_STATE = {True: STATE_ON, False: STATE_OFF} async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): """Set up an Amcrest IP Camera.""" if discovery_info is None: return name = discovery_info[CONF_NAME] device = hass.data[DATA_AMCREST][DEVICES][name] async_add_entities([ AmcrestCam(name, device, hass.data[DATA_FFMPEG])], True) class AmcrestCam(Camera): """An implementation of an Amcrest IP camera.""" def __init__(self, name, device, ffmpeg): """Initialize an Amcrest camera.""" super().__init__() self._name = name self._api = device.api self._ffmpeg = ffmpeg self._ffmpeg_arguments = device.ffmpeg_arguments self._stream_source = device.stream_source self._resolution = device.resolution self._token = self._auth = device.authentication self._control_light = device.control_light self._is_recording = False self._motion_detection_enabled = None self._brand = None self._model = None self._audio_enabled = None self._motion_recording_enabled = None self._color_bw = None self._rtsp_url = None self._snapshot_lock = asyncio.Lock() self._unsub_dispatcher = [] self._update_succeeded = False async def async_camera_image(self): """Return a still image response from the camera.""" available = self.available if not available or not self.is_on: _LOGGER.warning( 'Attempt to take snaphot when %s camera is %s', self.name, 'offline' if not available else 'off') return None async with self._snapshot_lock: try: # Send the request to snap a picture and return raw jpg data response = await self.hass.async_add_executor_job( self._api.snapshot) return response.data except (AmcrestError, HTTPError) as error: log_update_error( _LOGGER, 'get image from', self.name, 'camera', error) return None async def handle_async_mjpeg_stream(self, request): """Return an MJPEG stream.""" # The snapshot implementation is handled by the parent class if self._stream_source == 'snapshot': return await super().handle_async_mjpeg_stream(request) if not self.available: _LOGGER.warning( 'Attempt to stream %s when %s camera is offline', self._stream_source, self.name) return None if self._stream_source == 'mjpeg': # stream an MJPEG image stream directly from the camera websession = async_get_clientsession(self.hass) streaming_url = self._api.mjpeg_url(typeno=self._resolution) stream_coro = websession.get( streaming_url, auth=self._token, timeout=CAMERA_WEB_SESSION_TIMEOUT) return await async_aiohttp_proxy_web( self.hass, request, stream_coro) # streaming via ffmpeg from haffmpeg.camera import CameraMjpeg streaming_url = self._rtsp_url stream = CameraMjpeg(self._ffmpeg.binary, loop=self.hass.loop) await stream.open_camera( streaming_url, extra_cmd=self._ffmpeg_arguments) try: stream_reader = await stream.get_reader() return await async_aiohttp_proxy_stream( self.hass, request, stream_reader, self._ffmpeg.ffmpeg_stream_content_type) finally: await stream.close() # Entity property overrides @property def should_poll(self) -> bool: """Return True if entity has to be polled for state. False if entity pushes its state to HA. """ return True @property def name(self): """Return the name of this camera.""" return self._name @property def device_state_attributes(self): """Return the Amcrest-specific camera state attributes.""" attr = {} if self._audio_enabled is not None: attr['audio'] = _BOOL_TO_STATE.get(self._audio_enabled) if self._motion_recording_enabled is not None: attr['motion_recording'] = _BOOL_TO_STATE.get( self._motion_recording_enabled) if self._color_bw is not None: attr[_ATTR_COLOR_BW] = self._color_bw return attr @property def available(self): """Return True if entity is available.""" return self._api.available @property def supported_features(self): """Return supported features.""" return SUPPORT_ON_OFF | SUPPORT_STREAM # Camera property overrides @property def is_recording(self): """Return true if the device is recording.""" return self._is_recording @property def brand(self): """Return the camera brand.""" return self._brand @property def motion_detection_enabled(self): """Return the camera motion detection status.""" return self._motion_detection_enabled @property def model(self): """Return the camera model.""" return self._model async def stream_source(self): """Return the source of the stream.""" return self._rtsp_url @property def is_on(self): """Return true if on.""" return self.is_streaming # Other Entity method overrides async def async_on_demand_update(self): """Update state.""" self.async_schedule_update_ha_state(True) async def async_added_to_hass(self): """Subscribe to signals and add camera to list.""" for service, params in CAMERA_SERVICES.items(): self._unsub_dispatcher.append(async_dispatcher_connect( self.hass, service_signal(service, self.entity_id), getattr(self, params[1]))) self._unsub_dispatcher.append(async_dispatcher_connect( self.hass, service_signal(SERVICE_UPDATE, self._name), self.async_on_demand_update)) self.hass.data[DATA_AMCREST][CAMERAS].append(self.entity_id) async def async_will_remove_from_hass(self): """Remove camera from list and disconnect from signals.""" self.hass.data[DATA_AMCREST][CAMERAS].remove(self.entity_id) for unsub_dispatcher in self._unsub_dispatcher: unsub_dispatcher() def update(self): """Update entity status.""" if not self.available or self._update_succeeded: if not self.available: self._update_succeeded = False return _LOGGER.debug('Updating %s camera', self.name) try: if self._brand is None: resp = self._api.vendor_information.strip() if resp.startswith('vendor='): self._brand = resp.split('=')[-1] else: self._brand = 'unknown' if self._model is None: resp = self._api.device_type.strip() if resp.startswith('type='): self._model = resp.split('=')[-1] else: self._model = 'unknown' self.is_streaming = self._api.video_enabled self._is_recording = self._api.record_mode == 'Manual' self._motion_detection_enabled = ( self._api.is_motion_detector_on()) self._audio_enabled = self._api.audio_enabled self._motion_recording_enabled = ( self._api.is_record_on_motion_detection()) self._color_bw = _CBW[self._api.day_night_color] self._rtsp_url = self._api.rtsp_url(typeno=self._resolution) except AmcrestError as error: log_update_error( _LOGGER, 'get', self.name, 'camera attributes', error) self._update_succeeded = False else: self._update_succeeded = True # Other Camera method overrides def turn_off(self): """Turn off camera.""" self._enable_video_stream(False) def turn_on(self): """Turn on camera.""" self._enable_video_stream(True) def enable_motion_detection(self): """Enable motion detection in the camera.""" self._enable_motion_detection(True) def disable_motion_detection(self): """Disable motion detection in camera.""" self._enable_motion_detection(False) # Additional Amcrest Camera service methods async def async_enable_recording(self): """Call the job and enable recording.""" await self.hass.async_add_executor_job(self._enable_recording, True) async def async_disable_recording(self): """Call the job and disable recording.""" await self.hass.async_add_executor_job(self._enable_recording, False) async def async_enable_audio(self): """Call the job and enable audio.""" await self.hass.async_add_executor_job(self._enable_audio, True) async def async_disable_audio(self): """Call the job and disable audio.""" await self.hass.async_add_executor_job(self._enable_audio, False) async def async_enable_motion_recording(self): """Call the job and enable motion recording.""" await self.hass.async_add_executor_job(self._enable_motion_recording, True) async def async_disable_motion_recording(self): """Call the job and disable motion recording.""" await self.hass.async_add_executor_job(self._enable_motion_recording, False) async def async_goto_preset(self, preset): """Call the job and move camera to preset position.""" await self.hass.async_add_executor_job(self._goto_preset, preset) async def async_set_color_bw(self, color_bw): """Call the job and set camera color mode.""" await self.hass.async_add_executor_job(self._set_color_bw, color_bw) async def async_start_tour(self): """Call the job and start camera tour.""" await self.hass.async_add_executor_job(self._start_tour, True) async def async_stop_tour(self): """Call the job and stop camera tour.""" await self.hass.async_add_executor_job(self._start_tour, False) # Methods to send commands to Amcrest camera and handle errors def _enable_video_stream(self, enable): """Enable or disable camera video stream.""" # Given the way the camera's state is determined by # is_streaming and is_recording, we can't leave # recording on if video stream is being turned off. if self.is_recording and not enable: self._enable_recording(False) try: self._api.video_enabled = enable except AmcrestError as error: log_update_error( _LOGGER, 'enable' if enable else 'disable', self.name, 'camera video stream', error) else: self.is_streaming = enable self.schedule_update_ha_state() if self._control_light: self._enable_light(self._audio_enabled or self.is_streaming) def _enable_recording(self, enable): """Turn recording on or off.""" # Given the way the camera's state is determined by # is_streaming and is_recording, we can't leave # video stream off if recording is being turned on. if not self.is_streaming and enable: self._enable_video_stream(True) rec_mode = {'Automatic': 0, 'Manual': 1} try: self._api.record_mode = rec_mode[ 'Manual' if enable else 'Automatic'] except AmcrestError as error: log_update_error( _LOGGER, 'enable' if enable else 'disable', self.name, 'camera recording', error) else: self._is_recording = enable self.schedule_update_ha_state() def _enable_motion_detection(self, enable): """Enable or disable motion detection.""" try: self._api.motion_detection = str(enable).lower() except AmcrestError as error: log_update_error( _LOGGER, 'enable' if enable else 'disable', self.name, 'camera motion detection', error) else: self._motion_detection_enabled = enable self.schedule_update_ha_state() def _enable_audio(self, enable): """Enable or disable audio stream.""" try: self._api.audio_enabled = enable except AmcrestError as error: log_update_error( _LOGGER, 'enable' if enable else 'disable', self.name, 'camera audio stream', error) else: self._audio_enabled = enable self.schedule_update_ha_state() if self._control_light: self._enable_light(self._audio_enabled or self.is_streaming) def _enable_light(self, enable): """Enable or disable indicator light.""" try: self._api.command( 'configManager.cgi?action=setConfig&LightGlobal[0].Enable={}' .format(str(enable).lower())) except AmcrestError as error: log_update_error( _LOGGER, 'enable' if enable else 'disable', self.name, 'indicator light', error) def _enable_motion_recording(self, enable): """Enable or disable motion recording.""" try: self._api.motion_recording = str(enable).lower() except AmcrestError as error: log_update_error( _LOGGER, 'enable' if enable else 'disable', self.name, 'camera motion recording', error) else: self._motion_recording_enabled = enable self.schedule_update_ha_state() def _goto_preset(self, preset): """Move camera position and zoom to preset.""" try: self._api.go_to_preset( action='start', preset_point_number=preset) except AmcrestError as error: log_update_error( _LOGGER, 'move', self.name, 'camera to preset {}'.format(preset), error) def _set_color_bw(self, cbw): """Set camera color mode.""" try: self._api.day_night_color = _CBW.index(cbw) except AmcrestError as error: log_update_error( _LOGGER, 'set', self.name, 'camera color mode to {}'.format(cbw), error) else: self._color_bw = cbw self.schedule_update_ha_state() def _start_tour(self, start): """Start camera tour.""" try: self._api.tour(start=start) except AmcrestError as error: log_update_error( _LOGGER, 'start' if start else 'stop', self.name, 'camera tour', error)