core/homeassistant/components/modbus/__init__.py

252 lines
7.6 KiB
Python

"""Support for Modbus."""
import logging
import threading
import voluptuous as vol
from homeassistant.const import (
ATTR_STATE,
CONF_HOST,
CONF_METHOD,
CONF_NAME,
CONF_PORT,
CONF_TIMEOUT,
CONF_TYPE,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
ATTR_ADDRESS = "address"
ATTR_HUB = "hub"
ATTR_UNIT = "unit"
ATTR_VALUE = "value"
CONF_BAUDRATE = "baudrate"
CONF_BYTESIZE = "bytesize"
CONF_HUB = "hub"
CONF_PARITY = "parity"
CONF_STOPBITS = "stopbits"
DEFAULT_HUB = "default"
DOMAIN = "modbus"
SERVICE_WRITE_COIL = "write_coil"
SERVICE_WRITE_REGISTER = "write_register"
BASE_SCHEMA = vol.Schema({vol.Optional(CONF_NAME, default=DEFAULT_HUB): cv.string})
SERIAL_SCHEMA = BASE_SCHEMA.extend(
{
vol.Required(CONF_BAUDRATE): cv.positive_int,
vol.Required(CONF_BYTESIZE): vol.Any(5, 6, 7, 8),
vol.Required(CONF_METHOD): vol.Any("rtu", "ascii"),
vol.Required(CONF_PORT): cv.string,
vol.Required(CONF_PARITY): vol.Any("E", "O", "N"),
vol.Required(CONF_STOPBITS): vol.Any(1, 2),
vol.Required(CONF_TYPE): "serial",
vol.Optional(CONF_TIMEOUT, default=3): cv.socket_timeout,
}
)
ETHERNET_SCHEMA = BASE_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Required(CONF_TYPE): vol.Any("tcp", "udp", "rtuovertcp"),
vol.Optional(CONF_TIMEOUT, default=3): cv.socket_timeout,
}
)
CONFIG_SCHEMA = vol.Schema(
{DOMAIN: vol.All(cv.ensure_list, [vol.Any(SERIAL_SCHEMA, ETHERNET_SCHEMA)])},
extra=vol.ALLOW_EXTRA,
)
SERVICE_WRITE_REGISTER_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_HUB, default=DEFAULT_HUB): cv.string,
vol.Required(ATTR_UNIT): cv.positive_int,
vol.Required(ATTR_ADDRESS): cv.positive_int,
vol.Required(ATTR_VALUE): vol.Any(
cv.positive_int, vol.All(cv.ensure_list, [cv.positive_int])
),
}
)
SERVICE_WRITE_COIL_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_HUB, default=DEFAULT_HUB): cv.string,
vol.Required(ATTR_UNIT): cv.positive_int,
vol.Required(ATTR_ADDRESS): cv.positive_int,
vol.Required(ATTR_STATE): cv.boolean,
}
)
def setup_client(client_config):
"""Set up pymodbus client."""
client_type = client_config[CONF_TYPE]
if client_type == "serial":
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
return ModbusClient(
method=client_config[CONF_METHOD],
port=client_config[CONF_PORT],
baudrate=client_config[CONF_BAUDRATE],
stopbits=client_config[CONF_STOPBITS],
bytesize=client_config[CONF_BYTESIZE],
parity=client_config[CONF_PARITY],
timeout=client_config[CONF_TIMEOUT],
)
if client_type == "rtuovertcp":
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.transaction import ModbusRtuFramer
return ModbusClient(
host=client_config[CONF_HOST],
port=client_config[CONF_PORT],
framer=ModbusRtuFramer,
timeout=client_config[CONF_TIMEOUT],
)
if client_type == "tcp":
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
return ModbusClient(
host=client_config[CONF_HOST],
port=client_config[CONF_PORT],
timeout=client_config[CONF_TIMEOUT],
)
if client_type == "udp":
from pymodbus.client.sync import ModbusUdpClient as ModbusClient
return ModbusClient(
host=client_config[CONF_HOST],
port=client_config[CONF_PORT],
timeout=client_config[CONF_TIMEOUT],
)
assert False
def setup(hass, config):
"""Set up Modbus component."""
hass.data[DOMAIN] = hub_collect = {}
for client_config in config[DOMAIN]:
client = setup_client(client_config)
name = client_config[CONF_NAME]
hub_collect[name] = ModbusHub(client, name)
_LOGGER.debug("Setting up hub: %s", client_config)
def stop_modbus(event):
"""Stop Modbus service."""
for client in hub_collect.values():
client.close()
def start_modbus(event):
"""Start Modbus service."""
for client in hub_collect.values():
client.connect()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_modbus)
# Register services for modbus
hass.services.register(
DOMAIN,
SERVICE_WRITE_REGISTER,
write_register,
schema=SERVICE_WRITE_REGISTER_SCHEMA,
)
hass.services.register(
DOMAIN, SERVICE_WRITE_COIL, write_coil, schema=SERVICE_WRITE_COIL_SCHEMA
)
def write_register(service):
"""Write Modbus registers."""
unit = int(float(service.data.get(ATTR_UNIT)))
address = int(float(service.data.get(ATTR_ADDRESS)))
value = service.data.get(ATTR_VALUE)
client_name = service.data.get(ATTR_HUB)
if isinstance(value, list):
hub_collect[client_name].write_registers(
unit, address, [int(float(i)) for i in value]
)
else:
hub_collect[client_name].write_register(unit, address, int(float(value)))
def write_coil(service):
"""Write Modbus coil."""
unit = service.data.get(ATTR_UNIT)
address = service.data.get(ATTR_ADDRESS)
state = service.data.get(ATTR_STATE)
client_name = service.data.get(ATTR_HUB)
hub_collect[client_name].write_coil(unit, address, state)
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_modbus)
return True
class ModbusHub:
"""Thread safe wrapper class for pymodbus."""
def __init__(self, modbus_client, name):
"""Initialize the Modbus hub."""
self._client = modbus_client
self._lock = threading.Lock()
self._name = name
@property
def name(self):
"""Return the name of this hub."""
return self._name
def close(self):
"""Disconnect client."""
with self._lock:
self._client.close()
def connect(self):
"""Connect client."""
with self._lock:
self._client.connect()
def read_coils(self, unit, address, count):
"""Read coils."""
with self._lock:
kwargs = {"unit": unit} if unit else {}
return self._client.read_coils(address, count, **kwargs)
def read_input_registers(self, unit, address, count):
"""Read input registers."""
with self._lock:
kwargs = {"unit": unit} if unit else {}
return self._client.read_input_registers(address, count, **kwargs)
def read_holding_registers(self, unit, address, count):
"""Read holding registers."""
with self._lock:
kwargs = {"unit": unit} if unit else {}
return self._client.read_holding_registers(address, count, **kwargs)
def write_coil(self, unit, address, value):
"""Write coil."""
with self._lock:
kwargs = {"unit": unit} if unit else {}
self._client.write_coil(address, value, **kwargs)
def write_register(self, unit, address, value):
"""Write register."""
with self._lock:
kwargs = {"unit": unit} if unit else {}
self._client.write_register(address, value, **kwargs)
def write_registers(self, unit, address, values):
"""Write registers."""
with self._lock:
kwargs = {"unit": unit} if unit else {}
self._client.write_registers(address, values, **kwargs)