core/homeassistant/components/camera/mjpeg.py

83 lines
2.7 KiB
Python
Raw Normal View History

2015-11-09 04:15:06 +00:00
"""
Support for IP Cameras.

For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/camera.mjpeg/
"""
import logging
2016-02-19 05:27:50 +00:00
from contextlib import closing
2015-11-29 21:49:05 +00:00
import requests
2015-11-09 04:15:06 +00:00
from requests.auth import HTTPBasicAuth
2015-11-29 21:49:05 +00:00
from homeassistant.components.camera import DOMAIN, Camera
2016-02-19 05:27:50 +00:00
from homeassistant.helpers import validate_config
CONTENT_TYPE_HEADER = 'Content-Type'
2015-11-09 04:15:06 +00:00
_LOGGER = logging.getLogger(__name__)
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
    """Setup a MJPEG IP Camera.

    The platform configuration is wrapped under DOMAIN and handed to
    validate_config together with the module logger; the only required
    key is 'mjpeg_url'.  When validation yields a falsy result nothing is
    registered.  Otherwise a single MjpegCamera built from ``config`` is
    passed to ``add_devices_callback``.

    ``hass`` and ``discovery_info`` are accepted but not used here.
    Always returns None.
    """
    config_is_valid = validate_config(
        {DOMAIN: config},
        {DOMAIN: ['mjpeg_url']},
        _LOGGER,
    )
    if config_is_valid:
        add_devices_callback([MjpegCamera(config)])
    return None
# pylint: disable=too-many-instance-attributes
class MjpegCamera(Camera):
    """An implementation of an IP camera that is reachable over a URL."""

    def __init__(self, device_info):
        """Initialize a MJPEG camera.

        ``device_info`` is a mapping that must contain 'mjpeg_url' (a
        KeyError is raised otherwise) and may contain 'name', 'username'
        and 'password'.  The name defaults to 'Mjpeg Camera'.
        """
        super().__init__()
        self._name = device_info.get('name', 'Mjpeg Camera')
        self._username = device_info.get('username')
        self._password = device_info.get('password')
        self._mjpeg_url = device_info['mjpeg_url']

    def camera_stream(self):
        """Return a MJPEG stream image response directly from the camera.

        The request is made with ``stream=True`` so the body is not
        downloaded up front.  HTTP basic auth is sent only when both a
        username and a password are configured.  The caller is responsible
        for closing the returned response.
        """
        if self._username and self._password:
            return requests.get(self._mjpeg_url,
                                auth=HTTPBasicAuth(self._username,
                                                   self._password),
                                stream=True)
        return requests.get(self._mjpeg_url, stream=True)

    def camera_image(self):
        """Return a still image response from the camera.

        Reads the MJPEG stream until one complete JPEG frame is buffered
        and returns it as bytes.  Returns None when the stream ends before
        a complete frame was seen.
        """
        def process_response(response):
            """Take in a response object, return the jpg from it."""
            # bytearray grows in place; bytes += would copy the whole
            # buffer on every chunk.
            data = bytearray()
            for chunk in response.iter_content(1024):
                data += chunk
                jpg_start = data.find(b'\xff\xd8')
                if jpg_start == -1:
                    continue
                # Look for the end marker only AFTER the start marker.  The
                # stream can be joined mid-frame, in which case the tail of
                # the previous frame (including its end marker) precedes
                # the first start marker and must be ignored.
                jpg_end = data.find(b'\xff\xd9', jpg_start + 2)
                if jpg_end != -1:
                    return bytes(data[jpg_start:jpg_end + 2])
            return None

        # closing() releases the streamed connection once a frame is read.
        with closing(self.camera_stream()) as response:
            return process_response(response)

    def mjpeg_stream(self, response):
        """Generate an HTTP MJPEG stream from the camera.

        ``response`` is a callable (presumably a WSGI response class --
        confirm against the caller) that is invoked with an iterator over
        the camera's body in 1024-byte chunks, the camera's own
        Content-Type as ``mimetype`` and ``direct_passthrough=True``.
        Its result is returned unchanged.
        """
        stream = self.camera_stream()
        return response(
            stream.iter_content(chunk_size=1024),
            mimetype=stream.headers[CONTENT_TYPE_HEADER],
            direct_passthrough=True
        )

    @property
    def name(self):
        """Return the name of this camera."""
        return self._name