core/homeassistant/components/modbus.py

167 lines
4.6 KiB
Python

"""
Support for Modbus.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/modbus/
"""
import logging
import threading
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
DOMAIN = "modbus"
REQUIREMENTS = ['https://github.com/bashwork/pymodbus/archive/'
'd7fc4f1cc975631e0a9011390e8017f64b612661.zip#pymodbus==1.2.0']
# Type of network
MEDIUM = "type"
# if MEDIUM == "serial"
METHOD = "method"
SERIAL_PORT = "port"
BAUDRATE = "baudrate"
STOPBITS = "stopbits"
BYTESIZE = "bytesize"
PARITY = "parity"
# if MEDIUM == "tcp" or "udp"
HOST = "host"
IP_PORT = "port"
_LOGGER = logging.getLogger(__name__)
SERVICE_WRITE_REGISTER = "write_register"
ATTR_ADDRESS = "address"
ATTR_UNIT = "unit"
ATTR_VALUE = "value"
HUB = None
TYPE = None
def setup(hass, config):
"""Setup Modbus component."""
# Modbus connection type
# pylint: disable=global-statement, import-error
global TYPE
TYPE = config[DOMAIN][MEDIUM]
# Connect to Modbus network
# pylint: disable=global-statement, import-error
if TYPE == "serial":
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
client = ModbusClient(method=config[DOMAIN][METHOD],
port=config[DOMAIN][SERIAL_PORT],
baudrate=config[DOMAIN][BAUDRATE],
stopbits=config[DOMAIN][STOPBITS],
bytesize=config[DOMAIN][BYTESIZE],
parity=config[DOMAIN][PARITY])
elif TYPE == "tcp":
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
client = ModbusClient(host=config[DOMAIN][HOST],
port=config[DOMAIN][IP_PORT])
elif TYPE == "udp":
from pymodbus.client.sync import ModbusUdpClient as ModbusClient
client = ModbusClient(host=config[DOMAIN][HOST],
port=config[DOMAIN][IP_PORT])
else:
return False
global HUB
HUB = ModbusHub(client)
def stop_modbus(event):
"""Stop Modbus service."""
HUB.close()
def start_modbus(event):
"""Start Modbus service."""
HUB.connect()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_modbus)
# Register services for modbus
hass.services.register(DOMAIN, SERVICE_WRITE_REGISTER, write_register)
def write_register(service):
"""Write modbus registers."""
unit = int(float(service.data.get(ATTR_UNIT)))
address = int(float(service.data.get(ATTR_ADDRESS)))
value = service.data.get(ATTR_VALUE)
if isinstance(value, list):
HUB.write_registers(
unit,
address,
[int(float(i)) for i in value])
else:
HUB.write_register(
unit,
address,
int(float(value)))
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_modbus)
return True
class ModbusHub(object):
"""Thread safe wrapper class for pymodbus."""
def __init__(self, modbus_client):
"""Initialize the modbus hub."""
self._client = modbus_client
self._lock = threading.Lock()
def close(self):
"""Disconnect client."""
with self._lock:
self._client.close()
def connect(self):
"""Connect client."""
with self._lock:
self._client.connect()
def read_coils(self, unit, address, count):
"""Read coils."""
with self._lock:
return self._client.read_coils(
address,
count,
unit=unit)
def read_holding_registers(self, unit, address, count):
"""Read holding registers."""
with self._lock:
return self._client.read_holding_registers(
address,
count,
unit=unit)
def write_coil(self, unit, address, value):
"""Write coil."""
with self._lock:
self._client.write_coil(
address,
value,
unit=unit)
def write_register(self, unit, address, value):
"""Write register."""
with self._lock:
self._client.write_register(
address,
value,
unit=unit)
def write_registers(self, unit, address, values):
"""Write registers."""
with self._lock:
self._client.write_registers(
address,
values,
unit=unit)