Improve type checking for rfxtrx ()

pull/61406/head
Joakim Plate 2021-12-09 22:35:53 +01:00 committed by GitHub
parent f512bacfc7
commit ea3e08c041
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 166 additions and 88 deletions

View File

@ -1,9 +1,12 @@
"""Support for RFXtrx devices."""
from __future__ import annotations
import asyncio
import binascii
import copy
import functools
import logging
from typing import NamedTuple
import RFXtrx as rfxtrxmod
import async_timeout
@ -49,6 +52,14 @@ SIGNAL_EVENT = f"{DOMAIN}_event"
_LOGGER = logging.getLogger(__name__)
class DeviceTuple(NamedTuple):
"""Representation of a device in rfxtrx."""
packettype: str
subtype: str
id_string: str
def _bytearray_string(data):
val = cv.string(data)
try:
@ -225,7 +236,7 @@ async def async_setup_internal(hass, entry: config_entries.ConfigEntry):
hass.services.async_register(DOMAIN, SERVICE_SEND, send, schema=SERVICE_SEND_SCHEMA)
def get_rfx_object(packetid):
def get_rfx_object(packetid: str) -> rfxtrxmod.RFXtrxEvent | None:
"""Return the RFXObject with the packetid."""
try:
binarypacket = bytearray.fromhex(packetid)
@ -246,10 +257,10 @@ def get_rfx_object(packetid):
return obj
def get_pt2262_deviceid(device_id, nb_data_bits):
def get_pt2262_deviceid(device_id: str, nb_data_bits: int | None) -> bytes | None:
"""Extract and return the address bits from a Lighting4/PT2262 packet."""
if nb_data_bits is None:
return
return None
try:
data = bytearray.fromhex(device_id)
@ -262,7 +273,7 @@ def get_pt2262_deviceid(device_id, nb_data_bits):
return binascii.hexlify(data)
def get_pt2262_cmd(device_id, data_bits):
def get_pt2262_cmd(device_id: str, data_bits: int) -> str | None:
"""Extract and return the data bits from a Lighting4/PT2262 packet."""
try:
data = bytearray.fromhex(device_id)
@ -274,7 +285,9 @@ def get_pt2262_cmd(device_id, data_bits):
return hex(data[-1] & mask)
def get_device_data_bits(device, devices):
def get_device_data_bits(
device: rfxtrxmod.RFXtrxDevice, devices: dict[DeviceTuple, dict]
) -> int | None:
"""Deduce data bits for device based on a cache of device bits."""
data_bits = None
if device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
@ -286,7 +299,7 @@ def get_device_data_bits(device, devices):
return data_bits
def find_possible_pt2262_device(device_ids, device_id):
def find_possible_pt2262_device(device_ids: list[str], device_id: str) -> str | None:
"""Look for the device which id matches the given device_id parameter."""
for dev_id in device_ids:
if len(dev_id) == len(device_id):
@ -313,9 +326,11 @@ def find_possible_pt2262_device(device_ids, device_id):
return None
def get_device_id(device, data_bits=None):
def get_device_id(
device: rfxtrxmod.RFXtrxDevice, data_bits: int | None = None
) -> DeviceTuple:
"""Calculate a device id for device."""
id_string = device.id_string
id_string: str = device.id_string
if (
data_bits
and device.packettype == DEVICE_PACKET_TYPE_LIGHTING4
@ -323,7 +338,7 @@ def get_device_id(device, data_bits=None):
):
id_string = masked_id.decode("ASCII")
return (f"{device.packettype:x}", f"{device.subtype:x}", id_string)
return DeviceTuple(f"{device.packettype:x}", f"{device.subtype:x}", id_string)
def connect_auto_add(hass, entry_data, callback_fun):
@ -340,7 +355,15 @@ class RfxtrxEntity(RestoreEntity):
Contains the common logic for Rfxtrx lights and switches.
"""
def __init__(self, device, device_id, event=None):
_device: rfxtrxmod.RFXtrxDevice
_event: rfxtrxmod.RFXtrxEvent | None
def __init__(
self,
device: rfxtrxmod.RFXtrxDevice,
device_id: DeviceTuple,
event: rfxtrxmod.RFXtrxEvent | None = None,
) -> None:
"""Initialize the device."""
self._name = f"{device.type_string} {device.id_string}"
self._device = device
@ -405,21 +428,28 @@ class RfxtrxEntity(RestoreEntity):
name=f"{self._device.type_string} {self._device.id_string}",
)
def _event_applies(self, event, device_id):
def _event_applies(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple):
"""Check if event applies to me."""
if "Command" in event.values and event.values["Command"] in COMMAND_GROUP_LIST:
(group_id, _, _) = event.device.id_string.partition(":")
return group_id == self._group_id
if isinstance(event, rfxtrxmod.ControlEvent):
if (
"Command" in event.values
and event.values["Command"] in COMMAND_GROUP_LIST
):
device: rfxtrxmod.RFXtrxDevice = event.device
(group_id, _, _) = device.id_string.partition(":")
return group_id == self._group_id
# Otherwise, the event only applies to the matching device.
return device_id == self._device_id
def _apply_event(self, event):
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply a received event."""
self._event = event
@callback
def _handle_event(self, event, device_id):
def _handle_event(
self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
) -> None:
"""Handle a reception of data, overridden by other classes."""
@ -429,11 +459,17 @@ class RfxtrxCommandEntity(RfxtrxEntity):
Contains the common logic for Rfxtrx lights and switches.
"""
def __init__(self, device, device_id, signal_repetitions=1, event=None):
def __init__(
self,
device: rfxtrxmod.RFXtrxDevice,
device_id: DeviceTuple,
signal_repetitions: int = 1,
event: rfxtrxmod.RFXtrxEvent | None = None,
) -> None:
"""Initialzie a switch or light device."""
super().__init__(device, device_id, event=event)
self.signal_repetitions = signal_repetitions
self._state = None
self._state: bool | None = None
async def _async_send(self, fun, *args):
rfx_object = self.hass.data[DOMAIN][DATA_RFXOBJECT]

View File

@ -17,10 +17,11 @@ from homeassistant.const import (
CONF_DEVICES,
STATE_ON,
)
from homeassistant.core import callback
from homeassistant.core import CALLBACK_TYPE, callback
from homeassistant.helpers import event as evt
from . import (
DeviceTuple,
RfxtrxEntity,
connect_auto_add,
find_possible_pt2262_device,
@ -83,7 +84,7 @@ SENSOR_TYPES = (
SENSOR_TYPES_DICT = {desc.key: desc for desc in SENSOR_TYPES}
def supported(event):
def supported(event: rfxtrxmod.RFXtrxEvent):
"""Return whether an event supports binary_sensor."""
if isinstance(event, rfxtrxmod.ControlEvent):
return True
@ -103,8 +104,8 @@ async def async_setup_entry(
"""Set up platform."""
sensors = []
device_ids = set()
pt2262_devices = []
device_ids: set[DeviceTuple] = set()
pt2262_devices: list[str] = []
discovery_info = config_entry.data
@ -127,25 +128,29 @@ async def async_setup_entry(
continue
device_ids.add(device_id)
if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
find_possible_pt2262_device(pt2262_devices, event.device.id_string)
pt2262_devices.append(event.device.id_string)
device: rfxtrxmod.RFXtrxDevice = event.device
device = RfxtrxBinarySensor(
event.device,
if device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
find_possible_pt2262_device(pt2262_devices, device.id_string)
pt2262_devices.append(device.id_string)
entity = RfxtrxBinarySensor(
device,
device_id,
get_sensor_description(event.device.type_string),
get_sensor_description(device.type_string),
entity_info.get(CONF_OFF_DELAY),
entity_info.get(CONF_DATA_BITS),
entity_info.get(CONF_COMMAND_ON),
entity_info.get(CONF_COMMAND_OFF),
)
sensors.append(device)
sensors.append(entity)
async_add_entities(sensors)
@callback
def binary_sensor_update(event, device_id):
def binary_sensor_update(
event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
) -> None:
"""Call for control updates from the RFXtrx gateway."""
if not supported(event):
return
@ -179,22 +184,22 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
def __init__(
self,
device,
device_id,
entity_description,
off_delay=None,
data_bits=None,
cmd_on=None,
cmd_off=None,
event=None,
):
device: rfxtrxmod.RFXtrxDevice,
device_id: DeviceTuple,
entity_description: BinarySensorEntityDescription,
off_delay: float | None = None,
data_bits: int | None = None,
cmd_on: int | None = None,
cmd_off: int | None = None,
event: rfxtrxmod.RFXtrxEvent | None = None,
) -> None:
"""Initialize the RFXtrx sensor."""
super().__init__(device, device_id, event=event)
self.entity_description = entity_description
self._data_bits = data_bits
self._off_delay = off_delay
self._state = None
self._delay_listener = None
self._state: bool | None = None
self._delay_listener: CALLBACK_TYPE | None = None
self._cmd_on = cmd_on
self._cmd_off = cmd_off
@ -220,11 +225,12 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
"""Return true if the sensor state is True."""
return self._state
def _apply_event_lighting4(self, event):
def _apply_event_lighting4(self, event: rfxtrxmod.RFXtrxEvent):
"""Apply event for a lighting 4 device."""
if self._data_bits is not None:
cmd = get_pt2262_cmd(event.device.id_string, self._data_bits)
cmd = int(cmd, 16)
cmdstr = get_pt2262_cmd(event.device.id_string, self._data_bits)
assert cmdstr
cmd = int(cmdstr, 16)
if cmd == self._cmd_on:
self._state = True
elif cmd == self._cmd_off:
@ -232,7 +238,8 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
else:
self._state = True
def _apply_event_standard(self, event):
def _apply_event_standard(self, event: rfxtrxmod.RFXtrxEvent):
assert isinstance(event, (rfxtrxmod.SensorEvent, rfxtrxmod.ControlEvent))
if event.values.get("Command") in COMMAND_ON_LIST:
self._state = True
elif event.values.get("Command") in COMMAND_OFF_LIST:
@ -242,7 +249,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
elif event.values.get("Sensor Status") in SENSOR_STATUS_OFF:
self._state = False
def _apply_event(self, event):
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent):
"""Apply command from rfxtrx."""
super()._apply_event(event)
if event.device.packettype == DEVICE_PACKET_TYPE_LIGHTING4:
@ -251,7 +258,7 @@ class RfxtrxBinarySensor(RfxtrxEntity, BinarySensorEntity):
self._apply_event_standard(event)
@callback
def _handle_event(self, event, device_id):
def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple):
"""Check if event applies to me and update."""
if not self._event_applies(event, device_id):
return

View File

@ -1,6 +1,9 @@
"""Config flow for RFXCOM RFXtrx integration."""
from __future__ import annotations
import copy
import os
from typing import TypedDict, cast
import RFXtrx as rfxtrxmod
import serial
@ -22,6 +25,8 @@ from homeassistant.const import (
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.device_registry import (
DeviceEntry,
DeviceRegistry,
async_entries_for_config_entry,
async_get_registry as async_get_device_registry,
)
@ -30,7 +35,7 @@ from homeassistant.helpers.entity_registry import (
async_get_registry as async_get_entity_registry,
)
from . import DOMAIN, get_device_id, get_rfx_object
from . import DOMAIN, DeviceTuple, get_device_id, get_rfx_object
from .binary_sensor import supported as binary_supported
from .const import (
CONF_AUTOMATIC_ADD,
@ -53,6 +58,13 @@ CONF_EVENT_CODE = "event_code"
CONF_MANUAL_PATH = "Enter Manually"
class DeviceData(TypedDict):
"""Dict data representing a device entry."""
event_code: str
device_id: DeviceTuple
def none_or_int(value, base):
"""Check if strin is one otherwise convert to int."""
if value is None:
@ -63,16 +75,17 @@ def none_or_int(value, base):
class OptionsFlow(config_entries.OptionsFlow):
"""Handle Rfxtrx options."""
_device_registry: DeviceRegistry
_device_entries: list[DeviceEntry]
def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize rfxtrx options flow."""
self._config_entry = config_entry
self._global_options = None
self._selected_device = None
self._selected_device_entry_id = None
self._selected_device_event_code = None
self._selected_device_object = None
self._device_entries = None
self._device_registry = None
self._selected_device_entry_id: str | None = None
self._selected_device_event_code: str | None = None
self._selected_device_object: rfxtrxmod.RFXtrxEvent | None = None
async def async_step_init(self, user_input=None):
"""Manage the options."""
@ -173,6 +186,8 @@ class OptionsFlow(config_entries.OptionsFlow):
errors = {}
if user_input is not None:
assert self._selected_device_object
assert self._selected_device_event_code
device_id = get_device_id(
self._selected_device_object.device,
data_bits=user_input.get(CONF_DATA_BITS),
@ -399,20 +414,18 @@ class OptionsFlow(config_entries.OptionsFlow):
return data[CONF_EVENT_CODE]
def _get_device_data(self, entry_id):
def _get_device_data(self, entry_id) -> DeviceData:
"""Get event code based on device identifier."""
event_code = None
device_id = None
event_code: str
entry = self._device_registry.async_get(entry_id)
device_id = next(iter(entry.identifiers))[1:]
assert entry
device_id = cast(DeviceTuple, next(iter(entry.identifiers))[1:])
for packet_id, entity_info in self._config_entry.data[CONF_DEVICES].items():
if tuple(entity_info.get(CONF_DEVICE_ID)) == device_id:
event_code = packet_id
event_code = cast(str, packet_id)
break
data = {CONF_EVENT_CODE: event_code, CONF_DEVICE_ID: device_id}
return data
assert event_code
return DeviceData(event_code=event_code, device_id=device_id)
@callback
def update_config_data(self, global_options=None, devices=None):

View File

@ -1,6 +1,10 @@
"""Support for RFXtrx covers."""
from __future__ import annotations
import logging
import RFXtrx as rfxtrxmod
from homeassistant.components.cover import (
SUPPORT_CLOSE,
SUPPORT_CLOSE_TILT,
@ -15,6 +19,7 @@ from homeassistant.core import callback
from . import (
DEFAULT_SIGNAL_REPETITIONS,
DeviceTuple,
RfxtrxCommandEntity,
connect_auto_add,
get_device_id,
@ -33,7 +38,7 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
def supported(event):
def supported(event: rfxtrxmod.RFXtrxEvent):
"""Return whether an event supports cover."""
return event.device.known_to_be_rollershutter
@ -45,7 +50,7 @@ async def async_setup_entry(
):
"""Set up config entry."""
discovery_info = config_entry.data
device_ids = set()
device_ids: set[DeviceTuple] = set()
entities = []
for packet_id, entity_info in discovery_info[CONF_DEVICES].items():
@ -73,7 +78,7 @@ async def async_setup_entry(
async_add_entities(entities)
@callback
def cover_update(event, device_id):
def cover_update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple) -> None:
"""Handle cover updates from the RFXtrx gateway."""
if not supported(event):
return
@ -81,12 +86,13 @@ async def async_setup_entry(
if device_id in device_ids:
return
device_ids.add(device_id)
device: rfxtrxmod.RFXtrxDevice = event.device
_LOGGER.info(
"Added cover (Device ID: %s Class: %s Sub: %s, Event: %s)",
event.device.id_string.lower(),
event.device.__class__.__name__,
event.device.subtype,
device.id_string.lower(),
device.__class__.__name__,
device.subtype,
"".join(f"{x:02x}" for x in event.data),
)
@ -102,14 +108,16 @@ async def async_setup_entry(
class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
"""Representation of a RFXtrx cover."""
_device: rfxtrxmod.RollerTrolDevice | rfxtrxmod.RfyDevice | rfxtrxmod.LightingDevice
def __init__(
self,
device,
device_id,
signal_repetitions,
event=None,
venetian_blind_mode=None,
):
device: rfxtrxmod.RFXtrxDevice,
device_id: DeviceTuple,
signal_repetitions: int,
event: rfxtrxmod.RFXtrxEvent = None,
venetian_blind_mode: bool | None = None,
) -> None:
"""Initialize the RFXtrx cover device."""
super().__init__(device, device_id, signal_repetitions, event)
self._venetian_blind_mode = venetian_blind_mode
@ -191,8 +199,9 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
self._state = True
self.async_write_ha_state()
def _apply_event(self, event):
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent):
"""Apply command from rfxtrx."""
assert isinstance(event, rfxtrxmod.ControlEvent)
super()._apply_event(event)
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
@ -200,7 +209,7 @@ class RfxtrxCover(RfxtrxCommandEntity, CoverEntity):
self._state = False
@callback
def _handle_event(self, event, device_id):
def _handle_event(self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple):
"""Check if event applies to me and update."""
if device_id != self._device_id:
return

View File

@ -1,4 +1,6 @@
"""Support for RFXtrx lights."""
from __future__ import annotations
import logging
import RFXtrx as rfxtrxmod
@ -13,6 +15,7 @@ from homeassistant.core import callback
from . import (
DEFAULT_SIGNAL_REPETITIONS,
DeviceTuple,
RfxtrxCommandEntity,
connect_auto_add,
get_device_id,
@ -30,7 +33,7 @@ _LOGGER = logging.getLogger(__name__)
SUPPORT_RFXTRX = SUPPORT_BRIGHTNESS
def supported(event):
def supported(event: rfxtrxmod.RFXtrxEvent):
"""Return whether an event supports light."""
return (
isinstance(event.device, rfxtrxmod.LightingDevice)
@ -45,7 +48,7 @@ async def async_setup_entry(
):
"""Set up config entry."""
discovery_info = config_entry.data
device_ids = set()
device_ids: set[DeviceTuple] = set()
# Add switch from config file
entities = []
@ -72,7 +75,7 @@ async def async_setup_entry(
async_add_entities(entities)
@callback
def light_update(event, device_id):
def light_update(event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple):
"""Handle light updates from the RFXtrx gateway."""
if not supported(event):
return
@ -103,6 +106,7 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
"""Representation of a RFXtrx light."""
_brightness = 0
_device: rfxtrxmod.LightingDevice
async def async_added_to_hass(self):
"""Restore RFXtrx device state (ON/OFF)."""
@ -149,8 +153,9 @@ class RfxtrxLight(RfxtrxCommandEntity, LightEntity):
self._brightness = 0
self.async_write_ha_state()
def _apply_event(self, event):
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent):
"""Apply command from rfxtrx."""
assert isinstance(event, rfxtrxmod.ControlEvent)
super()._apply_event(event)
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True

View File

@ -238,7 +238,7 @@ async def async_setup_entry(
event.device, data_bits=entity_info.get(CONF_DATA_BITS)
)
for data_type in set(event.values) & set(SENSOR_TYPES_DICT):
data_id = (*device_id, data_type)
data_id = (*device_id, str(data_type))
if data_id in data_ids:
continue
data_ids.add(data_id)

View File

@ -1,4 +1,6 @@
"""Support for RFXtrx switches."""
from __future__ import annotations
import logging
import RFXtrx as rfxtrxmod
@ -10,6 +12,7 @@ from homeassistant.core import callback
from . import (
DEFAULT_SIGNAL_REPETITIONS,
DOMAIN,
DeviceTuple,
RfxtrxCommandEntity,
connect_auto_add,
get_device_id,
@ -44,7 +47,7 @@ async def async_setup_entry(
):
"""Set up config entry."""
discovery_info = config_entry.data
device_ids = set()
device_ids: set[DeviceTuple] = set()
# Add switch from config file
entities = []
@ -79,16 +82,18 @@ async def async_setup_entry(
return
device_ids.add(device_id)
device: rfxtrxmod.RFXtrxDevice = event.device
_LOGGER.info(
"Added switch (Device ID: %s Class: %s Sub: %s, Event: %s)",
event.device.id_string.lower(),
event.device.__class__.__name__,
event.device.subtype,
device.id_string.lower(),
device.__class__.__name__,
device.subtype,
"".join(f"{x:02x}" for x in event.data),
)
entity = RfxtrxSwitch(
event.device, device_id, DEFAULT_SIGNAL_REPETITIONS, event=event
device, device_id, DEFAULT_SIGNAL_REPETITIONS, event=event
)
async_add_entities([entity])
@ -108,8 +113,9 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
if old_state is not None:
self._state = old_state.state == STATE_ON
def _apply_event(self, event):
def _apply_event(self, event: rfxtrxmod.RFXtrxEvent) -> None:
"""Apply command from rfxtrx."""
assert isinstance(event, rfxtrxmod.ControlEvent)
super()._apply_event(event)
if event.values["Command"] in COMMAND_ON_LIST:
self._state = True
@ -117,7 +123,9 @@ class RfxtrxSwitch(RfxtrxCommandEntity, SwitchEntity):
self._state = False
@callback
def _handle_event(self, event, device_id):
def _handle_event(
self, event: rfxtrxmod.RFXtrxEvent, device_id: DeviceTuple
) -> None:
"""Check if event applies to me and update."""
if self._event_applies(event, device_id):
self._apply_event(event)