core/homeassistant/components/opencv.py

184 lines
5.3 KiB
Python

"""
Support for OpenCV image/video processing.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/opencv/
"""
import logging
import os
import voluptuous as vol
import requests
from homeassistant.const import (
CONF_NAME,
CONF_ENTITY_ID,
CONF_FILE_PATH
)
from homeassistant.helpers import (
discovery,
config_validation as cv,
)
REQUIREMENTS = ['opencv-python==3.2.0.6', 'numpy==1.12.0']
_LOGGER = logging.getLogger(__name__)
ATTR_MATCHES = 'matches'
BASE_PATH = os.path.realpath(__file__)
CASCADE_URL = \
'https://raw.githubusercontent.com/opencv/opencv/master/data/' +\
'lbpcascades/lbpcascade_frontalface.xml'
CONF_CLASSIFIER = 'classifier'
CONF_COLOR = 'color'
CONF_GROUPS = 'classifier_group'
CONF_MIN_SIZE = 'min_size'
CONF_NEIGHBORS = 'neighbors'
CONF_SCALE = 'scale'
DATA_CLASSIFIER_GROUPS = 'classifier_groups'
DEFAULT_COLOR = (255, 255, 0)
DEFAULT_CLASSIFIER_PATH = 'lbp_frontalface.xml'
DEFAULT_NAME = 'OpenCV'
DEFAULT_MIN_SIZE = (30, 30)
DEFAULT_NEIGHBORS = 4
DEFAULT_SCALE = 1.1
DOMAIN = 'opencv'
CLASSIFIER_GROUP_CONFIG = {
vol.Required(CONF_CLASSIFIER): vol.All(
cv.ensure_list,
[vol.Schema({
vol.Optional(CONF_COLOR, default=DEFAULT_COLOR):
vol.Schema((int, int, int)),
vol.Optional(CONF_FILE_PATH, default=None): cv.isfile,
vol.Optional(CONF_NAME, default=DEFAULT_NAME):
cv.string,
vol.Optional(CONF_MIN_SIZE, default=DEFAULT_MIN_SIZE):
vol.Schema((int, int)),
vol.Optional(CONF_NEIGHBORS, default=DEFAULT_NEIGHBORS):
cv.positive_int,
vol.Optional(CONF_SCALE, default=DEFAULT_SCALE):
float
})]),
vol.Required(CONF_ENTITY_ID): cv.entity_ids,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
}
CLASSIFIER_GROUP_SCHEMA = vol.Schema(CLASSIFIER_GROUP_CONFIG)
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Required(CONF_GROUPS): vol.All(
cv.ensure_list,
[CLASSIFIER_GROUP_SCHEMA]
),
})
}, extra=vol.ALLOW_EXTRA)
# NOTE:
# pylint cannot find any of the members of cv2, using disable=no-member
# to pass linting
def cv_image_to_bytes(cv_image):
"""Convert OpenCV image to bytes."""
import cv2 # pylint: disable=import-error
# pylint: disable=no-member
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]
# pylint: disable=no-member
success, data = cv2.imencode('.jpg', cv_image, encode_param)
if success:
return data.tobytes()
return None
def cv_image_from_bytes(image):
"""Convert image bytes to OpenCV image."""
import cv2 # pylint: disable=import-error
import numpy
# pylint: disable=no-member
return cv2.imdecode(numpy.asarray(bytearray(image)), cv2.IMREAD_UNCHANGED)
def process_image(image, classifier_group, is_camera):
"""Process the image given a classifier group."""
import cv2 # pylint: disable=import-error
import numpy
# pylint: disable=no-member
cv_image = cv2.imdecode(numpy.asarray(bytearray(image)),
cv2.IMREAD_UNCHANGED)
group_matches = {}
for classifier_config in classifier_group:
classifier_path = classifier_config[CONF_FILE_PATH]
classifier_name = classifier_config[CONF_NAME]
color = classifier_config[CONF_COLOR]
scale = classifier_config[CONF_SCALE]
neighbors = classifier_config[CONF_NEIGHBORS]
min_size = classifier_config[CONF_MIN_SIZE]
# pylint: disable=no-member
classifier = cv2.CascadeClassifier(classifier_path)
detections = classifier.detectMultiScale(cv_image,
scaleFactor=scale,
minNeighbors=neighbors,
minSize=min_size)
regions = []
# pylint: disable=invalid-name
for (x, y, w, h) in detections:
if is_camera:
# pylint: disable=no-member
cv2.rectangle(cv_image,
(x, y),
(x + w, y + h),
color,
2)
else:
regions.append((int(x), int(y), int(w), int(h)))
group_matches[classifier_name] = regions
if is_camera:
return cv_image_to_bytes(cv_image)
else:
return group_matches
def setup(hass, config):
"""Set up the OpenCV platform entities."""
default_classifier = hass.config.path(DEFAULT_CLASSIFIER_PATH)
if not os.path.isfile(default_classifier):
_LOGGER.info('Downloading default classifier')
req = requests.get(CASCADE_URL, stream=True)
with open(default_classifier, 'wb') as fil:
for chunk in req.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
fil.write(chunk)
for group in config[DOMAIN][CONF_GROUPS]:
grp = {}
for classifier, config in group.items():
config = dict(config)
if config[CONF_FILE_PATH] is None:
config[CONF_FILE_PATH] = default_classifier
grp[classifier] = config
discovery.load_platform(hass, 'image_processing', DOMAIN, grp)
return True