core/homeassistant/components/opencv/image_processing.py

193 lines
5.6 KiB
Python
Raw Normal View History

2019-02-28 12:16:21 +00:00
"""Support for OpenCV classification on images."""
from datetime import timedelta
import logging
import numpy
import requests
import voluptuous as vol
from homeassistant.components.image_processing import (
2019-07-31 19:25:30 +00:00
CONF_ENTITY_ID,
CONF_NAME,
CONF_SOURCE,
PLATFORM_SCHEMA,
ImageProcessingEntity,
)
2018-01-09 14:30:00 +00:00
from homeassistant.core import split_entity_id
import homeassistant.helpers.config_validation as cv
try:
# Verify that the OpenCV python package is pre-installed
import cv2
CV2_IMPORTED = True
except ImportError:
CV2_IMPORTED = False
_LOGGER = logging.getLogger(__name__)
2019-07-31 19:25:30 +00:00
ATTR_MATCHES = "matches"
ATTR_TOTAL_MATCHES = "total_matches"
2019-07-31 19:25:30 +00:00
CASCADE_URL = (
"https://raw.githubusercontent.com/opencv/opencv/master/data/"
"lbpcascades/lbpcascade_frontalface.xml"
2019-07-31 19:25:30 +00:00
)
2019-07-31 19:25:30 +00:00
CONF_CLASSIFIER = "classifier"
CONF_FILE = "file"
CONF_MIN_SIZE = "min_size"
CONF_NEIGHBORS = "neighbors"
CONF_SCALE = "scale"
2019-07-31 19:25:30 +00:00
DEFAULT_CLASSIFIER_PATH = "lbp_frontalface.xml"
DEFAULT_MIN_SIZE = (30, 30)
DEFAULT_NEIGHBORS = 4
DEFAULT_SCALE = 1.1
DEFAULT_TIMEOUT = 10
SCAN_INTERVAL = timedelta(seconds=2)
2019-07-31 19:25:30 +00:00
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_CLASSIFIER): {
cv.string: vol.Any(
cv.isfile,
vol.Schema(
{
vol.Required(CONF_FILE): cv.isfile,
vol.Optional(CONF_SCALE, DEFAULT_SCALE): float,
vol.Optional(
CONF_NEIGHBORS, DEFAULT_NEIGHBORS
): cv.positive_int,
vol.Optional(CONF_MIN_SIZE, DEFAULT_MIN_SIZE): vol.Schema(
(int, int)
),
}
),
)
}
}
2019-07-31 19:25:30 +00:00
)
def _create_processor_from_config(hass, camera_entity, config):
"""Create an OpenCV processor from configuration."""
classifier_config = config.get(CONF_CLASSIFIER)
2020-04-07 21:14:28 +00:00
name = f"{config[CONF_NAME]} {split_entity_id(camera_entity)[1].replace('_', ' ')}"
2019-07-31 19:25:30 +00:00
processor = OpenCVImageProcessor(hass, camera_entity, name, classifier_config)
return processor
def _get_default_classifier(dest_path):
"""Download the default OpenCV classifier."""
2018-01-09 14:30:00 +00:00
_LOGGER.info("Downloading default classifier")
req = requests.get(CASCADE_URL, stream=True)
2019-07-31 19:25:30 +00:00
with open(dest_path, "wb") as fil:
for chunk in req.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
fil.write(chunk)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the OpenCV image processing platform."""
if not CV2_IMPORTED:
2018-01-09 14:30:00 +00:00
_LOGGER.error(
"No OpenCV library found! Install or compile for your system "
2019-07-31 19:25:30 +00:00
"following instructions here: http://opencv.org/releases.html"
)
return
entities = []
if CONF_CLASSIFIER not in config:
dest_path = hass.config.path(DEFAULT_CLASSIFIER_PATH)
_get_default_classifier(dest_path)
2019-07-31 19:25:30 +00:00
config[CONF_CLASSIFIER] = {"Face": dest_path}
for camera in config[CONF_SOURCE]:
2019-07-31 19:25:30 +00:00
entities.append(
OpenCVImageProcessor(
hass,
camera[CONF_ENTITY_ID],
camera.get(CONF_NAME),
config[CONF_CLASSIFIER],
)
)
add_entities(entities)
class OpenCVImageProcessor(ImageProcessingEntity):
"""Representation of an OpenCV image processor."""
def __init__(self, hass, camera_entity, name, classifiers):
"""Initialize the OpenCV entity."""
self.hass = hass
self._camera_entity = camera_entity
if name:
self._name = name
else:
2020-04-07 21:14:28 +00:00
self._name = f"OpenCV {split_entity_id(camera_entity)[1]}"
self._classifiers = classifiers
self._matches = {}
self._total_matches = 0
self._last_image = None
@property
def camera_entity(self):
"""Return camera entity id from process pictures."""
return self._camera_entity
@property
def name(self):
"""Return the name of the image processor."""
return self._name
@property
def state(self):
"""Return the state of the entity."""
return self._total_matches
@property
def state_attributes(self):
"""Return device specific state attributes."""
2019-07-31 19:25:30 +00:00
return {ATTR_MATCHES: self._matches, ATTR_TOTAL_MATCHES: self._total_matches}
def process_image(self, image):
"""Process the image."""
2019-07-31 19:25:30 +00:00
cv_image = cv2.imdecode(numpy.asarray(bytearray(image)), cv2.IMREAD_UNCHANGED)
matches = {}
total_matches = 0
for name, classifier in self._classifiers.items():
scale = DEFAULT_SCALE
neighbors = DEFAULT_NEIGHBORS
min_size = DEFAULT_MIN_SIZE
if isinstance(classifier, dict):
path = classifier[CONF_FILE]
scale = classifier.get(CONF_SCALE, scale)
neighbors = classifier.get(CONF_NEIGHBORS, neighbors)
min_size = classifier.get(CONF_MIN_SIZE, min_size)
else:
path = classifier
cascade = cv2.CascadeClassifier(path)
detections = cascade.detectMultiScale(
2019-07-31 19:25:30 +00:00
cv_image, scaleFactor=scale, minNeighbors=neighbors, minSize=min_size
)
regions = []
# pylint: disable=invalid-name
for (x, y, w, h) in detections:
regions.append((int(x), int(y), int(w), int(h)))
total_matches += 1
matches[name] = regions
self._matches = matches
self._total_matches = total_matches