core/homeassistant/components/openalpr.py

475 lines
14 KiB
Python
Raw Normal View History

"""
Component that will help set the openalpr for video streams.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/openalpr/
"""
from base64 import b64encode
import logging
import os
from time import time
import requests
import voluptuous as vol
from homeassistant.config import load_yaml_config_file
from homeassistant.const import (
CONF_API_KEY, CONF_NAME, CONF_USERNAME, CONF_PASSWORD, ATTR_ENTITY_ID,
EVENT_HOMEASSISTANT_STOP, STATE_UNKNOWN)
from homeassistant.components.ffmpeg import (
get_binary, run_test, CONF_INPUT, CONF_EXTRA_ARGUMENTS)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_component import EntityComponent
DOMAIN = 'openalpr'
DEPENDENCIES = ['ffmpeg']
REQUIREMENTS = [
'https://github.com/pvizeli/cloudapi/releases/download/1.0.2/'
'python-1.0.2.zip#cloud_api==1.0.2',
'ha-alpr==0.3']
_LOGGER = logging.getLogger(__name__)
SERVICE_SCAN = 'scan'
SERVICE_RESTART = 'restart'
EVENT_FOUND = 'openalpr.found'
ATTR_PLATE = 'plate'
ENGINE_LOCAL = 'local'
ENGINE_CLOUD = 'cloud'
RENDER_IMAGE = 'image'
RENDER_FFMPEG = 'ffmpeg'
OPENALPR_REGIONS = [
'us',
'eu',
'au',
'auwide',
'gb',
'kr',
'mx',
'sg',
]
CONF_RENDER = 'render'
CONF_ENGINE = 'engine'
CONF_REGION = 'region'
CONF_INTERVAL = 'interval'
CONF_ENTITIES = 'entities'
CONF_CONFIDENCE = 'confidence'
CONF_ALPR_BINARY = 'alpr_binary'
DEFAULT_NAME = 'OpenAlpr'
DEFAULT_ENGINE = ENGINE_LOCAL
DEFAULT_RENDER = RENDER_FFMPEG
DEFAULT_BINARY = 'alpr'
DEFAULT_INTERVAL = 10
DEFAULT_CONFIDENCE = 80.0
DEVICE_SCHEMA = vol.Schema({
vol.Required(CONF_INPUT): cv.string,
vol.Optional(CONF_INTERVAL, default=DEFAULT_INTERVAL): cv.positive_int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_RENDER, default=DEFAULT_RENDER):
vol.In([RENDER_IMAGE, RENDER_FFMPEG]),
vol.Optional(CONF_EXTRA_ARGUMENTS): cv.string,
vol.Optional(CONF_USERNAME): cv.string,
vol.Optional(CONF_PASSWORD): cv.string,
})
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Required(CONF_ENGINE): vol.In([ENGINE_LOCAL, ENGINE_CLOUD]),
vol.Required(CONF_REGION): vol.In(OPENALPR_REGIONS),
vol.Optional(CONF_CONFIDENCE, default=DEFAULT_CONFIDENCE):
vol.Coerce(float),
vol.Optional(CONF_API_KEY): cv.string,
vol.Optional(CONF_ALPR_BINARY, default=DEFAULT_BINARY): cv.string,
vol.Required(CONF_ENTITIES):
vol.All(cv.ensure_list, [DEVICE_SCHEMA]),
})
}, extra=vol.ALLOW_EXTRA)
SERVICE_RESTART_SCHEMA = vol.Schema({
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
})
SERVICE_SCAN_SCHEMA = vol.Schema({
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
})
def scan(hass, entity_id=None):
"""Scan a image immediately."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_SCAN, data)
def restart(hass, entity_id=None):
"""Restart a ffmpeg process."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_RESTART, data)
# pylint: disable=too-many-locals
def setup(hass, config):
"""Setup the OpenAlpr component."""
engine = config[DOMAIN].get(CONF_ENGINE)
region = config[DOMAIN].get(CONF_REGION)
confidence = config[DOMAIN].get(CONF_CONFIDENCE)
api_key = config[DOMAIN].get(CONF_API_KEY)
binary = config[DOMAIN].get(CONF_ALPR_BINARY)
use_render_fffmpeg = False
component = EntityComponent(_LOGGER, DOMAIN, hass)
openalpr_device = []
for device in config[DOMAIN].get(CONF_ENTITIES):
input_source = device.get(CONF_INPUT)
render = device.get(CONF_RENDER)
##
# create api
if engine == ENGINE_LOCAL:
alpr_api = OpenalprApiLocal(
confidence=confidence,
region=region,
binary=binary,
)
else:
alpr_api = OpenalprApiCloud(
confidence=confidence,
region=region,
api_key=api_key,
)
##
# Create Alpr device / render engine
if render == RENDER_FFMPEG:
use_render_fffmpeg = True
if not run_test(input_source):
_LOGGER.error("'%s' is not valid ffmpeg input", input_source)
continue
alpr_dev = OpenalprDeviceFFmpeg(
name=device.get(CONF_NAME),
interval=device.get(CONF_INTERVAL),
api=alpr_api,
input_source=input_source,
extra_arguments=device.get(CONF_EXTRA_ARGUMENTS),
)
else:
alpr_dev = OpenalprDeviceImage(
name=device.get(CONF_NAME),
interval=device.get(CONF_INTERVAL),
api=alpr_api,
input_source=input_source,
username=device.get(CONF_USERNAME),
password=device.get(CONF_PASSWORD),
)
# register shutdown event
openalpr_device.append(alpr_dev)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, alpr_dev.shutdown)
component.add_entities(openalpr_device)
descriptions = load_yaml_config_file(
os.path.join(os.path.dirname(__file__), 'services.yaml'))
def _handle_service_scan(service):
"""Handle service for immediately scan."""
device_list = component.extract_from_service(service)
for device in device_list:
device.scan()
hass.services.register(DOMAIN, SERVICE_SCAN,
_handle_service_scan,
descriptions[DOMAIN][SERVICE_SCAN],
schema=SERVICE_SCAN_SCHEMA)
# Add restart service only if a device use ffmpeg as render
if not use_render_fffmpeg:
return True
def _handle_service_restart(service):
"""Handle service for restart ffmpeg process."""
device_list = component.extract_from_service(service)
for device in device_list:
device.restart()
hass.services.register(DOMAIN, SERVICE_RESTART,
_handle_service_restart,
descriptions[DOMAIN][SERVICE_RESTART],
schema=SERVICE_RESTART_SCHEMA)
return True
class OpenalprDevice(Entity):
"""Represent a openalpr device object for processing stream/images."""
def __init__(self, name, interval, api):
"""Init image processing."""
self._name = name
self._interval = interval
self._api = api
self._last = {}
@property
def state(self):
"""Return the state of the entity."""
confidence = 0
plate = STATE_UNKNOWN
# search high plate
for i_pl, i_co in self._last.items():
if i_co > confidence:
confidence = i_co
plate = i_pl
return plate
def shutdown(self, event):
"""Close stream."""
if hasattr(self._api, "shutdown"):
self._api.shutdown(event)
def restart(self):
"""Restart stream."""
raise NotImplementedError()
def _process_image(self, image):
"""Callback for processing image."""
self._api.process_image(image, self._process_event)
def _process_event(self, plates):
"""Send event with new plates."""
state_change = False
plates_set = set(plates)
last_set = set(self._last)
new_plates = plates_set - last_set
# send events
for i_plate in new_plates:
self.hass.bus.fire(EVENT_FOUND, {
ATTR_PLATE: i_plate,
ATTR_ENTITY_ID: self.entity_id
})
# update entity store
if last_set <= plates_set:
state_change = True
self._last = plates
# update HA state
if state_change:
self.update_ha_state()
@property
def device_state_attributes(self):
"""Return device specific state attributes."""
return {'plates': self._last}
def scan(self):
"""Immediately scan a image."""
raise NotImplementedError()
@property
def name(self):
"""Return the name of the entity."""
return self._name
class OpenalprDeviceFFmpeg(OpenalprDevice):
"""Represent a openalpr device object for processing stream/images."""
# pylint: disable=too-many-arguments
def __init__(self, name, interval, api, input_source,
extra_arguments=None):
"""Init image processing."""
from haffmpeg import ImageStream, ImageSingle
super().__init__(name, interval, api)
self._input_source = input_source
self._extra_arguments = extra_arguments
if self._interval > 0:
self._ffmpeg = ImageStream(get_binary(), self._process_image)
else:
self._ffmpeg = ImageSingle(get_binary())
self._start_ffmpeg()
def shutdown(self, event):
"""Close ffmpeg stream."""
if self._interval > 0:
self._ffmpeg.close()
def restart(self):
"""Restart ffmpeg stream."""
if self._interval > 0:
self._ffmpeg.close()
self._start_ffmpeg()
def scan(self):
"""Immediately scan a image."""
from haffmpeg import IMAGE_PNG
# process single image
if self._interval == 0:
image = self._ffmpeg.get_image(
self._input_source,
output_format=IMAGE_PNG,
extra_cmd=self._extra_arguments
)
return self._process_image(image)
# stream
self._ffmpeg.push_image()
def _start_ffmpeg(self):
"""Start a ffmpeg image stream."""
from haffmpeg import IMAGE_PNG
if self._interval == 0:
return
self._ffmpeg.open_stream(
input_source=self._input_source,
interval=self._interval,
output_format=IMAGE_PNG,
extra_cmd=self._extra_arguments,
)
@property
def should_poll(self):
"""Return True if render is be 'image' or False if 'ffmpeg'."""
return False
@property
def available(self):
"""Return True if entity is available."""
return self._interval == 0 or self._ffmpeg.is_running
class OpenalprDeviceImage(OpenalprDevice):
"""Represent a openalpr device object for processing stream/images."""
# pylint: disable=too-many-arguments
def __init__(self, name, interval, api, input_source,
username=None, password=None):
"""Init image processing."""
super().__init__(name, interval, api)
self._next = time()
self._username = username
self._password = password
self._url = input_source
def restart(self):
"""Fake restart with scan a picture."""
self.scan()
def scan(self):
"""Immediately scan a image."""
# send request
if self._username is not None and self._password is not None:
req = requests.get(
self._url, auth=(self._username, self._password), timeout=15)
else:
req = requests.get(self._url, timeout=15)
# process image
image = req.content
self._process_image(image)
@property
def should_poll(self):
"""Return True if render is be 'image' or False if 'ffmpeg'."""
return self._interval > 0
def update(self):
"""Retrieve latest state."""
if self._next > time():
return
self.scan()
self._next = time() + self._interval
# pylint: disable=too-few-public-methods
class OpenalprApi(object):
"""OpenAlpr api class."""
def __init__(self, region, confidence):
"""Init basic api processing."""
self._region = region
self._confidence = confidence
def process_image(self, image, event_callback):
"""Callback for processing image."""
raise NotImplementedError()
# pylint: disable=too-few-public-methods
class OpenalprApiCloud(OpenalprApi):
"""Use the cloud openalpr api to parse licences plate."""
def __init__(self, region, confidence, api_key):
"""Init cloud api processing."""
import openalpr_api
super().__init__(region=region, confidence=confidence)
self._api = openalpr_api.DefaultApi()
self._api_key = api_key
def process_image(self, image, event_callback):
"""Callback for processing image."""
result = self._api.recognize_post(
self._api_key,
'plate',
image="",
image_bytes=str(b64encode(image), 'utf-8'),
country=self._region
)
# process result
f_plates = {}
# pylint: disable=no-member
for object_plate in result.plate.results:
plate = object_plate.plate
confidence = object_plate.confidence
if confidence >= self._confidence:
f_plates[plate] = confidence
event_callback(f_plates)
class OpenalprApiLocal(OpenalprApi):
"""Use local openalpr library to parse licences plate."""
def __init__(self, region, confidence, binary):
"""Init local api processing."""
# pylint: disable=import-error
from haalpr import HAAlpr
super().__init__(region=region, confidence=confidence)
self._api = HAAlpr(binary=binary, country=region)
def process_image(self, image, event_callback):
"""Callback for processing image."""
result = self._api.recognize_byte(image)
# process result
f_plates = {}
for found in result:
for plate, confidence in found.items():
if confidence >= self._confidence:
f_plates[plate] = confidence
event_callback(f_plates)