"""Support for Automation Device Specification (ADS).""" import asyncio from collections import namedtuple import ctypes import logging import struct import threading import async_timeout import pyads import voluptuous as vol from homeassistant.const import ( CONF_DEVICE, CONF_IP_ADDRESS, CONF_PORT, EVENT_HOMEASSISTANT_STOP, ) from homeassistant.core import HomeAssistant, ServiceCall import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entity import Entity from homeassistant.helpers.typing import ConfigType _LOGGER = logging.getLogger(__name__) DATA_ADS = "data_ads" # Supported Types ADSTYPE_BOOL = "bool" ADSTYPE_BYTE = "byte" ADSTYPE_DINT = "dint" ADSTYPE_INT = "int" ADSTYPE_UDINT = "udint" ADSTYPE_UINT = "uint" ADS_TYPEMAP = { ADSTYPE_BOOL: pyads.PLCTYPE_BOOL, ADSTYPE_BYTE: pyads.PLCTYPE_BYTE, ADSTYPE_DINT: pyads.PLCTYPE_DINT, ADSTYPE_INT: pyads.PLCTYPE_INT, ADSTYPE_UDINT: pyads.PLCTYPE_UDINT, ADSTYPE_UINT: pyads.PLCTYPE_UINT, } CONF_ADS_FACTOR = "factor" CONF_ADS_TYPE = "adstype" CONF_ADS_VALUE = "value" CONF_ADS_VAR = "adsvar" CONF_ADS_VAR_BRIGHTNESS = "adsvar_brightness" CONF_ADS_VAR_POSITION = "adsvar_position" STATE_KEY_STATE = "state" STATE_KEY_BRIGHTNESS = "brightness" STATE_KEY_POSITION = "position" DOMAIN = "ads" SERVICE_WRITE_DATA_BY_NAME = "write_data_by_name" CONFIG_SCHEMA = vol.Schema( { DOMAIN: vol.Schema( { vol.Required(CONF_DEVICE): cv.string, vol.Required(CONF_PORT): cv.port, vol.Optional(CONF_IP_ADDRESS): cv.string, } ) }, extra=vol.ALLOW_EXTRA, ) SCHEMA_SERVICE_WRITE_DATA_BY_NAME = vol.Schema( { vol.Required(CONF_ADS_TYPE): vol.In( [ ADSTYPE_INT, ADSTYPE_UINT, ADSTYPE_BYTE, ADSTYPE_BOOL, ADSTYPE_DINT, ADSTYPE_UDINT, ] ), vol.Required(CONF_ADS_VALUE): vol.Coerce(int), vol.Required(CONF_ADS_VAR): cv.string, } ) def setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the ADS component.""" conf = config[DOMAIN] net_id = conf[CONF_DEVICE] ip_address = conf.get(CONF_IP_ADDRESS) port = conf[CONF_PORT] client = pyads.Connection(net_id, port, ip_address) try: ads = AdsHub(client) except pyads.ADSError: _LOGGER.error( "Could not connect to ADS host (netid=%s, ip=%s, port=%s)", net_id, ip_address, port, ) return False hass.data[DATA_ADS] = ads hass.bus.listen(EVENT_HOMEASSISTANT_STOP, ads.shutdown) def handle_write_data_by_name(call: ServiceCall) -> None: """Write a value to the connected ADS device.""" ads_var = call.data[CONF_ADS_VAR] ads_type = call.data[CONF_ADS_TYPE] value = call.data[CONF_ADS_VALUE] try: ads.write_by_name(ads_var, value, ADS_TYPEMAP[ads_type]) except pyads.ADSError as err: _LOGGER.error(err) hass.services.register( DOMAIN, SERVICE_WRITE_DATA_BY_NAME, handle_write_data_by_name, schema=SCHEMA_SERVICE_WRITE_DATA_BY_NAME, ) return True # Tuple to hold data needed for notification NotificationItem = namedtuple( "NotificationItem", "hnotify huser name plc_datatype callback" ) class AdsHub: """Representation of an ADS connection.""" def __init__(self, ads_client): """Initialize the ADS hub.""" self._client = ads_client self._client.open() # All ADS devices are registered here self._devices = [] self._notification_items = {} self._lock = threading.Lock() def shutdown(self, *args, **kwargs): """Shutdown ADS connection.""" _LOGGER.debug("Shutting down ADS") for notification_item in self._notification_items.values(): _LOGGER.debug( "Deleting device notification %d, %d", notification_item.hnotify, notification_item.huser, ) try: self._client.del_device_notification( notification_item.hnotify, notification_item.huser ) except pyads.ADSError as err: _LOGGER.error(err) try: self._client.close() except pyads.ADSError as err: _LOGGER.error(err) def register_device(self, device): """Register a new device.""" self._devices.append(device) def write_by_name(self, name, value, plc_datatype): """Write a value to the device.""" with self._lock: try: return self._client.write_by_name(name, value, plc_datatype) except pyads.ADSError as err: _LOGGER.error("Error writing %s: %s", name, err) def read_by_name(self, name, plc_datatype): """Read a value from the device.""" with self._lock: try: return self._client.read_by_name(name, plc_datatype) except pyads.ADSError as err: _LOGGER.error("Error reading %s: %s", name, err) def add_device_notification(self, name, plc_datatype, callback): """Add a notification to the ADS devices.""" attr = pyads.NotificationAttrib(ctypes.sizeof(plc_datatype)) with self._lock: try: hnotify, huser = self._client.add_device_notification( name, attr, self._device_notification_callback ) except pyads.ADSError as err: _LOGGER.error("Error subscribing to %s: %s", name, err) else: hnotify = int(hnotify) self._notification_items[hnotify] = NotificationItem( hnotify, huser, name, plc_datatype, callback ) _LOGGER.debug( "Added device notification %d for variable %s", hnotify, name ) def _device_notification_callback(self, notification, name): """Handle device notifications.""" contents = notification.contents hnotify = int(contents.hNotification) _LOGGER.debug("Received notification %d", hnotify) # get dynamically sized data array data_size = contents.cbSampleSize data = (ctypes.c_ubyte * data_size).from_address( ctypes.addressof(contents) + pyads.structs.SAdsNotificationHeader.data.offset ) try: with self._lock: notification_item = self._notification_items[hnotify] except KeyError: _LOGGER.error("Unknown device notification handle: %d", hnotify) return # Parse data to desired datatype if notification_item.plc_datatype == pyads.PLCTYPE_BOOL: value = bool(struct.unpack(" bool: """Return False if state has not been updated yet.""" return self._state_dict[STATE_KEY_STATE] is not None