Move Hub and Entity to separate module in ADS (#125665)

* Move Hub and Entity to separate module in ADS

* Missed one
pull/125158/head
epenet 2024-09-10 15:02:09 +02:00 committed by GitHub
parent 7f7db4efb6
commit 745a05d984
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 225 additions and 213 deletions

View File

@ -1,12 +1,6 @@
"""Support for Automation Device Specification (ADS).""" """Support for Automation Device Specification (ADS)."""
import asyncio
from asyncio import timeout
from collections import namedtuple
import ctypes
import logging import logging
import struct
import threading
import pyads import pyads
import voluptuous as vol import voluptuous as vol
@ -19,9 +13,10 @@ from homeassistant.const import (
) )
from homeassistant.core import HomeAssistant, ServiceCall from homeassistant.core import HomeAssistant, ServiceCall
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from .hub import AdsHub
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DATA_ADS = "data_ads" DATA_ADS = "data_ads"
@ -166,197 +161,3 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
) )
return True return True
# Tuple to hold data needed for notification
NotificationItem = namedtuple( # noqa: PYI024
"NotificationItem", "hnotify huser name plc_datatype callback"
)
class AdsHub:
"""Representation of an ADS connection."""
def __init__(self, ads_client):
"""Initialize the ADS hub."""
self._client = ads_client
self._client.open()
# All ADS devices are registered here
self._devices = []
self._notification_items = {}
self._lock = threading.Lock()
def shutdown(self, *args, **kwargs):
"""Shutdown ADS connection."""
_LOGGER.debug("Shutting down ADS")
for notification_item in self._notification_items.values():
_LOGGER.debug(
"Deleting device notification %d, %d",
notification_item.hnotify,
notification_item.huser,
)
try:
self._client.del_device_notification(
notification_item.hnotify, notification_item.huser
)
except pyads.ADSError as err:
_LOGGER.error(err)
try:
self._client.close()
except pyads.ADSError as err:
_LOGGER.error(err)
def register_device(self, device):
"""Register a new device."""
self._devices.append(device)
def write_by_name(self, name, value, plc_datatype):
"""Write a value to the device."""
with self._lock:
try:
return self._client.write_by_name(name, value, plc_datatype)
except pyads.ADSError as err:
_LOGGER.error("Error writing %s: %s", name, err)
def read_by_name(self, name, plc_datatype):
"""Read a value from the device."""
with self._lock:
try:
return self._client.read_by_name(name, plc_datatype)
except pyads.ADSError as err:
_LOGGER.error("Error reading %s: %s", name, err)
def add_device_notification(self, name, plc_datatype, callback):
"""Add a notification to the ADS devices."""
attr = pyads.NotificationAttrib(ctypes.sizeof(plc_datatype))
with self._lock:
try:
hnotify, huser = self._client.add_device_notification(
name, attr, self._device_notification_callback
)
except pyads.ADSError as err:
_LOGGER.error("Error subscribing to %s: %s", name, err)
else:
hnotify = int(hnotify)
self._notification_items[hnotify] = NotificationItem(
hnotify, huser, name, plc_datatype, callback
)
_LOGGER.debug(
"Added device notification %d for variable %s", hnotify, name
)
def _device_notification_callback(self, notification, name):
"""Handle device notifications."""
contents = notification.contents
hnotify = int(contents.hNotification)
_LOGGER.debug("Received notification %d", hnotify)
# Get dynamically sized data array
data_size = contents.cbSampleSize
data_address = (
ctypes.addressof(contents)
+ pyads.structs.SAdsNotificationHeader.data.offset
)
data = (ctypes.c_ubyte * data_size).from_address(data_address)
# Acquire notification item
with self._lock:
notification_item = self._notification_items.get(hnotify)
if not notification_item:
_LOGGER.error("Unknown device notification handle: %d", hnotify)
return
# Data parsing based on PLC data type
plc_datatype = notification_item.plc_datatype
unpack_formats = {
pyads.PLCTYPE_BYTE: "<b",
pyads.PLCTYPE_INT: "<h",
pyads.PLCTYPE_UINT: "<H",
pyads.PLCTYPE_SINT: "<b",
pyads.PLCTYPE_USINT: "<B",
pyads.PLCTYPE_DINT: "<i",
pyads.PLCTYPE_UDINT: "<I",
pyads.PLCTYPE_WORD: "<H",
pyads.PLCTYPE_DWORD: "<I",
pyads.PLCTYPE_LREAL: "<d",
pyads.PLCTYPE_REAL: "<f",
pyads.PLCTYPE_TOD: "<i", # Treat as DINT
pyads.PLCTYPE_DATE: "<i", # Treat as DINT
pyads.PLCTYPE_DT: "<i", # Treat as DINT
pyads.PLCTYPE_TIME: "<i", # Treat as DINT
}
if plc_datatype == pyads.PLCTYPE_BOOL:
value = bool(struct.unpack("<?", bytearray(data))[0])
elif plc_datatype == pyads.PLCTYPE_STRING:
value = (
bytearray(data).split(b"\x00", 1)[0].decode("utf-8", errors="ignore")
)
elif plc_datatype in unpack_formats:
value = struct.unpack(unpack_formats[plc_datatype], bytearray(data))[0]
else:
value = bytearray(data)
_LOGGER.warning("No callback available for this datatype")
notification_item.callback(notification_item.name, value)
class AdsEntity(Entity):
"""Representation of ADS entity."""
_attr_should_poll = False
def __init__(self, ads_hub, name, ads_var):
"""Initialize ADS binary sensor."""
self._state_dict = {}
self._state_dict[STATE_KEY_STATE] = None
self._ads_hub = ads_hub
self._ads_var = ads_var
self._event = None
self._attr_unique_id = ads_var
self._attr_name = name
async def async_initialize_device(
self, ads_var, plctype, state_key=STATE_KEY_STATE, factor=None
):
"""Register device notification."""
def update(name, value):
"""Handle device notifications."""
_LOGGER.debug("Variable %s changed its value to %d", name, value)
if factor is None:
self._state_dict[state_key] = value
else:
self._state_dict[state_key] = value / factor
asyncio.run_coroutine_threadsafe(async_event_set(), self.hass.loop)
self.schedule_update_ha_state()
async def async_event_set():
"""Set event in async context."""
self._event.set()
self._event = asyncio.Event()
await self.hass.async_add_executor_job(
self._ads_hub.add_device_notification, ads_var, plctype, update
)
try:
async with timeout(10):
await self._event.wait()
except TimeoutError:
_LOGGER.debug("Variable %s: Timeout during first update", ads_var)
@property
def available(self) -> bool:
"""Return False if state has not been updated yet."""
return self._state_dict[STATE_KEY_STATE] is not None

