core/homeassistant/components/onvif/device.py

400 lines
14 KiB
Python
Raw Normal View History

2020-05-06 16:29:59 +00:00
"""ONVIF device abstraction."""
import asyncio
import datetime as dt
import os
from typing import List
from aiohttp.client_exceptions import ClientConnectionError, ServerDisconnectedError
import onvif
from onvif import ONVIFCamera
from onvif.exceptions import ONVIFError
from zeep.asyncio import AsyncTransport
from zeep.exceptions import Fault
from homeassistant.const import (
CONF_HOST,
CONF_NAME,
CONF_PASSWORD,
CONF_PORT,
CONF_USERNAME,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.util.dt as dt_util
from .const import (
ABSOLUTE_MOVE,
CONTINUOUS_MOVE,
GOTOPRESET_MOVE,
LOGGER,
PAN_FACTOR,
RELATIVE_MOVE,
TILT_FACTOR,
ZOOM_FACTOR,
)
from .models import PTZ, Capabilities, DeviceInfo, Profile, Resolution, Video
class ONVIFDevice:
"""Manages an ONVIF device."""
def __init__(self, hass, config_entry=None):
"""Initialize the device."""
self.hass = hass
self.config_entry = config_entry
self.available = True
self.device = None
self.info = DeviceInfo()
self.capabilities = Capabilities()
self.profiles = []
self.max_resolution = 0
@property
def name(self) -> str:
"""Return the name of this device."""
return self.config_entry.data[CONF_NAME]
@property
def host(self) -> str:
"""Return the host of this device."""
return self.config_entry.data[CONF_HOST]
@property
def port(self) -> int:
"""Return the port of this device."""
return self.config_entry.data[CONF_PORT]
@property
def username(self) -> int:
"""Return the username of this device."""
return self.config_entry.data[CONF_USERNAME]
@property
def password(self) -> int:
"""Return the password of this device."""
return self.config_entry.data[CONF_PASSWORD]
async def async_setup(self) -> bool:
"""Set up the device."""
self.device = get_device(
self.hass,
host=self.config_entry.data[CONF_HOST],
port=self.config_entry.data[CONF_PORT],
username=self.config_entry.data[CONF_USERNAME],
password=self.config_entry.data[CONF_PASSWORD],
)
# Get all device info
try:
await self.device.update_xaddrs()
await self.async_check_date_and_time()
self.info = await self.async_get_device_info()
self.capabilities = await self.async_get_capabilities()
self.profiles = await self.async_get_profiles()
if self.capabilities.ptz:
self.device.create_ptz_service()
# Determine max resolution from profiles
self.max_resolution = max(
profile.video.resolution.width
for profile in self.profiles
if profile.video.encoding == "H264"
)
except ClientConnectionError as err:
LOGGER.warning(
"Couldn't connect to camera '%s', but will retry later. Error: %s",
self.name,
err,
)
self.available = False
except Fault as err:
LOGGER.error(
"Couldn't connect to camera '%s', please verify "
"that the credentials are correct. Error: %s",
self.name,
err,
)
return False
return True
async def async_check_date_and_time(self) -> None:
"""Warns if device and system date not synced."""
LOGGER.debug("Setting up the ONVIF device management service")
devicemgmt = self.device.create_devicemgmt_service()
LOGGER.debug("Retrieving current device date/time")
try:
system_date = dt_util.utcnow()
device_time = await devicemgmt.GetSystemDateAndTime()
if not device_time:
LOGGER.debug(
"""Couldn't get device '%s' date/time.
GetSystemDateAndTime() return null/empty""",
self.name,
)
return
if device_time.UTCDateTime:
tzone = dt_util.UTC
cdate = device_time.UTCDateTime
else:
tzone = (
dt_util.get_time_zone(device_time.TimeZone)
or dt_util.DEFAULT_TIME_ZONE
)
cdate = device_time.LocalDateTime
if cdate is None:
LOGGER.warning("Could not retrieve date/time on this camera")
else:
cam_date = dt.datetime(
cdate.Date.Year,
cdate.Date.Month,
cdate.Date.Day,
cdate.Time.Hour,
cdate.Time.Minute,
cdate.Time.Second,
0,
tzone,
)
cam_date_utc = cam_date.astimezone(dt_util.UTC)
LOGGER.debug(
"Device date/time: %s | System date/time: %s",
cam_date_utc,
system_date,
)
dt_diff = cam_date - system_date
dt_diff_seconds = dt_diff.total_seconds()
if dt_diff_seconds > 5:
LOGGER.warning(
"The date/time on the device (UTC) is '%s', "
"which is different from the system '%s', "
"this could lead to authentication issues",
cam_date_utc,
system_date,
)
except ServerDisconnectedError as err:
LOGGER.warning(
"Couldn't get device '%s' date/time. Error: %s", self.name, err
)
async def async_get_device_info(self) -> DeviceInfo:
"""Obtain information about this device."""
devicemgmt = self.device.create_devicemgmt_service()
device_info = await devicemgmt.GetDeviceInformation()
return DeviceInfo(
device_info.Manufacturer,
device_info.Model,
device_info.FirmwareVersion,
self.config_entry.unique_id,
)
async def async_get_capabilities(self):
"""Obtain information about the available services on the device."""
media_service = self.device.create_media_service()
capabilities = await media_service.GetServiceCapabilities()
ptz = False
try:
self.device.get_definition("ptz")
ptz = True
except ONVIFError:
pass
return Capabilities(capabilities.SnapshotUri, ptz)
async def async_get_profiles(self) -> List[Profile]:
"""Obtain media profiles for this device."""
media_service = self.device.create_media_service()
result = await media_service.GetProfiles()
profiles = []
for key, onvif_profile in enumerate(result):
# Only add H264 profiles
if onvif_profile.VideoEncoderConfiguration.Encoding != "H264":
continue
profile = Profile(
key,
onvif_profile.token,
onvif_profile.Name,
Video(
onvif_profile.VideoEncoderConfiguration.Encoding,
Resolution(
onvif_profile.VideoEncoderConfiguration.Resolution.Width,
onvif_profile.VideoEncoderConfiguration.Resolution.Height,
),
),
)
# Configure PTZ options
if onvif_profile.PTZConfiguration:
profile.ptz = PTZ(
onvif_profile.PTZConfiguration.DefaultContinuousPanTiltVelocitySpace
is not None,
onvif_profile.PTZConfiguration.DefaultRelativePanTiltTranslationSpace
is not None,
onvif_profile.PTZConfiguration.DefaultAbsolutePantTiltPositionSpace
is not None,
)
ptz_service = self.device.get_service("ptz")
presets = await ptz_service.GetPresets(profile.token)
profile.ptz.presets = [preset.token for preset in presets]
profiles.append(profile)
return profiles
async def async_get_snapshot_uri(self, profile: Profile) -> str:
"""Get the snapshot URI for a specified profile."""
if not self.capabilities.snapshot:
return None
media_service = self.device.create_media_service()
req = media_service.create_type("GetSnapshotUri")
req.ProfileToken = profile.token
result = await media_service.GetSnapshotUri(req)
return result.Uri
async def async_get_stream_uri(self, profile: Profile) -> str:
"""Get the stream URI for a specified profile."""
media_service = self.device.create_media_service()
req = media_service.create_type("GetStreamUri")
req.ProfileToken = profile.token
req.StreamSetup = {
"Stream": "RTP-Unicast",
"Transport": {"Protocol": "RTSP"},
}
result = await media_service.GetStreamUri(req)
return result.Uri
async def async_perform_ptz(
self,
profile: Profile,
distance,
speed,
move_mode,
continuous_duration,
preset,
pan=None,
tilt=None,
zoom=None,
):
"""Perform a PTZ action on the camera."""
if not self.capabilities.ptz:
LOGGER.warning("PTZ actions are not supported on device '%s'", self.name)
return
ptz_service = self.device.get_service("ptz")
pan_val = distance * PAN_FACTOR.get(pan, 0)
tilt_val = distance * TILT_FACTOR.get(tilt, 0)
zoom_val = distance * ZOOM_FACTOR.get(zoom, 0)
speed_val = speed
preset_val = preset
LOGGER.debug(
"Calling %s PTZ | Pan = %4.2f | Tilt = %4.2f | Zoom = %4.2f | Speed = %4.2f | Preset = %s",
move_mode,
pan_val,
tilt_val,
zoom_val,
speed_val,
preset_val,
)
try:
req = ptz_service.create_type(move_mode)
req.ProfileToken = profile.token
if move_mode == CONTINUOUS_MOVE:
# Guard against unsupported operation
if not profile.ptz.continuous:
LOGGER.warning(
"ContinuousMove not supported on device '%s'", self.name
)
return
req.Velocity = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
await ptz_service.ContinuousMove(req)
await asyncio.sleep(continuous_duration)
req = ptz_service.create_type("Stop")
req.ProfileToken = profile.token
await ptz_service.Stop({"ProfileToken": req.ProfileToken})
elif move_mode == RELATIVE_MOVE:
# Guard against unsupported operation
if not profile.ptz.relative:
LOGGER.warning(
"ContinuousMove not supported on device '%s'", self.name
)
return
req.Translation = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await ptz_service.RelativeMove(req)
elif move_mode == ABSOLUTE_MOVE:
# Guard against unsupported operation
if not profile.ptz.absolute:
LOGGER.warning(
"ContinuousMove not supported on device '%s'", self.name
)
return
req.Position = {
"PanTilt": {"x": pan_val, "y": tilt_val},
"Zoom": {"x": zoom_val},
}
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await ptz_service.AbsoluteMove(req)
elif move_mode == GOTOPRESET_MOVE:
# Guard against unsupported operation
if preset_val not in profile.ptz.presets:
LOGGER.warning(
"PTZ preset '%s' does not exist on device '%s'. Available Presets: %s",
preset_val,
self.name,
profile.ptz.presets.join(", "),
)
return
req.PresetToken = preset_val
req.Speed = {
"PanTilt": {"x": speed_val, "y": speed_val},
"Zoom": {"x": speed_val},
}
await ptz_service.GotoPreset(req)
except ONVIFError as err:
if "Bad Request" in err.reason:
LOGGER.warning("Device '%s' doesn't support PTZ.", self.name)
else:
LOGGER.error("Error trying to perform PTZ action: %s", err)
def get_device(hass, host, port, username, password) -> ONVIFCamera:
"""Get ONVIFCamera instance."""
session = async_get_clientsession(hass)
transport = AsyncTransport(None, session=session)
return ONVIFCamera(
host,
port,
username,
password,
f"{os.path.dirname(onvif.__file__)}/wsdl/",
transport=transport,
)