Add Firmata Integration (attempt 2) (#35591)

pull/37928/head
Perry Naseck 2020-07-16 20:58:45 -04:00 committed by GitHub
parent b6befa2e83
commit 93919dea88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 898 additions and 0 deletions

View File

@ -254,6 +254,13 @@ omit =
homeassistant/components/fibaro/*
homeassistant/components/filesize/sensor.py
homeassistant/components/fints/sensor.py
homeassistant/components/firmata/__init__.py
homeassistant/components/firmata/binary_sensor.py
homeassistant/components/firmata/board.py
homeassistant/components/firmata/const.py
homeassistant/components/firmata/entity.py
homeassistant/components/firmata/pin.py
homeassistant/components/firmata/switch.py
homeassistant/components/fitbit/sensor.py
homeassistant/components/fixer/sensor.py
homeassistant/components/fleetgo/device_tracker.py

View File

@ -128,6 +128,7 @@ homeassistant/components/ezviz/* @baqs
homeassistant/components/fastdotcom/* @rohankapoorcom
homeassistant/components/file/* @fabaff
homeassistant/components/filter/* @dgomes
homeassistant/components/firmata/* @DaAwesomeP
homeassistant/components/fixer/* @fabaff
homeassistant/components/flick_electric/* @ZephireNZ
homeassistant/components/flock/* @fabaff

View File

@ -0,0 +1,191 @@
"""Support for Arduino-compatible Microcontrollers through Firmata."""
import asyncio
from copy import copy
import logging
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_NAME, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv, device_registry as dr
from .board import FirmataBoard
from .const import (
CONF_ARDUINO_INSTANCE_ID,
CONF_ARDUINO_WAIT,
CONF_BINARY_SENSORS,
CONF_INITIAL_STATE,
CONF_NEGATE_STATE,
CONF_PIN,
CONF_PIN_MODE,
CONF_SAMPLING_INTERVAL,
CONF_SERIAL_BAUD_RATE,
CONF_SERIAL_PORT,
CONF_SLEEP_TUNE,
CONF_SWITCHES,
DOMAIN,
FIRMATA_MANUFACTURER,
PIN_MODE_INPUT,
PIN_MODE_OUTPUT,
PIN_MODE_PULLUP,
)
_LOGGER = logging.getLogger(__name__)
DATA_CONFIGS = "board_configs"
ANALOG_PIN_SCHEMA = vol.All(cv.string, vol.Match(r"^A[0-9]+$"))
SWITCH_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_PIN): vol.Any(cv.positive_int, ANALOG_PIN_SCHEMA),
# will be analog mode in future too
vol.Required(CONF_PIN_MODE): PIN_MODE_OUTPUT,
vol.Optional(CONF_INITIAL_STATE, default=False): cv.boolean,
vol.Optional(CONF_NEGATE_STATE, default=False): cv.boolean,
},
required=True,
)
BINARY_SENSOR_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_PIN): vol.Any(cv.positive_int, ANALOG_PIN_SCHEMA),
# will be analog mode in future too
vol.Required(CONF_PIN_MODE): vol.Any(PIN_MODE_INPUT, PIN_MODE_PULLUP),
vol.Optional(CONF_NEGATE_STATE, default=False): cv.boolean,
},
required=True,
)
BOARD_CONFIG_SCHEMA = vol.Schema(
{
vol.Required(CONF_SERIAL_PORT): cv.string,
vol.Optional(CONF_SERIAL_BAUD_RATE): cv.positive_int,
vol.Optional(CONF_ARDUINO_INSTANCE_ID): cv.positive_int,
vol.Optional(CONF_ARDUINO_WAIT): cv.positive_int,
vol.Optional(CONF_SLEEP_TUNE): vol.All(
vol.Coerce(float), vol.Range(min=0.0001)
),
vol.Optional(CONF_SAMPLING_INTERVAL): cv.positive_int,
vol.Optional(CONF_SWITCHES): [SWITCH_SCHEMA],
vol.Optional(CONF_BINARY_SENSORS): [BINARY_SENSOR_SCHEMA],
},
required=True,
)
CONFIG_SCHEMA = vol.Schema(
{DOMAIN: vol.All(cv.ensure_list, [BOARD_CONFIG_SCHEMA])}, extra=vol.ALLOW_EXTRA
)
async def async_setup(hass: HomeAssistant, config: dict) -> bool:
"""Set up the Firmata domain."""
# Delete specific entries that no longer exist in the config
if hass.config_entries.async_entries(DOMAIN):
for entry in hass.config_entries.async_entries(DOMAIN):
remove = True
for board in config[DOMAIN]:
if entry.data[CONF_SERIAL_PORT] == board[CONF_SERIAL_PORT]:
remove = False
break
if remove:
await hass.config_entries.async_remove(entry.entry_id)
# Setup new entries and update old entries
for board in config[DOMAIN]:
firmata_config = copy(board)
existing_entry = False
for entry in hass.config_entries.async_entries(DOMAIN):
if board[CONF_SERIAL_PORT] == entry.data[CONF_SERIAL_PORT]:
existing_entry = True
firmata_config[CONF_NAME] = entry.data[CONF_NAME]
hass.config_entries.async_update_entry(entry, data=firmata_config)
break
if not existing_entry:
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=firmata_config,
)
)
return True
async def async_setup_entry(
hass: HomeAssistant, config_entry: config_entries.ConfigEntry
) -> bool:
"""Set up a Firmata board for a config entry."""
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}
_LOGGER.debug(
"Setting up Firmata id %s, name %s, config %s",
config_entry.entry_id,
config_entry.data[CONF_NAME],
config_entry.data,
)
board = FirmataBoard(config_entry.data)
if not await board.async_setup():
return False
hass.data[DOMAIN][config_entry.entry_id] = board
async def handle_shutdown(event) -> None:
"""Handle shutdown of board when Home Assistant shuts down."""
# Ensure board was not already removed previously before shutdown
if config_entry.entry_id in hass.data[DOMAIN]:
await board.async_reset()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, handle_shutdown)
device_registry = await dr.async_get_registry(hass)
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={},
identifiers={(DOMAIN, board.name)},
manufacturer=FIRMATA_MANUFACTURER,
name=board.name,
sw_version=board.firmware_version,
)
if CONF_BINARY_SENSORS in config_entry.data:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, "binary_sensor")
)
if CONF_SWITCHES in config_entry.data:
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, "switch")
)
return True
async def async_unload_entry(
hass: HomeAssistant, config_entry: config_entries.ConfigEntry
) -> None:
"""Shutdown and close a Firmata board for a config entry."""
_LOGGER.debug("Closing Firmata board %s", config_entry.data[CONF_NAME])
unload_entries = []
if CONF_BINARY_SENSORS in config_entry.data:
unload_entries.append(
hass.config_entries.async_forward_entry_unload(
config_entry, "binary_sensor"
)
)
if CONF_SWITCHES in config_entry.data:
unload_entries.append(
hass.config_entries.async_forward_entry_unload(config_entry, "switch")
)
results = []
if unload_entries:
results = await asyncio.gather(*unload_entries)
results.append(await hass.data[DOMAIN].pop(config_entry.entry_id).async_reset())
return False not in results

View File

@ -0,0 +1,59 @@
"""Support for Firmata binary sensor input."""
import logging
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from .const import CONF_NEGATE_STATE, CONF_PIN, CONF_PIN_MODE, DOMAIN
from .entity import FirmataPinEntity
from .pin import FirmataBinaryDigitalInput, FirmataPinUsedException
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities
) -> None:
"""Set up the Firmata binary sensors."""
new_entities = []
board = hass.data[DOMAIN][config_entry.entry_id]
for binary_sensor in board.binary_sensors:
pin = binary_sensor[CONF_PIN]
pin_mode = binary_sensor[CONF_PIN_MODE]
negate = binary_sensor[CONF_NEGATE_STATE]
api = FirmataBinaryDigitalInput(board, pin, pin_mode, negate)
try:
api.setup()
except FirmataPinUsedException:
_LOGGER.error(
"Could not setup binary sensor on pin %s since pin already in use.",
binary_sensor[CONF_PIN],
)
continue
name = binary_sensor[CONF_NAME]
binary_sensor_entity = FirmataBinarySensor(api, config_entry, name, pin)
new_entities.append(binary_sensor_entity)
if new_entities:
async_add_entities(new_entities)
class FirmataBinarySensor(FirmataPinEntity, BinarySensorEntity):
"""Representation of a binary sensor on a Firmata board."""
async def async_added_to_hass(self) -> None:
"""Set up a binary sensor."""
await self._api.start_pin(self.async_write_ha_state)
async def async_will_remove_from_hass(self) -> None:
"""Stop reporting a binary sensor."""
await self._api.stop_pin()
@property
def is_on(self) -> bool:
"""Return true if binary sensor is on."""
return self._api.is_on

View File

@ -0,0 +1,144 @@
"""Code to handle a Firmata board."""
import logging
from typing import Union
from pymata_express.pymata_express import PymataExpress
from pymata_express.pymata_express_serial import serial
from homeassistant.const import CONF_NAME
from .const import (
CONF_ARDUINO_INSTANCE_ID,
CONF_ARDUINO_WAIT,
CONF_BINARY_SENSORS,
CONF_SAMPLING_INTERVAL,
CONF_SERIAL_BAUD_RATE,
CONF_SERIAL_PORT,
CONF_SLEEP_TUNE,
CONF_SWITCHES,
)
_LOGGER = logging.getLogger(__name__)
FirmataPinType = Union[int, str]
class FirmataBoard:
"""Manages a single Firmata board."""
def __init__(self, config: dict):
"""Initialize the board."""
self.config = config
self.api = None
self.firmware_version = None
self.protocol_version = None
self.name = self.config[CONF_NAME]
self.switches = []
self.binary_sensors = []
self.used_pins = []
if CONF_SWITCHES in self.config:
self.switches = self.config[CONF_SWITCHES]
if CONF_BINARY_SENSORS in self.config:
self.binary_sensors = self.config[CONF_BINARY_SENSORS]
async def async_setup(self, tries=0) -> bool:
"""Set up a Firmata instance."""
try:
_LOGGER.debug("Connecting to Firmata %s", self.name)
self.api = await get_board(self.config)
except RuntimeError as err:
_LOGGER.error("Error connecting to PyMata board %s: %s", self.name, err)
return False
except serial.serialutil.SerialTimeoutException as err:
_LOGGER.error(
"Timeout writing to serial port for PyMata board %s: %s", self.name, err
)
return False
except serial.serialutil.SerialException as err:
_LOGGER.error(
"Error connecting to serial port for PyMata board %s: %s",
self.name,
err,
)
return False
self.firmware_version = await self.api.get_firmware_version()
if not self.firmware_version:
_LOGGER.error(
"Error retrieving firmware version from Firmata board %s", self.name
)
return False
if CONF_SAMPLING_INTERVAL in self.config:
try:
await self.api.set_sampling_interval(
self.config[CONF_SAMPLING_INTERVAL]
)
except RuntimeError as err:
_LOGGER.error(
"Error setting sampling interval for PyMata \
board %s: %s",
self.name,
err,
)
return False
_LOGGER.debug("Firmata connection successful for %s", self.name)
return True
async def async_reset(self) -> bool:
"""Reset the board to default state."""
_LOGGER.debug("Shutting down board %s", self.name)
# If the board was never setup, continue.
if self.api is None:
return True
await self.api.shutdown()
self.api = None
return True
def mark_pin_used(self, pin: FirmataPinType) -> bool:
"""Test if a pin is used already on the board or mark as used."""
if pin in self.used_pins:
return False
self.used_pins.append(pin)
return True
def get_pin_type(self, pin: FirmataPinType) -> tuple:
"""Return the type and Firmata location of a pin on the board."""
if isinstance(pin, str):
pin_type = "analog"
firmata_pin = int(pin[1:])
firmata_pin += self.api.first_analog_pin
else:
pin_type = "digital"
firmata_pin = pin
return (pin_type, firmata_pin)
async def get_board(data: dict) -> PymataExpress:
"""Create a Pymata board object."""
board_data = {}
if CONF_SERIAL_PORT in data:
board_data["com_port"] = data[CONF_SERIAL_PORT]
if CONF_SERIAL_BAUD_RATE in data:
board_data["baud_rate"] = data[CONF_SERIAL_BAUD_RATE]
if CONF_ARDUINO_INSTANCE_ID in data:
board_data["arduino_instance_id"] = data[CONF_ARDUINO_INSTANCE_ID]
if CONF_ARDUINO_WAIT in data:
board_data["arduino_wait"] = data[CONF_ARDUINO_WAIT]
if CONF_SLEEP_TUNE in data:
board_data["sleep_tune"] = data[CONF_SLEEP_TUNE]
board_data["autostart"] = False
board_data["shutdown_on_exception"] = True
board_data["close_loop_on_shutdown"] = False
board = PymataExpress(**board_data)
await board.start_aio()
return board

View File

@ -0,0 +1,57 @@
"""Config flow to configure firmata component."""
import logging
from pymata_express.pymata_express_serial import serial
from homeassistant import config_entries
from homeassistant.const import CONF_NAME
from .board import get_board
from .const import CONF_SERIAL_PORT, DOMAIN # pylint: disable=unused-import
_LOGGER = logging.getLogger(__name__)
class FirmataFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a firmata config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH
async def async_step_import(self, import_config: dict):
"""Import a firmata board as a config entry.
This flow is triggered by `async_setup` for configured boards.
This will execute for any board that does not have a
config entry yet (based on entry_id). It validates a connection
and then adds the entry.
"""
name = f"serial-{import_config[CONF_SERIAL_PORT]}"
import_config[CONF_NAME] = name
# Connect to the board to verify connection and then shutdown
# If either fail then we cannot continue
_LOGGER.debug("Connecting to Firmata board %s to test connection", name)
try:
api = await get_board(import_config)
await api.shutdown()
except RuntimeError as err:
_LOGGER.error("Error connecting to PyMata board %s: %s", name, err)
return self.async_abort(reason="cannot_connect")
except serial.serialutil.SerialTimeoutException as err:
_LOGGER.error(
"Timeout writing to serial port for PyMata board %s: %s", name, err
)
return self.async_abort(reason="cannot_connect")
except serial.serialutil.SerialException as err:
_LOGGER.error(
"Error connecting to serial port for PyMata board %s: %s", name, err
)
return self.async_abort(reason="cannot_connect")
_LOGGER.debug("Connection test to Firmata board %s successful", name)
return self.async_create_entry(
title=import_config[CONF_NAME], data=import_config
)

View File

@ -0,0 +1,24 @@
"""Constants for the Firmata component."""
import logging
LOGGER = logging.getLogger(__package__)
CONF_ARDUINO_INSTANCE_ID = "arduino_instance_id"
CONF_ARDUINO_WAIT = "arduino_wait"
CONF_BINARY_SENSORS = "binary_sensors"
CONF_INITIAL_STATE = "initial"
CONF_NAME = "name"
CONF_NEGATE_STATE = "negate"
CONF_PIN = "pin"
CONF_PINS = "pins"
CONF_PIN_MODE = "pin_mode"
PIN_MODE_OUTPUT = "OUTPUT"
PIN_MODE_INPUT = "INPUT"
PIN_MODE_PULLUP = "PULLUP"
CONF_SAMPLING_INTERVAL = "sampling_interval"
CONF_SERIAL_BAUD_RATE = "serial_baud_rate"
CONF_SERIAL_PORT = "serial_port"
CONF_SLEEP_TUNE = "sleep_tune"
CONF_SWITCHES = "switches"
DOMAIN = "firmata"
FIRMATA_MANUFACTURER = "Firmata"

View File

@ -0,0 +1,60 @@
"""Entity for Firmata devices."""
from typing import Type
from homeassistant.config_entries import ConfigEntry
from .board import FirmataPinType
from .const import DOMAIN, FIRMATA_MANUFACTURER
from .pin import FirmataBoardPin
class FirmataEntity:
"""Representation of a Firmata entity."""
def __init__(self, api):
"""Initialize the entity."""
self._api = api
@property
def device_info(self) -> dict:
"""Return device info."""
return {
"connections": {},
"identifiers": {(DOMAIN, self._api.board.name)},
"manufacturer": FIRMATA_MANUFACTURER,
"name": self._api.board.name,
"sw_version": self._api.board.firmware_version,
}
class FirmataPinEntity(FirmataEntity):
"""Representation of a Firmata pin entity."""
def __init__(
self,
api: Type[FirmataBoardPin],
config_entry: ConfigEntry,
name: str,
pin: FirmataPinType,
):
"""Initialize the pin entity."""
super().__init__(api)
self._name = name
location = (config_entry.entry_id, "pin", pin)
self._unique_id = "_".join(str(i) for i in location)
@property
def name(self) -> str:
"""Get the name of the pin."""
return self._name
@property
def should_poll(self) -> bool:
"""No polling needed."""
return False
@property
def unique_id(self) -> str:
"""Return a unique identifier for this device."""
return self._unique_id

View File

@ -0,0 +1,12 @@
{
"domain": "firmata",
"name": "Firmata",
"config_flow": false,
"documentation": "https://www.home-assistant.io/integrations/firmata",
"requirements": [
"pymata-express==1.13"
],
"codeowners": [
"@DaAwesomeP"
]
}

View File

@ -0,0 +1,153 @@
"""Code to handle pins on a Firmata board."""
import logging
from typing import Callable
from homeassistant.core import callback
from .board import FirmataBoard, FirmataPinType
from .const import PIN_MODE_INPUT, PIN_MODE_PULLUP
_LOGGER = logging.getLogger(__name__)
class FirmataPinUsedException(Exception):
"""Represents an exception when a pin is already in use."""
class FirmataBoardPin:
"""Manages a single Firmata board pin."""
def __init__(self, board: FirmataBoard, pin: FirmataPinType, pin_mode: str):
"""Initialize the pin."""
self.board = board
self._pin = pin
self._pin_mode = pin_mode
self._pin_type, self._firmata_pin = self.board.get_pin_type(self._pin)
self._state = None
def setup(self):
"""Set up a pin and make sure it is valid."""
if not self.board.mark_pin_used(self._pin):
raise FirmataPinUsedException(f"Pin {self._pin} already used!")
class FirmataBinaryDigitalOutput(FirmataBoardPin):
"""Representation of a Firmata Digital Output Pin."""
def __init__(
self,
board: FirmataBoard,
pin: FirmataPinType,
pin_mode: str,
initial: bool,
negate: bool,
):
"""Initialize the digital output pin."""
self._initial = initial
self._negate = negate
super().__init__(board, pin, pin_mode)
async def start_pin(self) -> None:
"""Set initial state on a pin."""
_LOGGER.debug(
"Setting initial state for digital output pin %s on board %s",
self._pin,
self.board.name,
)
api = self.board.api
# Only PIN_MODE_OUTPUT mode is supported as binary digital output
await api.set_pin_mode_digital_output(self._firmata_pin)
if self._initial:
new_pin_state = not self._negate
else:
new_pin_state = self._negate
await api.digital_pin_write(self._firmata_pin, int(new_pin_state))
self._state = self._initial
@property
def is_on(self) -> bool:
"""Return true if digital output is on."""
return self._state
async def turn_on(self) -> None:
"""Turn on digital output."""
_LOGGER.debug("Turning digital output on pin %s on", self._pin)
new_pin_state = not self._negate
await self.board.api.digital_pin_write(self._firmata_pin, int(new_pin_state))
self._state = True
async def turn_off(self) -> None:
"""Turn off digital output."""
_LOGGER.debug("Turning digital output on pin %s off", self._pin)
new_pin_state = self._negate
await self.board.api.digital_pin_write(self._firmata_pin, int(new_pin_state))
self._state = False
class FirmataBinaryDigitalInput(FirmataBoardPin):
"""Representation of a Firmata Digital Input Pin."""
def __init__(
self, board: FirmataBoard, pin: FirmataPinType, pin_mode: str, negate: bool
):
"""Initialize the digital input pin."""
self._negate = negate
self._forward_callback = None
super().__init__(board, pin, pin_mode)
async def start_pin(self, forward_callback: Callable[[], None]) -> None:
"""Get initial state and start reporting a pin."""
_LOGGER.debug(
"Starting reporting updates for input pin %s on board %s",
self._pin,
self.board.name,
)
self._forward_callback = forward_callback
api = self.board.api
if self._pin_mode == PIN_MODE_INPUT:
await api.set_pin_mode_digital_input(self._pin, self.latch_callback)
elif self._pin_mode == PIN_MODE_PULLUP:
await api.set_pin_mode_digital_input_pullup(self._pin, self.latch_callback)
new_state = bool((await self.board.api.digital_read(self._firmata_pin))[0])
if self._negate:
new_state = not new_state
self._state = new_state
await api.enable_digital_reporting(self._pin)
self._forward_callback()
async def stop_pin(self) -> None:
"""Stop reporting digital input pin."""
_LOGGER.debug(
"Stopping reporting updates for digital input pin %s on board %s",
self._pin,
self.board.name,
)
api = self.board.api
await api.disable_digital_reporting(self._pin)
@property
def is_on(self) -> bool:
"""Return true if digital input is on."""
return self._state
@callback
async def latch_callback(self, data: list) -> None:
"""Update pin state on callback."""
if data[1] != self._firmata_pin:
return
_LOGGER.debug(
"Received latch %d for digital input pin %d on board %s",
data[2],
self._firmata_pin,
self.board.name,
)
new_state = bool(data[2])
if self._negate:
new_state = not new_state
if self._state == new_state:
return
self._state = new_state
self._forward_callback()

View File

@ -0,0 +1,8 @@
{
"config": {
"abort": {
"cannot_connect": "Cannot connect to Firmata board during setup"
},
"step": {}
}
}

View File

@ -0,0 +1,75 @@
"""Support for Firmata switch output."""
import logging
from homeassistant.components.switch import SwitchEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from .const import (
CONF_INITIAL_STATE,
CONF_NEGATE_STATE,
CONF_PIN,
CONF_PIN_MODE,
DOMAIN,
)
from .entity import FirmataPinEntity
from .pin import FirmataBinaryDigitalOutput, FirmataPinUsedException
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities
) -> None:
"""Set up the Firmata switches."""
new_entities = []
board = hass.data[DOMAIN][config_entry.entry_id]
for switch in board.switches:
pin = switch[CONF_PIN]
pin_mode = switch[CONF_PIN_MODE]
initial = switch[CONF_INITIAL_STATE]
negate = switch[CONF_NEGATE_STATE]
api = FirmataBinaryDigitalOutput(board, pin, pin_mode, initial, negate)
try:
api.setup()
except FirmataPinUsedException:
_LOGGER.error(
"Could not setup switch on pin %s since pin already in use.",
switch[CONF_PIN],
)
continue
name = switch[CONF_NAME]
switch_entity = FirmataSwitch(api, config_entry, name, pin)
new_entities.append(switch_entity)
if new_entities:
async_add_entities(new_entities)
class FirmataSwitch(FirmataPinEntity, SwitchEntity):
"""Representation of a switch on a Firmata board."""
async def async_added_to_hass(self) -> None:
"""Set up a switch."""
await self._api.start_pin()
self.async_write_ha_state()
@property
def is_on(self) -> bool:
"""Return true if switch is on."""
return self._api.is_on
async def async_turn_on(self, **kwargs) -> None:
"""Turn on switch."""
_LOGGER.debug("Turning switch %s on", self._name)
await self._api.turn_on()
self.async_write_ha_state()
async def async_turn_off(self, **kwargs) -> None:
"""Turn off switch."""
_LOGGER.debug("Turning switch %s off", self._name)
await self._api.turn_off()
self.async_write_ha_state()

View File

@ -0,0 +1,8 @@
{
"config": {
"abort": {
"cannot_connect": "Cannot connect to Firmata board during setup"
},
"step": {}
}
}

View File

@ -1445,6 +1445,9 @@ pylutron==0.2.5
# homeassistant.components.mailgun
pymailgunner==1.4
# homeassistant.components.firmata
pymata-express==1.13
# homeassistant.components.mediaroom
pymediaroom==0.6.4

View File

@ -670,6 +670,9 @@ pylutron-caseta==0.6.1
# homeassistant.components.mailgun
pymailgunner==1.4
# homeassistant.components.firmata
pymata-express==1.13
# homeassistant.components.melcloud
pymelcloud==2.5.2

View File

@ -0,0 +1 @@
"""Tests for the Firmata integration."""

View File

@ -0,0 +1,92 @@
"""Test the Firmata config flow."""
from pymata_express.pymata_express_serial import serial
from homeassistant import config_entries, setup
from homeassistant.components.firmata.const import CONF_SERIAL_PORT, DOMAIN
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from tests.async_mock import patch
async def test_import_cannot_connect_pymata(hass: HomeAssistant) -> None:
"""Test we fail with an invalid board."""
await setup.async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.firmata.board.PymataExpress.start_aio",
side_effect=RuntimeError,
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_SERIAL_PORT: "/dev/nonExistent"},
)
assert result["type"] == "abort"
assert result["reason"] == "cannot_connect"
async def test_import_cannot_connect_serial(hass: HomeAssistant) -> None:
"""Test we fail with an invalid board."""
await setup.async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.firmata.board.PymataExpress.start_aio",
side_effect=serial.serialutil.SerialException,
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_SERIAL_PORT: "/dev/nonExistent"},
)
assert result["type"] == "abort"
assert result["reason"] == "cannot_connect"
async def test_import_cannot_connect_serial_timeout(hass: HomeAssistant) -> None:
"""Test we fail with an invalid board."""
await setup.async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.firmata.board.PymataExpress.start_aio",
side_effect=serial.serialutil.SerialTimeoutException,
):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_SERIAL_PORT: "/dev/nonExistent"},
)
assert result["type"] == "abort"
assert result["reason"] == "cannot_connect"
async def test_import(hass: HomeAssistant) -> None:
"""Test we create an entry from config."""
await setup.async_setup_component(hass, "persistent_notification", {})
with patch(
"homeassistant.components.firmata.board.PymataExpress", autospec=True
), patch(
"homeassistant.components.firmata.async_setup", return_value=True
) as mock_setup, patch(
"homeassistant.components.firmata.async_setup_entry", return_value=True
) as mock_setup_entry:
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_SERIAL_PORT: "/dev/nonExistent"},
)
assert result["type"] == "create_entry"
assert result["title"] == "serial-/dev/nonExistent"
assert result["data"] == {
CONF_NAME: "serial-/dev/nonExistent",
CONF_SERIAL_PORT: "/dev/nonExistent",
}
await hass.async_block_till_done()
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1