Add numato integration (#33816)

* Add support for Numato 32 port USB GPIO boards

Included are a binary_sensor, sensor and switch component
implementations. The binary_sensor interface pushes updates via
registered callback functions, so no need to poll here.

Unit tests are included to test against a Numato device mockup.

* Refactor numato configuration due to PR finding

* Resolve minor review findings

* Bump numato-gpio requirement

* Load numato platforms during domain setup

According to review finding

* Guard from platform setup without discovery_info

According to review finding

* Move numato API state into hass.data

According to review finding.

* Avoid side effects in numato entity constructors

According to review finding

* Keep only first line of numato module docstrings

Removed reference to the documentation. Requested by reviewer.

* Minor improvements inspired by review findings

* Fix async tests

Pytest fixture was returning from the yield too early executing teardown
code during test execution.

* Improve test coverage

* Configure GPIO ports early

Review finding

* Move read_gpio callback to outside the loop

Also continue on failed switch setup, resolve other minor review
findings and correct some error messages

* Bump numato-gpio requirement

This fixes a crash during cleanup. When any device had a communication
problem, its cleanup would raise an exception which was not handled,
fell through to the caller and prevented the remaining devices from
being cleaned up.

* Call services directly

Define local helper functions for better readability.
Resolves a review finding.

* Assert something in every test

So not only coverage is satisfied but things are actually tested
to be in the expected state.
Resolves a review finding.

* Clarify scope of notification tests

Make unit test for hass NumatoAPI independent of Home Assistant (very basic test of notifications).
Improve the regular operations test for notifications.

* Test for hass.states after operating switches

Resolves a review finding.

* Check for wrong port directions

* WIP: Split numato tests to multiple files

test_hass_binary_sensor_notification still fails.

* Remove pytest asyncio decorator

Apears to be redundant. Resolves a review finding.

* Call switch services directly.

Resolves a review finding.

* Remove obsolete inline pylint config

Co-Authored-By: Martin Hjelmare <marhje52@gmail.com>

* Improve the numato_gpio module mockup

Resolves a review finding.

* Remove needless explicit conversions to str

Resolves review findings.

* Test setup of binary_sensor callbacks

* Fix test_hass_binary_sensor_notification

* Add forgotten await

Review finding.

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
pull/34962/head
clssn 2020-04-30 14:23:30 +02:00 committed by GitHub
parent b4083dc14f
commit 15b1a9ecea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1135 additions and 0 deletions

View File

@ -267,6 +267,7 @@ homeassistant/components/nsw_fuel_station/* @nickw444
homeassistant/components/nsw_rural_fire_service_feed/* @exxamalte
homeassistant/components/nuheat/* @bdraco
homeassistant/components/nuki/* @pvizeli
homeassistant/components/numato/* @clssn
homeassistant/components/nut/* @bdraco
homeassistant/components/nws/* @MatthewFlamm
homeassistant/components/nzbget/* @chriscla

View File

@ -0,0 +1,248 @@
"""Support for controlling GPIO pins of a Numato Labs USB GPIO expander."""
import logging
import numato_gpio as gpio
import voluptuous as vol
from homeassistant.const import (
CONF_BINARY_SENSORS,
CONF_ID,
CONF_NAME,
CONF_SENSORS,
CONF_SWITCHES,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import load_platform
_LOGGER = logging.getLogger(__name__)
DOMAIN = "numato"
CONF_INVERT_LOGIC = "invert_logic"
CONF_DISCOVER = "discover"
CONF_DEVICES = "devices"
CONF_DEVICE_ID = "id"
CONF_PORTS = "ports"
CONF_SRC_RANGE = "source_range"
CONF_DST_RANGE = "destination_range"
CONF_DST_UNIT = "unit"
DEFAULT_INVERT_LOGIC = False
DEFAULT_SRC_RANGE = [0, 1024]
DEFAULT_DST_RANGE = [0.0, 100.0]
DEFAULT_UNIT = "%"
DEFAULT_DEV = [f"/dev/ttyACM{i}" for i in range(10)]
PORT_RANGE = range(1, 8) # ports 0-7 are ADC capable
DATA_PORTS_IN_USE = "ports_in_use"
DATA_API = "api"
def int_range(rng):
"""Validate the input array to describe a range by two integers."""
if not (isinstance(rng[0], int) and isinstance(rng[1], int)):
raise vol.Invalid(f"Only integers are allowed: {rng}")
if len(rng) != 2:
raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
if rng[0] > rng[1]:
raise vol.Invalid(f"Lower range bound must come first: {rng}")
return rng
def float_range(rng):
"""Validate the input array to describe a range by two floats."""
try:
coe = vol.Coerce(float)
coe(rng[0])
coe(rng[1])
except vol.CoerceInvalid:
raise vol.Invalid(f"Only int or float values are allowed: {rng}")
if len(rng) != 2:
raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
if rng[0] > rng[1]:
raise vol.Invalid(f"Lower range bound must come first: {rng}")
return rng
def adc_port_number(num):
"""Validate input number to be in the range of ADC enabled ports."""
try:
num = int(num)
except (ValueError):
raise vol.Invalid(f"Port numbers must be integers: {num}")
if num not in range(1, 8):
raise vol.Invalid(f"Only port numbers from 1 to 7 are ADC capable: {num}")
return num
ADC_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_SRC_RANGE, default=DEFAULT_SRC_RANGE): int_range,
vol.Optional(CONF_DST_RANGE, default=DEFAULT_DST_RANGE): float_range,
vol.Optional(CONF_DST_UNIT, default=DEFAULT_UNIT): cv.string,
}
)
PORTS_SCHEMA = vol.Schema({cv.positive_int: cv.string})
IO_PORTS_SCHEMA = vol.Schema(
{
vol.Required(CONF_PORTS): PORTS_SCHEMA,
vol.Optional(CONF_INVERT_LOGIC, default=DEFAULT_INVERT_LOGIC): cv.boolean,
}
)
DEVICE_SCHEMA = vol.Schema(
{
vol.Required(CONF_ID): cv.positive_int,
CONF_BINARY_SENSORS: IO_PORTS_SCHEMA,
CONF_SWITCHES: IO_PORTS_SCHEMA,
CONF_SENSORS: {CONF_PORTS: {adc_port_number: ADC_SCHEMA}},
}
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: {
CONF_DEVICES: vol.All(cv.ensure_list, [DEVICE_SCHEMA]),
vol.Optional(CONF_DISCOVER, default=DEFAULT_DEV): vol.All(
cv.ensure_list, [cv.string]
),
},
},
extra=vol.ALLOW_EXTRA,
)
def setup(hass, config):
"""Initialize the numato integration.
Discovers available Numato devices and loads the binary_sensor, sensor and
switch platforms.
Returns False on error during device discovery (e.g. duplicate ID),
otherwise returns True.
No exceptions should occur, since the platforms are initialized on a best
effort basis, which means, errors are handled locally.
"""
hass.data[DOMAIN] = config[DOMAIN]
try:
gpio.discover(config[DOMAIN][CONF_DISCOVER])
except gpio.NumatoGpioError as err:
_LOGGER.info("Error discovering Numato devices: %s", err)
gpio.cleanup()
return False
_LOGGER.info(
"Initializing Numato 32 port USB GPIO expanders with IDs: %s",
", ".join(str(d) for d in gpio.devices),
)
hass.data[DOMAIN][DATA_API] = NumatoAPI()
def cleanup_gpio(event):
"""Stuff to do before stopping."""
_LOGGER.debug("Clean up Numato GPIO")
gpio.cleanup()
if DATA_API in hass.data[DOMAIN]:
hass.data[DOMAIN][DATA_API].ports_registered.clear()
def prepare_gpio(event):
"""Stuff to do when home assistant starts."""
_LOGGER.debug("Setup cleanup at stop for Numato GPIO")
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, prepare_gpio)
load_platform(hass, "binary_sensor", DOMAIN, {}, config)
load_platform(hass, "sensor", DOMAIN, {}, config)
load_platform(hass, "switch", DOMAIN, {}, config)
return True
# pylint: disable=no-self-use
class NumatoAPI:
"""Home-Assistant specific API for numato device access."""
def __init__(self):
"""Initialize API state."""
self.ports_registered = dict()
def check_port_free(self, device_id, port, direction):
"""Check whether a port is still free set up.
Fail with exception if it has already been registered.
"""
if (device_id, port) not in self.ports_registered:
self.ports_registered[(device_id, port)] = direction
else:
raise gpio.NumatoGpioError(
"Device {} port {} already in use as {}.".format(
device_id,
port,
"input"
if self.ports_registered[(device_id, port)] == gpio.IN
else "output",
)
)
def check_device_id(self, device_id):
"""Check whether a device has been discovered.
Fail with exception.
"""
if device_id not in gpio.devices:
raise gpio.NumatoGpioError(f"Device {device_id} not available.")
def check_port(self, device_id, port, direction):
"""Raise an error if the port setup doesn't match the direction."""
self.check_device_id(device_id)
if (device_id, port) not in self.ports_registered:
raise gpio.NumatoGpioError(
f"Port {port} is not set up for numato device {device_id}."
)
msg = {
gpio.OUT: f"Trying to write to device {device_id} port {port} set up as input.",
gpio.IN: f"Trying to read from device {device_id} port {port} set up as output.",
}
if self.ports_registered[(device_id, port)] != direction:
raise gpio.NumatoGpioError(msg[direction])
def setup_output(self, device_id, port):
"""Set up a GPIO as output."""
self.check_device_id(device_id)
self.check_port_free(device_id, port, gpio.OUT)
gpio.devices[device_id].setup(port, gpio.OUT)
def setup_input(self, device_id, port):
"""Set up a GPIO as input."""
self.check_device_id(device_id)
gpio.devices[device_id].setup(port, gpio.IN)
self.check_port_free(device_id, port, gpio.IN)
def write_output(self, device_id, port, value):
"""Write a value to a GPIO."""
self.check_port(device_id, port, gpio.OUT)
gpio.devices[device_id].write(port, value)
def read_input(self, device_id, port):
"""Read a value from a GPIO."""
self.check_port(device_id, port, gpio.IN)
return gpio.devices[device_id].read(port)
def read_adc_input(self, device_id, port):
"""Read an ADC value from a GPIO ADC port."""
self.check_port(device_id, port, gpio.IN)
self.check_device_id(device_id)
return gpio.devices[device_id].adc_read(port)
def edge_detect(self, device_id, port, event_callback):
"""Add detection for RISING and FALLING events."""
self.check_port(device_id, port, gpio.IN)
gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH)
gpio.devices[device_id].notify = True

View File

@ -0,0 +1,120 @@
"""Binary sensor platform integration for Numato USB GPIO expanders."""
from functools import partial
import logging
from numato_gpio import NumatoGpioError
from homeassistant.components.binary_sensor import BinarySensorDevice
from homeassistant.const import DEVICE_DEFAULT_NAME
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect, dispatcher_send
from . import (
CONF_BINARY_SENSORS,
CONF_DEVICES,
CONF_ID,
CONF_INVERT_LOGIC,
CONF_PORTS,
DATA_API,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
NUMATO_SIGNAL = "numato_signal_{}_{}"
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the configured Numato USB GPIO binary sensor ports."""
if discovery_info is None:
return
def read_gpio(device_id, port, level):
"""Send signal to entity to have it update state."""
dispatcher_send(hass, NUMATO_SIGNAL.format(device_id, port), level)
api = hass.data[DOMAIN][DATA_API]
binary_sensors = []
devices = hass.data[DOMAIN][CONF_DEVICES]
for device in [d for d in devices if CONF_BINARY_SENSORS in d]:
device_id = device[CONF_ID]
platform = device[CONF_BINARY_SENSORS]
invert_logic = platform[CONF_INVERT_LOGIC]
ports = platform[CONF_PORTS]
for port, port_name in ports.items():
try:
api.setup_input(device_id, port)
api.edge_detect(device_id, port, partial(read_gpio, device_id))
except NumatoGpioError as err:
_LOGGER.error(
"Failed to initialize binary sensor '%s' on Numato device %s port %s: %s",
port_name,
device_id,
port,
err,
)
continue
binary_sensors.append(
NumatoGpioBinarySensor(port_name, device_id, port, invert_logic, api,)
)
add_entities(binary_sensors, True)
class NumatoGpioBinarySensor(BinarySensorDevice):
"""Represents a binary sensor (input) port of a Numato GPIO expander."""
def __init__(self, name, device_id, port, invert_logic, api):
"""Initialize the Numato GPIO based binary sensor object."""
self._name = name or DEVICE_DEFAULT_NAME
self._device_id = device_id
self._port = port
self._invert_logic = invert_logic
self._state = None
self._api = api
async def async_added_to_hass(self):
"""Connect state update callback."""
self.async_on_remove(
async_dispatcher_connect(
self.hass,
NUMATO_SIGNAL.format(self._device_id, self._port),
self._async_update_state,
)
)
@callback
def _async_update_state(self, level):
"""Update entity state."""
self._state = level
self.async_write_ha_state()
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def is_on(self):
"""Return the state of the entity."""
return self._state != self._invert_logic
def update(self):
"""Update the GPIO state."""
try:
self._state = self._api.read_input(self._device_id, self._port)
except NumatoGpioError as err:
self._state = None
_LOGGER.error(
"Failed to update Numato device %s port %s: %s",
self._device_id,
self._port,
err,
)

View File

@ -0,0 +1,8 @@
{
"domain": "numato",
"name": "Numato USB GPIO Expander",
"documentation": "https://www.home-assistant.io/integrations/numato",
"requirements": ["numato-gpio==0.7.1"],
"codeowners": ["@clssn"],
"quality_scale": "internal"
}

View File

@ -0,0 +1,123 @@
"""Sensor platform integration for ADC ports of Numato USB GPIO expanders."""
import logging
from numato_gpio import NumatoGpioError
from homeassistant.const import CONF_ID, CONF_NAME, CONF_SENSORS
from homeassistant.helpers.entity import Entity
from . import (
CONF_DEVICES,
CONF_DST_RANGE,
CONF_DST_UNIT,
CONF_PORTS,
CONF_SRC_RANGE,
DATA_API,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
ICON = "mdi:gauge"
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the configured Numato USB GPIO ADC sensor ports."""
if discovery_info is None:
return
api = hass.data[DOMAIN][DATA_API]
sensors = []
devices = hass.data[DOMAIN][CONF_DEVICES]
for device in [d for d in devices if CONF_SENSORS in d]:
device_id = device[CONF_ID]
ports = device[CONF_SENSORS][CONF_PORTS]
for port, adc_def in ports.items():
try:
api.setup_input(device_id, port)
except NumatoGpioError as err:
_LOGGER.error(
"Failed to initialize sensor '%s' on Numato device %s port %s: %s",
adc_def[CONF_NAME],
device_id,
port,
err,
)
continue
sensors.append(
NumatoGpioAdc(
adc_def[CONF_NAME],
device_id,
port,
adc_def[CONF_SRC_RANGE],
adc_def[CONF_DST_RANGE],
adc_def[CONF_DST_UNIT],
api,
)
)
add_entities(sensors, True)
class NumatoGpioAdc(Entity):
"""Represents an ADC port of a Numato USB GPIO expander."""
def __init__(self, name, device_id, port, src_range, dst_range, dst_unit, api):
"""Initialize the sensor."""
self._name = name
self._device_id = device_id
self._port = port
self._src_range = src_range
self._dst_range = dst_range
self._state = None
self._unit_of_measurement = dst_unit
self._api = api
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
return self._unit_of_measurement
@property
def icon(self):
"""Return the icon to use in the frontend, if any."""
return ICON
def update(self):
"""Get the latest data and updates the state."""
try:
adc_val = self._api.read_adc_input(self._device_id, self._port)
adc_val = self._clamp_to_source_range(adc_val)
self._state = self._linear_scale_to_dest_range(adc_val)
except NumatoGpioError as err:
self._state = None
_LOGGER.error(
"Failed to update Numato device %s ADC-port %s: %s",
self._device_id,
self._port,
err,
)
def _clamp_to_source_range(self, val):
# clamp to source range
val = max(val, self._src_range[0])
val = min(val, self._src_range[1])
return val
def _linear_scale_to_dest_range(self, val):
# linear scale to dest range
src_len = self._src_range[1] - self._src_range[0]
adc_val_rel = val - self._src_range[0]
ratio = float(adc_val_rel) / float(src_len)
dst_len = self._dst_range[1] - self._dst_range[0]
dest_val = self._dst_range[0] + ratio * dst_len
return dest_val

View File

@ -0,0 +1,108 @@
"""Switch platform integration for Numato USB GPIO expanders."""
import logging
from numato_gpio import NumatoGpioError
from homeassistant.const import (
CONF_DEVICES,
CONF_ID,
CONF_SWITCHES,
DEVICE_DEFAULT_NAME,
)
from homeassistant.helpers.entity import ToggleEntity
from . import CONF_INVERT_LOGIC, CONF_PORTS, DATA_API, DOMAIN
_LOGGER = logging.getLogger(__name__)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the configured Numato USB GPIO switch ports."""
if discovery_info is None:
return
api = hass.data[DOMAIN][DATA_API]
switches = []
devices = hass.data[DOMAIN][CONF_DEVICES]
for device in [d for d in devices if CONF_SWITCHES in d]:
device_id = device[CONF_ID]
platform = device[CONF_SWITCHES]
invert_logic = platform[CONF_INVERT_LOGIC]
ports = platform[CONF_PORTS]
for port, port_name in ports.items():
try:
api.setup_output(device_id, port)
api.write_output(device_id, port, 1 if invert_logic else 0)
except NumatoGpioError as err:
_LOGGER.error(
"Failed to initialize switch '%s' on Numato device %s port %s: %s",
port_name,
device_id,
port,
err,
)
continue
switches.append(
NumatoGpioSwitch(port_name, device_id, port, invert_logic, api,)
)
add_entities(switches, True)
class NumatoGpioSwitch(ToggleEntity):
"""Representation of a Numato USB GPIO switch port."""
def __init__(self, name, device_id, port, invert_logic, api):
"""Initialize the port."""
self._name = name or DEVICE_DEFAULT_NAME
self._device_id = device_id
self._port = port
self._invert_logic = invert_logic
self._state = False
self._api = api
@property
def name(self):
"""Return the name of the switch."""
return self._name
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def is_on(self):
"""Return true if port is turned on."""
return self._state
def turn_on(self, **kwargs):
"""Turn the port on."""
try:
self._api.write_output(
self._device_id, self._port, 0 if self._invert_logic else 1
)
self._state = True
self.schedule_update_ha_state()
except NumatoGpioError as err:
_LOGGER.error(
"Failed to turn on Numato device %s port %s: %s",
self._device_id,
self._port,
err,
)
def turn_off(self, **kwargs):
"""Turn the port off."""
try:
self._api.write_output(
self._device_id, self._port, 1 if self._invert_logic else 0
)
self._state = False
self.schedule_update_ha_state()
except NumatoGpioError as err:
_LOGGER.error(
"Failed to turn off Numato device %s port %s: %s",
self._device_id,
self._port,
err,
)

View File

@ -951,6 +951,9 @@ nsw-fuel-api-client==1.0.10
# homeassistant.components.nuheat
nuheat==0.3.0
# homeassistant.components.numato
numato-gpio==0.7.1
# homeassistant.components.iqvia
# homeassistant.components.opencv
# homeassistant.components.tensorflow

View File

@ -377,6 +377,9 @@ nsw-fuel-api-client==1.0.10
# homeassistant.components.nuheat
nuheat==0.3.0
# homeassistant.components.numato
numato-gpio==0.7.1
# homeassistant.components.iqvia
# homeassistant.components.opencv
# homeassistant.components.tensorflow

View File

@ -0,0 +1 @@
"""Tests for the numato integration."""

View File

@ -0,0 +1,49 @@
"""Definitions shared by all numato tests."""
from numato_gpio import NumatoGpioError
NUMATO_CFG = {
"numato": {
"discover": ["/ttyACM0", "/ttyACM1"],
"devices": [
{
"id": 0,
"binary_sensors": {
"invert_logic": False,
"ports": {
"2": "numato_binary_sensor_mock_port2",
"3": "numato_binary_sensor_mock_port3",
"4": "numato_binary_sensor_mock_port4",
},
},
"sensors": {
"ports": {
"1": {
"name": "numato_adc_mock_port1",
"source_range": [100, 1023],
"destination_range": [0, 10],
"unit": "mocks",
}
},
},
"switches": {
"invert_logic": False,
"ports": {
"5": "numato_switch_mock_port5",
"6": "numato_switch_mock_port6",
},
},
}
],
}
}
def mockup_raise(*args, **kwargs):
"""Mockup to replace regular functions for error injection."""
raise NumatoGpioError("Error mockup")
def mockup_return(*args, **kwargs):
"""Mockup to replace regular functions for error injection."""
return False

View File

@ -0,0 +1,28 @@
"""Fixtures for numato tests."""
from copy import deepcopy
import pytest
from homeassistant.components import numato
from . import numato_mock
from .common import NUMATO_CFG
@pytest.fixture
def config():
"""Provide a copy of the numato domain's test configuration.
This helps to quickly change certain aspects of the configuration scoped
to each individual test.
"""
return deepcopy(NUMATO_CFG)
@pytest.fixture
def numato_fixture(monkeypatch):
"""Inject the numato mockup into numato homeassistant module."""
module_mock = numato_mock.NumatoModuleMock()
monkeypatch.setattr(numato, "gpio", module_mock)
return module_mock

View File

@ -0,0 +1,68 @@
"""Mockup for the numato component interface."""
from numato_gpio import NumatoGpioError
class NumatoModuleMock:
"""Mockup for the numato_gpio module."""
NumatoGpioError = NumatoGpioError
def __init__(self):
"""Initialize the numato_gpio module mockup class."""
self.devices = {}
class NumatoDeviceMock:
"""Mockup for the numato_gpio.NumatoUsbGpio class."""
def __init__(self, device):
"""Initialize numato device mockup."""
self.device = device
self.callbacks = {}
self.ports = set()
self.values = {}
def setup(self, port, direction):
"""Mockup for setup."""
self.ports.add(port)
self.values[port] = None
def write(self, port, value):
"""Mockup for write."""
self.values[port] = value
def read(self, port):
"""Mockup for read."""
return 1
def adc_read(self, port):
"""Mockup for adc_read."""
return 1023
def add_event_detect(self, port, callback, direction):
"""Mockup for add_event_detect."""
self.callbacks[port] = callback
def notify(self, enable):
"""Mockup for notify."""
def mockup_inject_notification(self, port, value):
"""Make the mockup execute a notification callback."""
self.callbacks[port](port, value)
OUT = 0
IN = 1
RISING = 1
FALLING = 2
BOTH = 3
def discover(self, _=None):
"""Mockup for the numato device discovery.
Ignore the device list argument, mock discovers /dev/ttyACM0.
"""
self.devices[0] = NumatoModuleMock.NumatoDeviceMock("/dev/ttyACM0")
def cleanup(self):
"""Mockup for the numato device cleanup."""
self.devices.clear()

View File

@ -0,0 +1,62 @@
"""Tests for the numato binary_sensor platform."""
from homeassistant.helpers import discovery
from homeassistant.setup import async_setup_component
from .common import NUMATO_CFG, mockup_raise
MOCKUP_ENTITY_IDS = {
"binary_sensor.numato_binary_sensor_mock_port2",
"binary_sensor.numato_binary_sensor_mock_port3",
"binary_sensor.numato_binary_sensor_mock_port4",
}
async def test_failing_setups_no_entities(hass, numato_fixture, monkeypatch):
"""When port setup fails, no entity shall be created."""
monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "setup", mockup_raise)
assert await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done()
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id not in hass.states.async_entity_ids()
async def test_setup_callbacks(hass, numato_fixture, monkeypatch):
"""During setup a callback shall be registered."""
numato_fixture.discover()
def mock_add_event_detect(self, port, callback, direction):
assert self == numato_fixture.devices[0]
assert port == 1
assert callback is callable
assert direction == numato_fixture.BOTH
monkeypatch.setattr(
numato_fixture.devices[0], "add_event_detect", mock_add_event_detect
)
assert await async_setup_component(hass, "numato", NUMATO_CFG)
async def test_hass_binary_sensor_notification(hass, numato_fixture):
"""Test regular operations from within Home Assistant."""
assert await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done() # wait until services are registered
assert (
hass.states.get("binary_sensor.numato_binary_sensor_mock_port2").state == "on"
)
await hass.async_add_executor_job(numato_fixture.devices[0].callbacks[2], 2, False)
await hass.async_block_till_done()
assert (
hass.states.get("binary_sensor.numato_binary_sensor_mock_port2").state == "off"
)
async def test_binary_sensor_setup_without_discovery_info(hass, config, numato_fixture):
"""Test handling of empty discovery_info."""
numato_fixture.discover()
await discovery.async_load_platform(hass, "binary_sensor", "numato", None, config)
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id not in hass.states.async_entity_ids()
await hass.async_block_till_done() # wait for numato platform to be loaded
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id in hass.states.async_entity_ids()

View File

@ -0,0 +1,161 @@
"""Tests for the numato integration."""
from numato_gpio import NumatoGpioError
import pytest
from homeassistant.components import numato
from homeassistant.setup import async_setup_component
from .common import NUMATO_CFG, mockup_raise, mockup_return
async def test_setup_no_devices(hass, numato_fixture, monkeypatch):
"""Test handling of an 'empty' discovery.
Platform setups are expected to return after handling errors locally
without raising.
"""
monkeypatch.setattr(numato_fixture, "discover", mockup_return)
assert await async_setup_component(hass, "numato", NUMATO_CFG)
assert len(numato_fixture.devices) == 0
async def test_fail_setup_raising_discovery(hass, numato_fixture, caplog, monkeypatch):
"""Test handling of an exception during discovery.
Setup shall return False.
"""
monkeypatch.setattr(numato_fixture, "discover", mockup_raise)
assert not await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done()
async def test_hass_numato_api_wrong_port_directions(hass, numato_fixture):
"""Test handling of wrong port directions.
This won't happen in the current platform implementation but would raise
in case of an introduced bug in the platforms.
"""
numato_fixture.discover()
api = numato.NumatoAPI()
api.setup_output(0, 5)
api.setup_input(0, 2)
api.setup_input(0, 6)
with pytest.raises(NumatoGpioError):
api.read_adc_input(0, 5) # adc_read from output
api.read_input(0, 6) # read from output
api.write_output(0, 2, 1) # write to input
async def test_hass_numato_api_errors(hass, numato_fixture, monkeypatch):
"""Test whether Home Assistant numato API (re-)raises errors."""
numato_fixture.discover()
monkeypatch.setattr(numato_fixture.devices[0], "setup", mockup_raise)
monkeypatch.setattr(numato_fixture.devices[0], "adc_read", mockup_raise)
monkeypatch.setattr(numato_fixture.devices[0], "read", mockup_raise)
monkeypatch.setattr(numato_fixture.devices[0], "write", mockup_raise)
api = numato.NumatoAPI()
with pytest.raises(NumatoGpioError):
api.setup_input(0, 5)
api.read_adc_input(0, 1)
api.read_input(0, 2)
api.write_output(0, 2, 1)
async def test_invalid_port_number(hass, numato_fixture, config):
"""Test validation of ADC port number type."""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
port1_config = sensorports_cfg["1"]
sensorports_cfg["one"] = port1_config
del sensorports_cfg["1"]
assert not await async_setup_component(hass, "numato", config)
await hass.async_block_till_done()
assert not numato_fixture.devices
async def test_too_low_adc_port_number(hass, numato_fixture, config):
"""Test handling of failing component setup.
Tries setting up an ADC on a port below (0) the allowed range.
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg.update({0: {"name": "toolow"}})
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices
async def test_too_high_adc_port_number(hass, numato_fixture, config):
"""Test handling of failing component setup.
Tries setting up an ADC on a port above (8) the allowed range.
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg.update({8: {"name": "toohigh"}})
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices
async def test_invalid_adc_range_value_type(hass, numato_fixture, config):
"""Test validation of ADC range config's types.
Replaces the source range beginning by a string.
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg["1"]["source_range"][0] = "zero"
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices
async def test_invalid_adc_source_range_length(hass, numato_fixture, config):
"""Test validation of ADC range config's length.
Adds an element to the source range.
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg["1"]["source_range"].append(42)
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices
async def test_invalid_adc_source_range_order(hass, numato_fixture, config):
"""Test validation of ADC range config's order.
Sets the source range to a decreasing [2, 1].
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg["1"]["source_range"] = [2, 1]
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices
async def test_invalid_adc_destination_range_value_type(hass, numato_fixture, config):
"""Test validation of ADC range .
Replaces the destination range beginning by a string.
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg["1"]["destination_range"][0] = "zero"
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices
async def test_invalid_adc_destination_range_length(hass, numato_fixture, config):
"""Test validation of ADC range config's length.
Adds an element to the destination range.
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg["1"]["destination_range"].append(42)
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices
async def test_invalid_adc_destination_range_order(hass, numato_fixture, config):
"""Test validation of ADC range config's order.
Sets the destination range to a decreasing [2, 1].
"""
sensorports_cfg = config["numato"]["devices"][0]["sensors"]["ports"]
sensorports_cfg["1"]["destination_range"] = [2, 1]
assert not await async_setup_component(hass, "numato", config)
assert not numato_fixture.devices

View File

@ -0,0 +1,38 @@
"""Tests for the numato sensor platform."""
from homeassistant.const import STATE_UNKNOWN
from homeassistant.helpers import discovery
from homeassistant.setup import async_setup_component
from .common import NUMATO_CFG, mockup_raise
MOCKUP_ENTITY_IDS = {
"sensor.numato_adc_mock_port1",
}
async def test_failing_setups_no_entities(hass, numato_fixture, monkeypatch):
"""When port setup fails, no entity shall be created."""
monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "setup", mockup_raise)
assert await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done()
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id not in hass.states.async_entity_ids()
async def test_failing_sensor_update(hass, numato_fixture, monkeypatch):
"""Test condition when a sensor update fails."""
monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "adc_read", mockup_raise)
assert await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done()
assert hass.states.get("sensor.numato_adc_mock_port1").state is STATE_UNKNOWN
async def test_sensor_setup_without_discovery_info(hass, config, numato_fixture):
"""Test handling of empty discovery_info."""
numato_fixture.discover()
await discovery.async_load_platform(hass, "sensor", "numato", None, config)
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id not in hass.states.async_entity_ids()
await hass.async_block_till_done() # wait for numato platform to be loaded
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id in hass.states.async_entity_ids()

View File

@ -0,0 +1,114 @@
"""Tests for the numato switch platform."""
from homeassistant.components import switch
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_TURN_ON
from homeassistant.helpers import discovery
from homeassistant.setup import async_setup_component
from .common import NUMATO_CFG, mockup_raise
MOCKUP_ENTITY_IDS = {
"switch.numato_switch_mock_port5",
"switch.numato_switch_mock_port6",
}
async def test_failing_setups_no_entities(hass, numato_fixture, monkeypatch):
"""When port setup fails, no entity shall be created."""
monkeypatch.setattr(numato_fixture.NumatoDeviceMock, "setup", mockup_raise)
assert await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done()
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id not in hass.states.async_entity_ids()
async def test_regular_hass_operations(hass, numato_fixture):
"""Test regular operations from within Home Assistant."""
assert await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done() # wait until services are registered
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port5").state == "on"
assert numato_fixture.devices[0].values[5] == 1
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port6").state == "on"
assert numato_fixture.devices[0].values[6] == 1
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port5").state == "off"
assert numato_fixture.devices[0].values[5] == 0
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port6").state == "off"
assert numato_fixture.devices[0].values[6] == 0
async def test_failing_hass_operations(hass, numato_fixture, monkeypatch):
"""Test failing operations called from within Home Assistant.
Switches remain in their initial 'off' state when the device can't
be written to.
"""
assert await async_setup_component(hass, "numato", NUMATO_CFG)
await hass.async_block_till_done() # wait until services are registered
monkeypatch.setattr(numato_fixture.devices[0], "write", mockup_raise)
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port5").state == "off"
assert not numato_fixture.devices[0].values[5]
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port6").state == "off"
assert not numato_fixture.devices[0].values[6]
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port5"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port5").state == "off"
assert not numato_fixture.devices[0].values[5]
await hass.services.async_call(
switch.DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.numato_switch_mock_port6"},
blocking=True,
)
assert hass.states.get("switch.numato_switch_mock_port6").state == "off"
assert not numato_fixture.devices[0].values[6]
async def test_switch_setup_without_discovery_info(hass, config, numato_fixture):
"""Test handling of empty discovery_info."""
numato_fixture.discover()
await discovery.async_load_platform(hass, "switch", "numato", None, config)
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id not in hass.states.async_entity_ids()
await hass.async_block_till_done() # wait for numato platform to be loaded
for entity_id in MOCKUP_ENTITY_IDS:
assert entity_id in hass.states.async_entity_ids()