core/homeassistant/components/ezviz/camera.py

240 lines
7.7 KiB
Python

"""This component provides basic support for Ezviz IP cameras."""
import asyncio
import logging
# pylint: disable=import-error
from haffmpeg.tools import IMAGE_JPEG, ImageFrame
from pyezviz.camera import EzvizCamera
from pyezviz.client import EzvizClient, PyEzvizError
import voluptuous as vol
from homeassistant.components.camera import PLATFORM_SCHEMA, SUPPORT_STREAM, Camera
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
from homeassistant.helpers import config_validation as cv
_LOGGER = logging.getLogger(__name__)
CONF_CAMERAS = "cameras"
DEFAULT_CAMERA_USERNAME = "admin"
DEFAULT_RTSP_PORT = "554"
DATA_FFMPEG = "ffmpeg"
EZVIZ_DATA = "ezviz"
ENTITIES = "entities"
CAMERA_SCHEMA = vol.Schema(
{vol.Required(CONF_USERNAME): cv.string, vol.Required(CONF_PASSWORD): cv.string}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_CAMERAS, default={}): {cv.string: CAMERA_SCHEMA},
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Ezviz IP Cameras."""
conf_cameras = config[CONF_CAMERAS]
account = config[CONF_USERNAME]
password = config[CONF_PASSWORD]
try:
ezviz_client = EzvizClient(account, password)
ezviz_client.login()
cameras = ezviz_client.load_cameras()
except PyEzvizError as exp:
_LOGGER.error(exp)
return
# now, let's build the HASS devices
camera_entities = []
# Add the cameras as devices in HASS
for camera in cameras:
camera_username = DEFAULT_CAMERA_USERNAME
camera_password = ""
camera_rtsp_stream = ""
camera_serial = camera["serial"]
# There seem to be a bug related to localRtspPort in Ezviz API...
local_rtsp_port = DEFAULT_RTSP_PORT
if camera["local_rtsp_port"] and camera["local_rtsp_port"] != 0:
local_rtsp_port = camera["local_rtsp_port"]
if camera_serial in conf_cameras:
camera_username = conf_cameras[camera_serial][CONF_USERNAME]
camera_password = conf_cameras[camera_serial][CONF_PASSWORD]
camera_rtsp_stream = f"rtsp://{camera_username}:{camera_password}@{camera['local_ip']}:{local_rtsp_port}"
_LOGGER.debug(
"Camera %s source stream: %s", camera["serial"], camera_rtsp_stream
)
else:
_LOGGER.info(
"Found camera with serial %s without configuration. Add it to configuration.yaml to see the camera stream",
camera_serial,
)
camera["username"] = camera_username
camera["password"] = camera_password
camera["rtsp_stream"] = camera_rtsp_stream
camera["ezviz_camera"] = EzvizCamera(ezviz_client, camera_serial)
camera_entities.append(HassEzvizCamera(**camera))
add_entities(camera_entities)
class HassEzvizCamera(Camera):
"""An implementation of a Foscam IP camera."""
def __init__(self, **data):
"""Initialize an Ezviz camera."""
super().__init__()
self._username = data["username"]
self._password = data["password"]
self._rtsp_stream = data["rtsp_stream"]
self._ezviz_camera = data["ezviz_camera"]
self._serial = data["serial"]
self._name = data["name"]
self._status = data["status"]
self._privacy = data["privacy"]
self._audio = data["audio"]
self._ir_led = data["ir_led"]
self._state_led = data["state_led"]
self._follow_move = data["follow_move"]
self._alarm_notify = data["alarm_notify"]
self._alarm_sound_mod = data["alarm_sound_mod"]
self._encrypted = data["encrypted"]
self._local_ip = data["local_ip"]
self._detection_sensibility = data["detection_sensibility"]
self._device_sub_category = data["device_sub_category"]
self._local_rtsp_port = data["local_rtsp_port"]
self._ffmpeg = None
def update(self):
"""Update the camera states."""
data = self._ezviz_camera.status()
self._name = data["name"]
self._status = data["status"]
self._privacy = data["privacy"]
self._audio = data["audio"]
self._ir_led = data["ir_led"]
self._state_led = data["state_led"]
self._follow_move = data["follow_move"]
self._alarm_notify = data["alarm_notify"]
self._alarm_sound_mod = data["alarm_sound_mod"]
self._encrypted = data["encrypted"]
self._local_ip = data["local_ip"]
self._detection_sensibility = data["detection_sensibility"]
self._device_sub_category = data["device_sub_category"]
self._local_rtsp_port = data["local_rtsp_port"]
async def async_added_to_hass(self):
"""Subscribe to ffmpeg and add camera to list."""
self._ffmpeg = self.hass.data[DATA_FFMPEG]
@property
def should_poll(self) -> bool:
"""Return True if entity has to be polled for state.
False if entity pushes its state to HA.
"""
return True
@property
def device_state_attributes(self):
"""Return the Ezviz-specific camera state attributes."""
return {
# if privacy == true, the device closed the lid or did a 180° tilt
"privacy": self._privacy,
# is the camera listening ?
"audio": self._audio,
# infrared led on ?
"ir_led": self._ir_led,
# state led on ?
"state_led": self._state_led,
# if true, the camera will move automatically to follow movements
"follow_move": self._follow_move,
# if true, if some movement is detected, the app is notified
"alarm_notify": self._alarm_notify,
# if true, if some movement is detected, the camera makes some sound
"alarm_sound_mod": self._alarm_sound_mod,
# are the camera's stored videos/images encrypted?
"encrypted": self._encrypted,
# camera's local ip on local network
"local_ip": self._local_ip,
# from 1 to 9, the higher is the sensibility, the more it will detect small movements
"detection_sensibility": self._detection_sensibility,
}
@property
def available(self):
"""Return True if entity is available."""
return self._status
@property
def brand(self):
"""Return the camera brand."""
return "Ezviz"
@property
def supported_features(self):
"""Return supported features."""
if self._rtsp_stream:
return SUPPORT_STREAM
return 0
@property
def model(self):
"""Return the camera model."""
return self._device_sub_category
@property
def is_on(self):
"""Return true if on."""
return self._status
@property
def name(self):
"""Return the name of this camera."""
return self._name
async def async_camera_image(self):
"""Return a frame from the camera stream."""
ffmpeg = ImageFrame(self._ffmpeg.binary, loop=self.hass.loop)
image = await asyncio.shield(
ffmpeg.get_image(self._rtsp_stream, output_format=IMAGE_JPEG)
)
return image
async def stream_source(self):
"""Return the stream source."""
if self._local_rtsp_port:
rtsp_stream_source = (
f"rtsp://{self._username}:{self._password}@"
f"{self._local_ip}:{self._local_rtsp_port}"
)
_LOGGER.debug(
"Camera %s source stream: %s", self._serial, rtsp_stream_source
)
self._rtsp_stream = rtsp_stream_source
return rtsp_stream_source
return None