diff --git a/.coveragerc b/.coveragerc index 1293f8a71f9..0ade0f20790 100644 --- a/.coveragerc +++ b/.coveragerc @@ -254,6 +254,13 @@ omit = homeassistant/components/fibaro/* homeassistant/components/filesize/sensor.py homeassistant/components/fints/sensor.py + homeassistant/components/firmata/__init__.py + homeassistant/components/firmata/binary_sensor.py + homeassistant/components/firmata/board.py + homeassistant/components/firmata/const.py + homeassistant/components/firmata/entity.py + homeassistant/components/firmata/pin.py + homeassistant/components/firmata/switch.py homeassistant/components/fitbit/sensor.py homeassistant/components/fixer/sensor.py homeassistant/components/fleetgo/device_tracker.py diff --git a/CODEOWNERS b/CODEOWNERS index 2d76eec1511..44345be6b37 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -128,6 +128,7 @@ homeassistant/components/ezviz/* @baqs homeassistant/components/fastdotcom/* @rohankapoorcom homeassistant/components/file/* @fabaff homeassistant/components/filter/* @dgomes +homeassistant/components/firmata/* @DaAwesomeP homeassistant/components/fixer/* @fabaff homeassistant/components/flick_electric/* @ZephireNZ homeassistant/components/flock/* @fabaff diff --git a/homeassistant/components/firmata/__init__.py b/homeassistant/components/firmata/__init__.py new file mode 100644 index 00000000000..b64a88cbf57 --- /dev/null +++ b/homeassistant/components/firmata/__init__.py @@ -0,0 +1,191 @@ +"""Support for Arduino-compatible Microcontrollers through Firmata.""" +import asyncio +from copy import copy +import logging + +import voluptuous as vol + +from homeassistant import config_entries +from homeassistant.const import CONF_NAME, EVENT_HOMEASSISTANT_STOP +from homeassistant.core import HomeAssistant +from homeassistant.helpers import config_validation as cv, device_registry as dr + +from .board import FirmataBoard +from .const import ( + CONF_ARDUINO_INSTANCE_ID, + CONF_ARDUINO_WAIT, + CONF_BINARY_SENSORS, + CONF_INITIAL_STATE, + CONF_NEGATE_STATE, + CONF_PIN, + CONF_PIN_MODE, + CONF_SAMPLING_INTERVAL, + CONF_SERIAL_BAUD_RATE, + CONF_SERIAL_PORT, + CONF_SLEEP_TUNE, + CONF_SWITCHES, + DOMAIN, + FIRMATA_MANUFACTURER, + PIN_MODE_INPUT, + PIN_MODE_OUTPUT, + PIN_MODE_PULLUP, +) + +_LOGGER = logging.getLogger(__name__) + +DATA_CONFIGS = "board_configs" + +ANALOG_PIN_SCHEMA = vol.All(cv.string, vol.Match(r"^A[0-9]+$")) + +SWITCH_SCHEMA = vol.Schema( + { + vol.Required(CONF_NAME): cv.string, + vol.Required(CONF_PIN): vol.Any(cv.positive_int, ANALOG_PIN_SCHEMA), + # will be analog mode in future too + vol.Required(CONF_PIN_MODE): PIN_MODE_OUTPUT, + vol.Optional(CONF_INITIAL_STATE, default=False): cv.boolean, + vol.Optional(CONF_NEGATE_STATE, default=False): cv.boolean, + }, + required=True, +) + +BINARY_SENSOR_SCHEMA = vol.Schema( + { + vol.Required(CONF_NAME): cv.string, + vol.Required(CONF_PIN): vol.Any(cv.positive_int, ANALOG_PIN_SCHEMA), + # will be analog mode in future too + vol.Required(CONF_PIN_MODE): vol.Any(PIN_MODE_INPUT, PIN_MODE_PULLUP), + vol.Optional(CONF_NEGATE_STATE, default=False): cv.boolean, + }, + required=True, +) + +BOARD_CONFIG_SCHEMA = vol.Schema( + { + vol.Required(CONF_SERIAL_PORT): cv.string, + vol.Optional(CONF_SERIAL_BAUD_RATE): cv.positive_int, + vol.Optional(CONF_ARDUINO_INSTANCE_ID): cv.positive_int, + vol.Optional(CONF_ARDUINO_WAIT): cv.positive_int, + vol.Optional(CONF_SLEEP_TUNE): vol.All( + vol.Coerce(float), vol.Range(min=0.0001) + ), + vol.Optional(CONF_SAMPLING_INTERVAL): cv.positive_int, + vol.Optional(CONF_SWITCHES): [SWITCH_SCHEMA], + vol.Optional(CONF_BINARY_SENSORS): [BINARY_SENSOR_SCHEMA], + }, + required=True, +) + +CONFIG_SCHEMA = vol.Schema( + {DOMAIN: vol.All(cv.ensure_list, [BOARD_CONFIG_SCHEMA])}, extra=vol.ALLOW_EXTRA +) + + +async def async_setup(hass: HomeAssistant, config: dict) -> bool: + """Set up the Firmata domain.""" + # Delete specific entries that no longer exist in the config + if hass.config_entries.async_entries(DOMAIN): + for entry in hass.config_entries.async_entries(DOMAIN): + remove = True + for board in config[DOMAIN]: + if entry.data[CONF_SERIAL_PORT] == board[CONF_SERIAL_PORT]: + remove = False + break + if remove: + await hass.config_entries.async_remove(entry.entry_id) + + # Setup new entries and update old entries + for board in config[DOMAIN]: + firmata_config = copy(board) + existing_entry = False + for entry in hass.config_entries.async_entries(DOMAIN): + if board[CONF_SERIAL_PORT] == entry.data[CONF_SERIAL_PORT]: + existing_entry = True + firmata_config[CONF_NAME] = entry.data[CONF_NAME] + hass.config_entries.async_update_entry(entry, data=firmata_config) + break + if not existing_entry: + hass.async_create_task( + hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_IMPORT}, + data=firmata_config, + ) + ) + + return True + + +async def async_setup_entry( + hass: HomeAssistant, config_entry: config_entries.ConfigEntry +) -> bool: + """Set up a Firmata board for a config entry.""" + if DOMAIN not in hass.data: + hass.data[DOMAIN] = {} + + _LOGGER.debug( + "Setting up Firmata id %s, name %s, config %s", + config_entry.entry_id, + config_entry.data[CONF_NAME], + config_entry.data, + ) + + board = FirmataBoard(config_entry.data) + + if not await board.async_setup(): + return False + + hass.data[DOMAIN][config_entry.entry_id] = board + + async def handle_shutdown(event) -> None: + """Handle shutdown of board when Home Assistant shuts down.""" + # Ensure board was not already removed previously before shutdown + if config_entry.entry_id in hass.data[DOMAIN]: + await board.async_reset() + + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, handle_shutdown) + + device_registry = await dr.async_get_registry(hass) + device_registry.async_get_or_create( + config_entry_id=config_entry.entry_id, + connections={}, + identifiers={(DOMAIN, board.name)}, + manufacturer=FIRMATA_MANUFACTURER, + name=board.name, + sw_version=board.firmware_version, + ) + + if CONF_BINARY_SENSORS in config_entry.data: + hass.async_create_task( + hass.config_entries.async_forward_entry_setup(config_entry, "binary_sensor") + ) + if CONF_SWITCHES in config_entry.data: + hass.async_create_task( + hass.config_entries.async_forward_entry_setup(config_entry, "switch") + ) + return True + + +async def async_unload_entry( + hass: HomeAssistant, config_entry: config_entries.ConfigEntry +) -> None: + """Shutdown and close a Firmata board for a config entry.""" + _LOGGER.debug("Closing Firmata board %s", config_entry.data[CONF_NAME]) + + unload_entries = [] + if CONF_BINARY_SENSORS in config_entry.data: + unload_entries.append( + hass.config_entries.async_forward_entry_unload( + config_entry, "binary_sensor" + ) + ) + if CONF_SWITCHES in config_entry.data: + unload_entries.append( + hass.config_entries.async_forward_entry_unload(config_entry, "switch") + ) + results = [] + if unload_entries: + results = await asyncio.gather(*unload_entries) + results.append(await hass.data[DOMAIN].pop(config_entry.entry_id).async_reset()) + + return False not in results diff --git a/homeassistant/components/firmata/binary_sensor.py b/homeassistant/components/firmata/binary_sensor.py new file mode 100644 index 00000000000..4576b8dc69e --- /dev/null +++ b/homeassistant/components/firmata/binary_sensor.py @@ -0,0 +1,59 @@ +"""Support for Firmata binary sensor input.""" + +import logging + +from homeassistant.components.binary_sensor import BinarySensorEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_NAME +from homeassistant.core import HomeAssistant + +from .const import CONF_NEGATE_STATE, CONF_PIN, CONF_PIN_MODE, DOMAIN +from .entity import FirmataPinEntity +from .pin import FirmataBinaryDigitalInput, FirmataPinUsedException + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry( + hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities +) -> None: + """Set up the Firmata binary sensors.""" + new_entities = [] + + board = hass.data[DOMAIN][config_entry.entry_id] + for binary_sensor in board.binary_sensors: + pin = binary_sensor[CONF_PIN] + pin_mode = binary_sensor[CONF_PIN_MODE] + negate = binary_sensor[CONF_NEGATE_STATE] + api = FirmataBinaryDigitalInput(board, pin, pin_mode, negate) + try: + api.setup() + except FirmataPinUsedException: + _LOGGER.error( + "Could not setup binary sensor on pin %s since pin already in use.", + binary_sensor[CONF_PIN], + ) + continue + name = binary_sensor[CONF_NAME] + binary_sensor_entity = FirmataBinarySensor(api, config_entry, name, pin) + new_entities.append(binary_sensor_entity) + + if new_entities: + async_add_entities(new_entities) + + +class FirmataBinarySensor(FirmataPinEntity, BinarySensorEntity): + """Representation of a binary sensor on a Firmata board.""" + + async def async_added_to_hass(self) -> None: + """Set up a binary sensor.""" + await self._api.start_pin(self.async_write_ha_state) + + async def async_will_remove_from_hass(self) -> None: + """Stop reporting a binary sensor.""" + await self._api.stop_pin() + + @property + def is_on(self) -> bool: + """Return true if binary sensor is on.""" + return self._api.is_on diff --git a/homeassistant/components/firmata/board.py b/homeassistant/components/firmata/board.py new file mode 100644 index 00000000000..bae30014d63 --- /dev/null +++ b/homeassistant/components/firmata/board.py @@ -0,0 +1,144 @@ +"""Code to handle a Firmata board.""" +import logging +from typing import Union + +from pymata_express.pymata_express import PymataExpress +from pymata_express.pymata_express_serial import serial + +from homeassistant.const import CONF_NAME + +from .const import ( + CONF_ARDUINO_INSTANCE_ID, + CONF_ARDUINO_WAIT, + CONF_BINARY_SENSORS, + CONF_SAMPLING_INTERVAL, + CONF_SERIAL_BAUD_RATE, + CONF_SERIAL_PORT, + CONF_SLEEP_TUNE, + CONF_SWITCHES, +) + +_LOGGER = logging.getLogger(__name__) + +FirmataPinType = Union[int, str] + + +class FirmataBoard: + """Manages a single Firmata board.""" + + def __init__(self, config: dict): + """Initialize the board.""" + self.config = config + self.api = None + self.firmware_version = None + self.protocol_version = None + self.name = self.config[CONF_NAME] + self.switches = [] + self.binary_sensors = [] + self.used_pins = [] + + if CONF_SWITCHES in self.config: + self.switches = self.config[CONF_SWITCHES] + if CONF_BINARY_SENSORS in self.config: + self.binary_sensors = self.config[CONF_BINARY_SENSORS] + + async def async_setup(self, tries=0) -> bool: + """Set up a Firmata instance.""" + try: + _LOGGER.debug("Connecting to Firmata %s", self.name) + self.api = await get_board(self.config) + except RuntimeError as err: + _LOGGER.error("Error connecting to PyMata board %s: %s", self.name, err) + return False + except serial.serialutil.SerialTimeoutException as err: + _LOGGER.error( + "Timeout writing to serial port for PyMata board %s: %s", self.name, err + ) + return False + except serial.serialutil.SerialException as err: + _LOGGER.error( + "Error connecting to serial port for PyMata board %s: %s", + self.name, + err, + ) + return False + + self.firmware_version = await self.api.get_firmware_version() + if not self.firmware_version: + _LOGGER.error( + "Error retrieving firmware version from Firmata board %s", self.name + ) + return False + + if CONF_SAMPLING_INTERVAL in self.config: + try: + await self.api.set_sampling_interval( + self.config[CONF_SAMPLING_INTERVAL] + ) + except RuntimeError as err: + _LOGGER.error( + "Error setting sampling interval for PyMata \ +board %s: %s", + self.name, + err, + ) + return False + + _LOGGER.debug("Firmata connection successful for %s", self.name) + return True + + async def async_reset(self) -> bool: + """Reset the board to default state.""" + _LOGGER.debug("Shutting down board %s", self.name) + # If the board was never setup, continue. + if self.api is None: + return True + + await self.api.shutdown() + self.api = None + + return True + + def mark_pin_used(self, pin: FirmataPinType) -> bool: + """Test if a pin is used already on the board or mark as used.""" + if pin in self.used_pins: + return False + self.used_pins.append(pin) + return True + + def get_pin_type(self, pin: FirmataPinType) -> tuple: + """Return the type and Firmata location of a pin on the board.""" + if isinstance(pin, str): + pin_type = "analog" + firmata_pin = int(pin[1:]) + firmata_pin += self.api.first_analog_pin + else: + pin_type = "digital" + firmata_pin = pin + return (pin_type, firmata_pin) + + +async def get_board(data: dict) -> PymataExpress: + """Create a Pymata board object.""" + board_data = {} + + if CONF_SERIAL_PORT in data: + board_data["com_port"] = data[CONF_SERIAL_PORT] + if CONF_SERIAL_BAUD_RATE in data: + board_data["baud_rate"] = data[CONF_SERIAL_BAUD_RATE] + if CONF_ARDUINO_INSTANCE_ID in data: + board_data["arduino_instance_id"] = data[CONF_ARDUINO_INSTANCE_ID] + + if CONF_ARDUINO_WAIT in data: + board_data["arduino_wait"] = data[CONF_ARDUINO_WAIT] + if CONF_SLEEP_TUNE in data: + board_data["sleep_tune"] = data[CONF_SLEEP_TUNE] + + board_data["autostart"] = False + board_data["shutdown_on_exception"] = True + board_data["close_loop_on_shutdown"] = False + + board = PymataExpress(**board_data) + + await board.start_aio() + return board diff --git a/homeassistant/components/firmata/config_flow.py b/homeassistant/components/firmata/config_flow.py new file mode 100644 index 00000000000..a86d97e9e2e --- /dev/null +++ b/homeassistant/components/firmata/config_flow.py @@ -0,0 +1,57 @@ +"""Config flow to configure firmata component.""" + +import logging + +from pymata_express.pymata_express_serial import serial + +from homeassistant import config_entries +from homeassistant.const import CONF_NAME + +from .board import get_board +from .const import CONF_SERIAL_PORT, DOMAIN # pylint: disable=unused-import + +_LOGGER = logging.getLogger(__name__) + + +class FirmataFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): + """Handle a firmata config flow.""" + + VERSION = 1 + CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH + + async def async_step_import(self, import_config: dict): + """Import a firmata board as a config entry. + + This flow is triggered by `async_setup` for configured boards. + + This will execute for any board that does not have a + config entry yet (based on entry_id). It validates a connection + and then adds the entry. + """ + name = f"serial-{import_config[CONF_SERIAL_PORT]}" + import_config[CONF_NAME] = name + + # Connect to the board to verify connection and then shutdown + # If either fail then we cannot continue + _LOGGER.debug("Connecting to Firmata board %s to test connection", name) + try: + api = await get_board(import_config) + await api.shutdown() + except RuntimeError as err: + _LOGGER.error("Error connecting to PyMata board %s: %s", name, err) + return self.async_abort(reason="cannot_connect") + except serial.serialutil.SerialTimeoutException as err: + _LOGGER.error( + "Timeout writing to serial port for PyMata board %s: %s", name, err + ) + return self.async_abort(reason="cannot_connect") + except serial.serialutil.SerialException as err: + _LOGGER.error( + "Error connecting to serial port for PyMata board %s: %s", name, err + ) + return self.async_abort(reason="cannot_connect") + _LOGGER.debug("Connection test to Firmata board %s successful", name) + + return self.async_create_entry( + title=import_config[CONF_NAME], data=import_config + ) diff --git a/homeassistant/components/firmata/const.py b/homeassistant/components/firmata/const.py new file mode 100644 index 00000000000..1ad3cbb8423 --- /dev/null +++ b/homeassistant/components/firmata/const.py @@ -0,0 +1,24 @@ +"""Constants for the Firmata component.""" +import logging + +LOGGER = logging.getLogger(__package__) + +CONF_ARDUINO_INSTANCE_ID = "arduino_instance_id" +CONF_ARDUINO_WAIT = "arduino_wait" +CONF_BINARY_SENSORS = "binary_sensors" +CONF_INITIAL_STATE = "initial" +CONF_NAME = "name" +CONF_NEGATE_STATE = "negate" +CONF_PIN = "pin" +CONF_PINS = "pins" +CONF_PIN_MODE = "pin_mode" +PIN_MODE_OUTPUT = "OUTPUT" +PIN_MODE_INPUT = "INPUT" +PIN_MODE_PULLUP = "PULLUP" +CONF_SAMPLING_INTERVAL = "sampling_interval" +CONF_SERIAL_BAUD_RATE = "serial_baud_rate" +CONF_SERIAL_PORT = "serial_port" +CONF_SLEEP_TUNE = "sleep_tune" +CONF_SWITCHES = "switches" +DOMAIN = "firmata" +FIRMATA_MANUFACTURER = "Firmata" diff --git a/homeassistant/components/firmata/entity.py b/homeassistant/components/firmata/entity.py new file mode 100644 index 00000000000..50ab58b9046 --- /dev/null +++ b/homeassistant/components/firmata/entity.py @@ -0,0 +1,60 @@ +"""Entity for Firmata devices.""" +from typing import Type + +from homeassistant.config_entries import ConfigEntry + +from .board import FirmataPinType +from .const import DOMAIN, FIRMATA_MANUFACTURER +from .pin import FirmataBoardPin + + +class FirmataEntity: + """Representation of a Firmata entity.""" + + def __init__(self, api): + """Initialize the entity.""" + self._api = api + + @property + def device_info(self) -> dict: + """Return device info.""" + return { + "connections": {}, + "identifiers": {(DOMAIN, self._api.board.name)}, + "manufacturer": FIRMATA_MANUFACTURER, + "name": self._api.board.name, + "sw_version": self._api.board.firmware_version, + } + + +class FirmataPinEntity(FirmataEntity): + """Representation of a Firmata pin entity.""" + + def __init__( + self, + api: Type[FirmataBoardPin], + config_entry: ConfigEntry, + name: str, + pin: FirmataPinType, + ): + """Initialize the pin entity.""" + super().__init__(api) + self._name = name + + location = (config_entry.entry_id, "pin", pin) + self._unique_id = "_".join(str(i) for i in location) + + @property + def name(self) -> str: + """Get the name of the pin.""" + return self._name + + @property + def should_poll(self) -> bool: + """No polling needed.""" + return False + + @property + def unique_id(self) -> str: + """Return a unique identifier for this device.""" + return self._unique_id diff --git a/homeassistant/components/firmata/manifest.json b/homeassistant/components/firmata/manifest.json new file mode 100644 index 00000000000..d894c0a440b --- /dev/null +++ b/homeassistant/components/firmata/manifest.json @@ -0,0 +1,12 @@ +{ + "domain": "firmata", + "name": "Firmata", + "config_flow": false, + "documentation": "https://www.home-assistant.io/integrations/firmata", + "requirements": [ + "pymata-express==1.13" + ], + "codeowners": [ + "@DaAwesomeP" + ] +} \ No newline at end of file diff --git a/homeassistant/components/firmata/pin.py b/homeassistant/components/firmata/pin.py new file mode 100644 index 00000000000..644986fb66c --- /dev/null +++ b/homeassistant/components/firmata/pin.py @@ -0,0 +1,153 @@ +"""Code to handle pins on a Firmata board.""" +import logging +from typing import Callable + +from homeassistant.core import callback + +from .board import FirmataBoard, FirmataPinType +from .const import PIN_MODE_INPUT, PIN_MODE_PULLUP + +_LOGGER = logging.getLogger(__name__) + + +class FirmataPinUsedException(Exception): + """Represents an exception when a pin is already in use.""" + + +class FirmataBoardPin: + """Manages a single Firmata board pin.""" + + def __init__(self, board: FirmataBoard, pin: FirmataPinType, pin_mode: str): + """Initialize the pin.""" + self.board = board + self._pin = pin + self._pin_mode = pin_mode + self._pin_type, self._firmata_pin = self.board.get_pin_type(self._pin) + self._state = None + + def setup(self): + """Set up a pin and make sure it is valid.""" + if not self.board.mark_pin_used(self._pin): + raise FirmataPinUsedException(f"Pin {self._pin} already used!") + + +class FirmataBinaryDigitalOutput(FirmataBoardPin): + """Representation of a Firmata Digital Output Pin.""" + + def __init__( + self, + board: FirmataBoard, + pin: FirmataPinType, + pin_mode: str, + initial: bool, + negate: bool, + ): + """Initialize the digital output pin.""" + self._initial = initial + self._negate = negate + super().__init__(board, pin, pin_mode) + + async def start_pin(self) -> None: + """Set initial state on a pin.""" + _LOGGER.debug( + "Setting initial state for digital output pin %s on board %s", + self._pin, + self.board.name, + ) + api = self.board.api + # Only PIN_MODE_OUTPUT mode is supported as binary digital output + await api.set_pin_mode_digital_output(self._firmata_pin) + + if self._initial: + new_pin_state = not self._negate + else: + new_pin_state = self._negate + await api.digital_pin_write(self._firmata_pin, int(new_pin_state)) + self._state = self._initial + + @property + def is_on(self) -> bool: + """Return true if digital output is on.""" + return self._state + + async def turn_on(self) -> None: + """Turn on digital output.""" + _LOGGER.debug("Turning digital output on pin %s on", self._pin) + new_pin_state = not self._negate + await self.board.api.digital_pin_write(self._firmata_pin, int(new_pin_state)) + self._state = True + + async def turn_off(self) -> None: + """Turn off digital output.""" + _LOGGER.debug("Turning digital output on pin %s off", self._pin) + new_pin_state = self._negate + await self.board.api.digital_pin_write(self._firmata_pin, int(new_pin_state)) + self._state = False + + +class FirmataBinaryDigitalInput(FirmataBoardPin): + """Representation of a Firmata Digital Input Pin.""" + + def __init__( + self, board: FirmataBoard, pin: FirmataPinType, pin_mode: str, negate: bool + ): + """Initialize the digital input pin.""" + self._negate = negate + self._forward_callback = None + super().__init__(board, pin, pin_mode) + + async def start_pin(self, forward_callback: Callable[[], None]) -> None: + """Get initial state and start reporting a pin.""" + _LOGGER.debug( + "Starting reporting updates for input pin %s on board %s", + self._pin, + self.board.name, + ) + self._forward_callback = forward_callback + api = self.board.api + if self._pin_mode == PIN_MODE_INPUT: + await api.set_pin_mode_digital_input(self._pin, self.latch_callback) + elif self._pin_mode == PIN_MODE_PULLUP: + await api.set_pin_mode_digital_input_pullup(self._pin, self.latch_callback) + + new_state = bool((await self.board.api.digital_read(self._firmata_pin))[0]) + if self._negate: + new_state = not new_state + self._state = new_state + + await api.enable_digital_reporting(self._pin) + self._forward_callback() + + async def stop_pin(self) -> None: + """Stop reporting digital input pin.""" + _LOGGER.debug( + "Stopping reporting updates for digital input pin %s on board %s", + self._pin, + self.board.name, + ) + api = self.board.api + await api.disable_digital_reporting(self._pin) + + @property + def is_on(self) -> bool: + """Return true if digital input is on.""" + return self._state + + @callback + async def latch_callback(self, data: list) -> None: + """Update pin state on callback.""" + if data[1] != self._firmata_pin: + return + _LOGGER.debug( + "Received latch %d for digital input pin %d on board %s", + data[2], + self._firmata_pin, + self.board.name, + ) + new_state = bool(data[2]) + if self._negate: + new_state = not new_state + if self._state == new_state: + return + self._state = new_state + self._forward_callback() diff --git a/homeassistant/components/firmata/strings.json b/homeassistant/components/firmata/strings.json new file mode 100644 index 00000000000..68d7ae8c041 --- /dev/null +++ b/homeassistant/components/firmata/strings.json @@ -0,0 +1,8 @@ +{ + "config": { + "abort": { + "cannot_connect": "Cannot connect to Firmata board during setup" + }, + "step": {} + } +} diff --git a/homeassistant/components/firmata/switch.py b/homeassistant/components/firmata/switch.py new file mode 100644 index 00000000000..ab67a6d6840 --- /dev/null +++ b/homeassistant/components/firmata/switch.py @@ -0,0 +1,75 @@ +"""Support for Firmata switch output.""" + +import logging + +from homeassistant.components.switch import SwitchEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_NAME +from homeassistant.core import HomeAssistant + +from .const import ( + CONF_INITIAL_STATE, + CONF_NEGATE_STATE, + CONF_PIN, + CONF_PIN_MODE, + DOMAIN, +) +from .entity import FirmataPinEntity +from .pin import FirmataBinaryDigitalOutput, FirmataPinUsedException + +_LOGGER = logging.getLogger(__name__) + + +async def async_setup_entry( + hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities +) -> None: + """Set up the Firmata switches.""" + new_entities = [] + + board = hass.data[DOMAIN][config_entry.entry_id] + for switch in board.switches: + pin = switch[CONF_PIN] + pin_mode = switch[CONF_PIN_MODE] + initial = switch[CONF_INITIAL_STATE] + negate = switch[CONF_NEGATE_STATE] + api = FirmataBinaryDigitalOutput(board, pin, pin_mode, initial, negate) + try: + api.setup() + except FirmataPinUsedException: + _LOGGER.error( + "Could not setup switch on pin %s since pin already in use.", + switch[CONF_PIN], + ) + continue + name = switch[CONF_NAME] + switch_entity = FirmataSwitch(api, config_entry, name, pin) + new_entities.append(switch_entity) + + if new_entities: + async_add_entities(new_entities) + + +class FirmataSwitch(FirmataPinEntity, SwitchEntity): + """Representation of a switch on a Firmata board.""" + + async def async_added_to_hass(self) -> None: + """Set up a switch.""" + await self._api.start_pin() + self.async_write_ha_state() + + @property + def is_on(self) -> bool: + """Return true if switch is on.""" + return self._api.is_on + + async def async_turn_on(self, **kwargs) -> None: + """Turn on switch.""" + _LOGGER.debug("Turning switch %s on", self._name) + await self._api.turn_on() + self.async_write_ha_state() + + async def async_turn_off(self, **kwargs) -> None: + """Turn off switch.""" + _LOGGER.debug("Turning switch %s off", self._name) + await self._api.turn_off() + self.async_write_ha_state() diff --git a/homeassistant/components/firmata/translations/en.json b/homeassistant/components/firmata/translations/en.json new file mode 100644 index 00000000000..68d7ae8c041 --- /dev/null +++ b/homeassistant/components/firmata/translations/en.json @@ -0,0 +1,8 @@ +{ + "config": { + "abort": { + "cannot_connect": "Cannot connect to Firmata board during setup" + }, + "step": {} + } +} diff --git a/requirements_all.txt b/requirements_all.txt index 2d2e22f5983..3f572bb8f8a 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1445,6 +1445,9 @@ pylutron==0.2.5 # homeassistant.components.mailgun pymailgunner==1.4 +# homeassistant.components.firmata +pymata-express==1.13 + # homeassistant.components.mediaroom pymediaroom==0.6.4 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index ecf468ca6b4..c3aae1e3b80 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -670,6 +670,9 @@ pylutron-caseta==0.6.1 # homeassistant.components.mailgun pymailgunner==1.4 +# homeassistant.components.firmata +pymata-express==1.13 + # homeassistant.components.melcloud pymelcloud==2.5.2 diff --git a/tests/components/firmata/__init__.py b/tests/components/firmata/__init__.py new file mode 100644 index 00000000000..48e58cf5c36 --- /dev/null +++ b/tests/components/firmata/__init__.py @@ -0,0 +1 @@ +"""Tests for the Firmata integration.""" diff --git a/tests/components/firmata/test_config_flow.py b/tests/components/firmata/test_config_flow.py new file mode 100644 index 00000000000..e77f219e320 --- /dev/null +++ b/tests/components/firmata/test_config_flow.py @@ -0,0 +1,92 @@ +"""Test the Firmata config flow.""" +from pymata_express.pymata_express_serial import serial + +from homeassistant import config_entries, setup +from homeassistant.components.firmata.const import CONF_SERIAL_PORT, DOMAIN +from homeassistant.const import CONF_NAME +from homeassistant.core import HomeAssistant + +from tests.async_mock import patch + + +async def test_import_cannot_connect_pymata(hass: HomeAssistant) -> None: + """Test we fail with an invalid board.""" + await setup.async_setup_component(hass, "persistent_notification", {}) + + with patch( + "homeassistant.components.firmata.board.PymataExpress.start_aio", + side_effect=RuntimeError, + ): + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_IMPORT}, + data={CONF_SERIAL_PORT: "/dev/nonExistent"}, + ) + + assert result["type"] == "abort" + assert result["reason"] == "cannot_connect" + + +async def test_import_cannot_connect_serial(hass: HomeAssistant) -> None: + """Test we fail with an invalid board.""" + await setup.async_setup_component(hass, "persistent_notification", {}) + + with patch( + "homeassistant.components.firmata.board.PymataExpress.start_aio", + side_effect=serial.serialutil.SerialException, + ): + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_IMPORT}, + data={CONF_SERIAL_PORT: "/dev/nonExistent"}, + ) + + assert result["type"] == "abort" + assert result["reason"] == "cannot_connect" + + +async def test_import_cannot_connect_serial_timeout(hass: HomeAssistant) -> None: + """Test we fail with an invalid board.""" + await setup.async_setup_component(hass, "persistent_notification", {}) + + with patch( + "homeassistant.components.firmata.board.PymataExpress.start_aio", + side_effect=serial.serialutil.SerialTimeoutException, + ): + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_IMPORT}, + data={CONF_SERIAL_PORT: "/dev/nonExistent"}, + ) + + assert result["type"] == "abort" + assert result["reason"] == "cannot_connect" + + +async def test_import(hass: HomeAssistant) -> None: + """Test we create an entry from config.""" + await setup.async_setup_component(hass, "persistent_notification", {}) + + with patch( + "homeassistant.components.firmata.board.PymataExpress", autospec=True + ), patch( + "homeassistant.components.firmata.async_setup", return_value=True + ) as mock_setup, patch( + "homeassistant.components.firmata.async_setup_entry", return_value=True + ) as mock_setup_entry: + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_IMPORT}, + data={CONF_SERIAL_PORT: "/dev/nonExistent"}, + ) + + assert result["type"] == "create_entry" + assert result["title"] == "serial-/dev/nonExistent" + assert result["data"] == { + CONF_NAME: "serial-/dev/nonExistent", + CONF_SERIAL_PORT: "/dev/nonExistent", + } + await hass.async_block_till_done() + assert len(mock_setup.mock_calls) == 1 + assert len(mock_setup_entry.mock_calls) == 1