core/homeassistant/components/doods/image_processing.py

399 lines
13 KiB
Python

"""Support for the DOODS service."""
from __future__ import annotations
import io
import logging
import os
import time
from PIL import Image, ImageDraw, UnidentifiedImageError
from pydoods import PyDOODS
import voluptuous as vol
from homeassistant.components.image_processing import (
CONF_CONFIDENCE,
PLATFORM_SCHEMA,
ImageProcessingEntity,
)
from homeassistant.const import (
CONF_COVERS,
CONF_ENTITY_ID,
CONF_NAME,
CONF_SOURCE,
CONF_TIMEOUT,
CONF_URL,
)
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import template
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util.pil import draw_box
_LOGGER = logging.getLogger(__name__)
ATTR_MATCHES = "matches"
ATTR_SUMMARY = "summary"
ATTR_TOTAL_MATCHES = "total_matches"
ATTR_PROCESS_TIME = "process_time"
CONF_AUTH_KEY = "auth_key"
CONF_DETECTOR = "detector"
CONF_LABELS = "labels"
CONF_AREA = "area"
CONF_TOP = "top"
CONF_BOTTOM = "bottom"
CONF_RIGHT = "right"
CONF_LEFT = "left"
CONF_FILE_OUT = "file_out"
AREA_SCHEMA = vol.Schema(
{
vol.Optional(CONF_BOTTOM, default=1): cv.small_float,
vol.Optional(CONF_LEFT, default=0): cv.small_float,
vol.Optional(CONF_RIGHT, default=1): cv.small_float,
vol.Optional(CONF_TOP, default=0): cv.small_float,
vol.Optional(CONF_COVERS, default=True): cv.boolean,
}
)
LABEL_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_AREA): AREA_SCHEMA,
vol.Optional(CONF_CONFIDENCE): vol.Range(min=0, max=100),
}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_URL): cv.string,
vol.Required(CONF_DETECTOR): cv.string,
vol.Required(CONF_TIMEOUT, default=90): cv.positive_int,
vol.Optional(CONF_AUTH_KEY, default=""): cv.string,
vol.Optional(CONF_FILE_OUT, default=[]): vol.All(cv.ensure_list, [cv.template]),
vol.Optional(CONF_CONFIDENCE, default=0.0): vol.Range(min=0, max=100),
vol.Optional(CONF_LABELS, default=[]): vol.All(
cv.ensure_list, [vol.Any(cv.string, LABEL_SCHEMA)]
),
vol.Optional(CONF_AREA): AREA_SCHEMA,
}
)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Doods client."""
url = config[CONF_URL]
auth_key = config[CONF_AUTH_KEY]
detector_name = config[CONF_DETECTOR]
timeout = config[CONF_TIMEOUT]
doods = PyDOODS(url, auth_key, timeout)
response = doods.get_detectors()
if not isinstance(response, dict):
_LOGGER.warning("Could not connect to doods server: %s", url)
return
detector = {}
for server_detector in response["detectors"]:
if server_detector["name"] == detector_name:
detector = server_detector
break
if not detector:
_LOGGER.warning(
"Detector %s is not supported by doods server %s", detector_name, url
)
return
entities = []
for camera in config[CONF_SOURCE]:
entities.append(
Doods(
hass,
camera[CONF_ENTITY_ID],
camera.get(CONF_NAME),
doods,
detector,
config,
)
)
add_entities(entities)
class Doods(ImageProcessingEntity):
"""Doods image processing service client."""
def __init__(self, hass, camera_entity, name, doods, detector, config):
"""Initialize the DOODS entity."""
self.hass = hass
self._camera_entity = camera_entity
if name:
self._name = name
else:
name = split_entity_id(camera_entity)[1]
self._name = f"Doods {name}"
self._doods = doods
self._file_out = config[CONF_FILE_OUT]
self._detector_name = detector["name"]
# detector config and aspect ratio
self._width = None
self._height = None
self._aspect = None
if detector["width"] and detector["height"]:
self._width = detector["width"]
self._height = detector["height"]
self._aspect = self._width / self._height
# the base confidence
dconfig = {}
confidence = config[CONF_CONFIDENCE]
# handle labels and specific detection areas
labels = config[CONF_LABELS]
self._label_areas = {}
self._label_covers = {}
for label in labels:
if isinstance(label, dict):
label_name = label[CONF_NAME]
if label_name not in detector["labels"] and label_name != "*":
_LOGGER.warning("Detector does not support label %s", label_name)
continue
# If label confidence is not specified, use global confidence
if not (label_confidence := label.get(CONF_CONFIDENCE)):
label_confidence = confidence
if label_name not in dconfig or dconfig[label_name] > label_confidence:
dconfig[label_name] = label_confidence
# Label area
label_area = label.get(CONF_AREA)
self._label_areas[label_name] = [0, 0, 1, 1]
self._label_covers[label_name] = True
if label_area:
self._label_areas[label_name] = [
label_area[CONF_TOP],
label_area[CONF_LEFT],
label_area[CONF_BOTTOM],
label_area[CONF_RIGHT],
]
self._label_covers[label_name] = label_area[CONF_COVERS]
else:
if label not in detector["labels"] and label != "*":
_LOGGER.warning("Detector does not support label %s", label)
continue
self._label_areas[label] = [0, 0, 1, 1]
self._label_covers[label] = True
if label not in dconfig or dconfig[label] > confidence:
dconfig[label] = confidence
if not dconfig:
dconfig["*"] = confidence
# Handle global detection area
self._area = [0, 0, 1, 1]
self._covers = True
if area_config := config.get(CONF_AREA):
self._area = [
area_config[CONF_TOP],
area_config[CONF_LEFT],
area_config[CONF_BOTTOM],
area_config[CONF_RIGHT],
]
self._covers = area_config[CONF_COVERS]
template.attach(hass, self._file_out)
self._dconfig = dconfig
self._matches = {}
self._total_matches = 0
self._last_image = None
self._process_time = 0
@property
def camera_entity(self):
"""Return camera entity id from process pictures."""
return self._camera_entity
@property
def name(self):
"""Return the name of the image processor."""
return self._name
@property
def state(self):
"""Return the state of the entity."""
return self._total_matches
@property
def extra_state_attributes(self):
"""Return device specific state attributes."""
return {
ATTR_MATCHES: self._matches,
ATTR_SUMMARY: {
label: len(values) for label, values in self._matches.items()
},
ATTR_TOTAL_MATCHES: self._total_matches,
ATTR_PROCESS_TIME: self._process_time,
}
def _save_image(self, image, matches, paths):
img = Image.open(io.BytesIO(bytearray(image))).convert("RGB")
img_width, img_height = img.size
draw = ImageDraw.Draw(img)
# Draw custom global region/area
if self._area != [0, 0, 1, 1]:
draw_box(
draw, self._area, img_width, img_height, "Detection Area", (0, 255, 255)
)
for label, values in matches.items():
# Draw custom label regions/areas
if label in self._label_areas and self._label_areas[label] != [0, 0, 1, 1]:
box_label = f"{label.capitalize()} Detection Area"
draw_box(
draw,
self._label_areas[label],
img_width,
img_height,
box_label,
(0, 255, 0),
)
# Draw detected objects
for instance in values:
box_label = f'{label} {instance["score"]:.1f}%'
# Already scaled, use 1 for width and height
draw_box(
draw,
instance["box"],
img_width,
img_height,
box_label,
(255, 255, 0),
)
for path in paths:
_LOGGER.info("Saving results image to %s", path)
os.makedirs(os.path.dirname(path), exist_ok=True)
img.save(path)
def process_image(self, image):
"""Process the image."""
try:
img = Image.open(io.BytesIO(bytearray(image))).convert("RGB")
except UnidentifiedImageError:
_LOGGER.warning("Unable to process image, bad data")
return
img_width, img_height = img.size
if self._aspect and abs((img_width / img_height) - self._aspect) > 0.1:
_LOGGER.debug(
(
"The image aspect: %s and the detector aspect: %s differ by more"
" than 0.1"
),
(img_width / img_height),
self._aspect,
)
# Run detection
start = time.monotonic()
response = self._doods.detect(
image, dconfig=self._dconfig, detector_name=self._detector_name
)
_LOGGER.debug(
"doods detect: %s response: %s duration: %s",
self._dconfig,
response,
time.monotonic() - start,
)
matches = {}
total_matches = 0
if not response or "error" in response:
if "error" in response:
_LOGGER.error(response["error"])
self._matches = matches
self._total_matches = total_matches
self._process_time = time.monotonic() - start
return
for detection in response["detections"]:
score = detection["confidence"]
boxes = [
detection["top"],
detection["left"],
detection["bottom"],
detection["right"],
]
label = detection["label"]
# Exclude unlisted labels
if "*" not in self._dconfig and label not in self._dconfig:
continue
# Exclude matches outside global area definition
if self._covers:
if (
boxes[0] < self._area[0]
or boxes[1] < self._area[1]
or boxes[2] > self._area[2]
or boxes[3] > self._area[3]
):
continue
elif (
boxes[0] > self._area[2]
or boxes[1] > self._area[3]
or boxes[2] < self._area[0]
or boxes[3] < self._area[1]
):
continue
# Exclude matches outside label specific area definition
if self._label_areas.get(label):
if self._label_covers[label]:
if (
boxes[0] < self._label_areas[label][0]
or boxes[1] < self._label_areas[label][1]
or boxes[2] > self._label_areas[label][2]
or boxes[3] > self._label_areas[label][3]
):
continue
elif (
boxes[0] > self._label_areas[label][2]
or boxes[1] > self._label_areas[label][3]
or boxes[2] < self._label_areas[label][0]
or boxes[3] < self._label_areas[label][1]
):
continue
if label not in matches:
matches[label] = []
matches[label].append({"score": float(score), "box": boxes})
total_matches += 1
# Save Images
if total_matches and self._file_out:
paths = []
for path_template in self._file_out:
if isinstance(path_template, template.Template):
paths.append(
path_template.render(camera_entity=self._camera_entity)
)
else:
paths.append(path_template)
self._save_image(image, matches, paths)
self._matches = matches
self._total_matches = total_matches
self._process_time = time.monotonic() - start