"""This component provides basic support for Ezviz IP cameras.""" import asyncio import logging from haffmpeg.tools import IMAGE_JPEG, ImageFrame from pyezviz.camera import EzvizCamera from pyezviz.client import EzvizClient, PyEzvizError import voluptuous as vol from homeassistant.components.camera import PLATFORM_SCHEMA, SUPPORT_STREAM, Camera from homeassistant.const import CONF_PASSWORD, CONF_USERNAME from homeassistant.helpers import config_validation as cv _LOGGER = logging.getLogger(__name__) CONF_CAMERAS = "cameras" DEFAULT_CAMERA_USERNAME = "admin" DEFAULT_RTSP_PORT = "554" DATA_FFMPEG = "ffmpeg" EZVIZ_DATA = "ezviz" ENTITIES = "entities" CAMERA_SCHEMA = vol.Schema( {vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_PASSWORD): cv.string} ) PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_PASSWORD): cv.string, vol.Optional(CONF_CAMERAS, default={}): {cv.string: CAMERA_SCHEMA}, } ) def setup_platform(hass, config, add_entities, discovery_info=None): """Set up the Ezviz IP Cameras.""" conf_cameras = config[CONF_CAMERAS] account = config[CONF_USERNAME] password = config[CONF_PASSWORD] try: ezviz_client = EzvizClient(account, password) ezviz_client.login() cameras = ezviz_client.load_cameras() except PyEzvizError as exp: _LOGGER.error(exp) return # now, let's build the HASS devices camera_entities = [] # Add the cameras as devices in HASS for camera in cameras: camera_username = DEFAULT_CAMERA_USERNAME camera_password = "" camera_rtsp_stream = "" camera_serial = camera["serial"] # There seem to be a bug related to localRtspPort in Ezviz API... local_rtsp_port = DEFAULT_RTSP_PORT if camera["local_rtsp_port"] and camera["local_rtsp_port"] != 0: local_rtsp_port = camera["local_rtsp_port"] if camera_serial in conf_cameras: camera_username = conf_cameras[camera_serial][CONF_USERNAME] camera_password = conf_cameras[camera_serial][CONF_PASSWORD] camera_rtsp_stream = f"rtsp://{camera_username}:{camera_password}@{camera['local_ip']}:{local_rtsp_port}" _LOGGER.debug( "Camera %s source stream: %s", camera["serial"], camera_rtsp_stream ) else: _LOGGER.info( "Found camera with serial %s without configuration. Add it to configuration.yaml to see the camera stream", camera_serial, ) camera["username"] = camera_username camera["password"] = camera_password camera["rtsp_stream"] = camera_rtsp_stream camera["ezviz_camera"] = EzvizCamera(ezviz_client, camera_serial) camera_entities.append(HassEzvizCamera(**camera)) add_entities(camera_entities) class HassEzvizCamera(Camera): """An implementation of a Foscam IP camera.""" def __init__(self, **data): """Initialize an Ezviz camera.""" super().__init__() self._username = data["username"] self._password = data["password"] self._rtsp_stream = data["rtsp_stream"] self._ezviz_camera = data["ezviz_camera"] self._serial = data["serial"] self._name = data["name"] self._status = data["status"] self._privacy = data["privacy"] self._audio = data["audio"] self._ir_led = data["ir_led"] self._state_led = data["state_led"] self._follow_move = data["follow_move"] self._alarm_notify = data["alarm_notify"] self._alarm_sound_mod = data["alarm_sound_mod"] self._encrypted = data["encrypted"] self._local_ip = data["local_ip"] self._detection_sensibility = data["detection_sensibility"] self._device_sub_category = data["device_sub_category"] self._local_rtsp_port = data["local_rtsp_port"] self._ffmpeg = None def update(self): """Update the camera states.""" data = self._ezviz_camera.status() self._name = data["name"] self._status = data["status"] self._privacy = data["privacy"] self._audio = data["audio"] self._ir_led = data["ir_led"] self._state_led = data["state_led"] self._follow_move = data["follow_move"] self._alarm_notify = data["alarm_notify"] self._alarm_sound_mod = data["alarm_sound_mod"] self._encrypted = data["encrypted"] self._local_ip = data["local_ip"] self._detection_sensibility = data["detection_sensibility"] self._device_sub_category = data["device_sub_category"] self._local_rtsp_port = data["local_rtsp_port"] async def async_added_to_hass(self): """Subscribe to ffmpeg and add camera to list.""" self._ffmpeg = self.hass.data[DATA_FFMPEG] @property def should_poll(self) -> bool: """Return True if entity has to be polled for state. False if entity pushes its state to HA. """ return True @property def device_state_attributes(self): """Return the Ezviz-specific camera state attributes.""" return { # if privacy == true, the device closed the lid or did a 180° tilt "privacy": self._privacy, # is the camera listening ? "audio": self._audio, # infrared led on ? "ir_led": self._ir_led, # state led on ? "state_led": self._state_led, # if true, the camera will move automatically to follow movements "follow_move": self._follow_move, # if true, if some movement is detected, the app is notified "alarm_notify": self._alarm_notify, # if true, if some movement is detected, the camera makes some sound "alarm_sound_mod": self._alarm_sound_mod, # are the camera's stored videos/images encrypted? "encrypted": self._encrypted, # camera's local ip on local network "local_ip": self._local_ip, # from 1 to 9, the higher is the sensibility, the more it will detect small movements "detection_sensibility": self._detection_sensibility, } @property def available(self): """Return True if entity is available.""" return self._status @property def brand(self): """Return the camera brand.""" return "Ezviz" @property def supported_features(self): """Return supported features.""" if self._rtsp_stream: return SUPPORT_STREAM return 0 @property def model(self): """Return the camera model.""" return self._device_sub_category @property def is_on(self): """Return true if on.""" return self._status @property def name(self): """Return the name of this camera.""" return self._name async def async_camera_image(self): """Return a frame from the camera stream.""" ffmpeg = ImageFrame(self._ffmpeg.binary, loop=self.hass.loop) image = await asyncio.shield( ffmpeg.get_image(self._rtsp_stream, output_format=IMAGE_JPEG) ) return image async def stream_source(self): """Return the stream source.""" if self._local_rtsp_port: rtsp_stream_source = ( f"rtsp://{self._username}:{self._password}@" f"{self._local_ip}:{self._local_rtsp_port}" ) _LOGGER.debug( "Camera %s source stream: %s", self._serial, rtsp_stream_source ) self._rtsp_stream = rtsp_stream_source return rtsp_stream_source return None