View File

@ -17,7 +17,8 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import CONF_ADS_VAR, DATA_ADS, STATE_KEY_STATE, AdsEntity from . import CONF_ADS_VAR, DATA_ADS, STATE_KEY_STATE
from .entity import AdsEntity
DEFAULT_NAME = "ADS binary sensor" DEFAULT_NAME = "ADS binary sensor"
PLATFORM_SCHEMA = BINARY_SENSOR_PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = BINARY_SENSOR_PLATFORM_SCHEMA.extend(

View File

@ -26,8 +26,8 @@ from . import (
DATA_ADS, DATA_ADS,
STATE_KEY_POSITION, STATE_KEY_POSITION,
STATE_KEY_STATE, STATE_KEY_STATE,
AdsEntity,
) )
from .entity import AdsEntity
DEFAULT_NAME = "ADS Cover" DEFAULT_NAME = "ADS Cover"

View File

@ -0,0 +1,64 @@
"""Support for Automation Device Specification (ADS)."""
import asyncio
from asyncio import timeout
import logging
from homeassistant.helpers.entity import Entity
from . import STATE_KEY_STATE
_LOGGER = logging.getLogger(__name__)
class AdsEntity(Entity):
"""Representation of ADS entity."""
_attr_should_poll = False
def __init__(self, ads_hub, name, ads_var):
"""Initialize ADS binary sensor."""
self._state_dict = {}
self._state_dict[STATE_KEY_STATE] = None
self._ads_hub = ads_hub
self._ads_var = ads_var
self._event = None
self._attr_unique_id = ads_var
self._attr_name = name
async def async_initialize_device(
self, ads_var, plctype, state_key=STATE_KEY_STATE, factor=None
):
"""Register device notification."""
def update(name, value):
"""Handle device notifications."""
_LOGGER.debug("Variable %s changed its value to %d", name, value)
if factor is None:
self._state_dict[state_key] = value
else:
self._state_dict[state_key] = value / factor
asyncio.run_coroutine_threadsafe(async_event_set(), self.hass.loop)
self.schedule_update_ha_state()
async def async_event_set():
"""Set event in async context."""
self._event.set()
self._event = asyncio.Event()
await self.hass.async_add_executor_job(
self._ads_hub.add_device_notification, ads_var, plctype, update
)
try:
async with timeout(10):
await self._event.wait()
except TimeoutError:
_LOGGER.debug("Variable %s: Timeout during first update", ads_var)
@property
def available(self) -> bool:
"""Return False if state has not been updated yet."""
return self._state_dict[STATE_KEY_STATE] is not None

View File

@ -0,0 +1,151 @@
"""Support for Automation Device Specification (ADS)."""
from collections import namedtuple
import ctypes
import logging
import struct
import threading
import pyads
_LOGGER = logging.getLogger(__name__)
# Tuple to hold data needed for notification
NotificationItem = namedtuple( # noqa: PYI024
"NotificationItem", "hnotify huser name plc_datatype callback"
)
class AdsHub:
"""Representation of an ADS connection."""
def __init__(self, ads_client):
"""Initialize the ADS hub."""
self._client = ads_client
self._client.open()
# All ADS devices are registered here
self._devices = []
self._notification_items = {}
self._lock = threading.Lock()
def shutdown(self, *args, **kwargs):
"""Shutdown ADS connection."""
_LOGGER.debug("Shutting down ADS")
for notification_item in self._notification_items.values():
_LOGGER.debug(
"Deleting device notification %d, %d",
notification_item.hnotify,
notification_item.huser,
)
try:
self._client.del_device_notification(
notification_item.hnotify, notification_item.huser
)
except pyads.ADSError as err:
_LOGGER.error(err)
try:
self._client.close()
except pyads.ADSError as err:
_LOGGER.error(err)
def register_device(self, device):
"""Register a new device."""
self._devices.append(device)
def write_by_name(self, name, value, plc_datatype):
"""Write a value to the device."""
with self._lock:
try:
return self._client.write_by_name(name, value, plc_datatype)
except pyads.ADSError as err:
_LOGGER.error("Error writing %s: %s", name, err)
def read_by_name(self, name, plc_datatype):
"""Read a value from the device."""
with self._lock:
try:
return self._client.read_by_name(name, plc_datatype)
except pyads.ADSError as err:
_LOGGER.error("Error reading %s: %s", name, err)
def add_device_notification(self, name, plc_datatype, callback):
"""Add a notification to the ADS devices."""
attr = pyads.NotificationAttrib(ctypes.sizeof(plc_datatype))
with self._lock:
try:
hnotify, huser = self._client.add_device_notification(
name, attr, self._device_notification_callback
)
except pyads.ADSError as err:
_LOGGER.error("Error subscribing to %s: %s", name, err)
else:
hnotify = int(hnotify)
self._notification_items[hnotify] = NotificationItem(
hnotify, huser, name, plc_datatype, callback
)
_LOGGER.debug(
"Added device notification %d for variable %s", hnotify, name
)
def _device_notification_callback(self, notification, name):
"""Handle device notifications."""
contents = notification.contents
hnotify = int(contents.hNotification)
_LOGGER.debug("Received notification %d", hnotify)
# Get dynamically sized data array
data_size = contents.cbSampleSize
data_address = (
ctypes.addressof(contents)
+ pyads.structs.SAdsNotificationHeader.data.offset
)
data = (ctypes.c_ubyte * data_size).from_address(data_address)
# Acquire notification item
with self._lock:
notification_item = self._notification_items.get(hnotify)
if not notification_item:
_LOGGER.error("Unknown device notification handle: %d", hnotify)
return
# Data parsing based on PLC data type
plc_datatype = notification_item.plc_datatype
unpack_formats = {
pyads.PLCTYPE_BYTE: "<b",
pyads.PLCTYPE_INT: "<h",
pyads.PLCTYPE_UINT: "<H",
pyads.PLCTYPE_SINT: "<b",
pyads.PLCTYPE_USINT: "<B",
pyads.PLCTYPE_DINT: "<i",
pyads.PLCTYPE_UDINT: "<I",
pyads.PLCTYPE_WORD: "<H",
pyads.PLCTYPE_DWORD: "<I",
pyads.PLCTYPE_LREAL: "<d",
pyads.PLCTYPE_REAL: "<f",
pyads.PLCTYPE_TOD: "<i", # Treat as DINT
pyads.PLCTYPE_DATE: "<i", # Treat as DINT
pyads.PLCTYPE_DT: "<i", # Treat as DINT
pyads.PLCTYPE_TIME: "<i", # Treat as DINT
}
if plc_datatype == pyads.PLCTYPE_BOOL:
value = bool(struct.unpack("<?", bytearray(data))[0])
elif plc_datatype == pyads.PLCTYPE_STRING:
value = (
bytearray(data).split(b"\x00", 1)[0].decode("utf-8", errors="ignore")
)
elif plc_datatype in unpack_formats:
value = struct.unpack(unpack_formats[plc_datatype], bytearray(data))[0]
else:
value = bytearray(data)
_LOGGER.warning("No callback available for this datatype")
notification_item.callback(notification_item.name, value)

View File

@ -25,8 +25,8 @@ from . import (
DATA_ADS, DATA_ADS,
STATE_KEY_BRIGHTNESS, STATE_KEY_BRIGHTNESS,
STATE_KEY_STATE, STATE_KEY_STATE,
AdsEntity,
) )
from .entity import AdsEntity
DEFAULT_NAME = "ADS Light" DEFAULT_NAME = "ADS Light"
PLATFORM_SCHEMA = LIGHT_PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = LIGHT_PLATFORM_SCHEMA.extend(

View File

@ -15,14 +15,8 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
from .. import ads from .. import ads
from . import ( from . import ADS_TYPEMAP, CONF_ADS_FACTOR, CONF_ADS_TYPE, CONF_ADS_VAR, STATE_KEY_STATE
ADS_TYPEMAP, from .entity import AdsEntity
CONF_ADS_FACTOR,
CONF_ADS_TYPE,
CONF_ADS_VAR,
STATE_KEY_STATE,
AdsEntity,
)
DEFAULT_NAME = "ADS sensor" DEFAULT_NAME = "ADS sensor"
PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(

View File

@ -17,7 +17,8 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import CONF_ADS_VAR, DATA_ADS, STATE_KEY_STATE, AdsEntity from . import CONF_ADS_VAR, DATA_ADS, STATE_KEY_STATE
from .entity import AdsEntity
DEFAULT_NAME = "ADS Switch" DEFAULT_NAME = "ADS Switch"