Modbus component refactoring - sensors and switches (#3297)

pull/3391/head
Per Sandström 2016-09-13 22:47:44 +02:00 committed by Johann Kellerman
parent e4f4e91096
commit ca646c08c2
4 changed files with 213 additions and 202 deletions

View File

@ -0,0 +1,61 @@
"""
Support for Modbus Coil sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/binary_sensor.modbus/
"""
import logging
import voluptuous as vol
import homeassistant.components.modbus as modbus
from homeassistant.const import CONF_NAME
from homeassistant.components.binary_sensor import BinarySensorDevice
from homeassistant.helpers import config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['modbus']
CONF_COIL = "coil"
CONF_COILS = "coils"
CONF_SLAVE = "slave"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_COILS): [{
vol.Required(CONF_COIL): cv.positive_int,
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_SLAVE): cv.positive_int
}]
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Setup Modbus binary sensors."""
sensors = []
for coil in config.get(CONF_COILS):
sensors.append(ModbusCoilSensor(
coil.get(CONF_NAME),
coil.get(CONF_SLAVE),
coil.get(CONF_COIL)))
add_devices(sensors)
class ModbusCoilSensor(BinarySensorDevice):
"""Modbus coil sensor."""
def __init__(self, name, slave, coil):
"""Initialize the modbus coil sensor."""
self._name = name
self._slave = int(slave) if slave else None
self._coil = int(coil)
self._value = None
@property
def is_on(self):
"""Return the state of the sensor."""
return self._value
def update(self):
"""Update the state of the sensor."""
result = modbus.HUB.read_coils(self._slave, self._coil, 1)
self._value = result.bits[0]

View File

@ -7,8 +7,12 @@ https://home-assistant.io/components/modbus/
import logging
import threading
import voluptuous as vol
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP,
CONF_HOST, CONF_METHOD, CONF_PORT)
import homeassistant.helpers.config_validation as cv
DOMAIN = "modbus"
@ -16,19 +20,33 @@ REQUIREMENTS = ['https://github.com/bashwork/pymodbus/archive/'
'd7fc4f1cc975631e0a9011390e8017f64b612661.zip#pymodbus==1.2.0']
# Type of network
MEDIUM = "type"
CONF_BAUDRATE = "baudrate"
CONF_BYTESIZE = "bytesize"
CONF_STOPBITS = "stopbits"
CONF_TYPE = "type"
CONF_PARITY = "parity"
# if MEDIUM == "serial"
METHOD = "method"
SERIAL_PORT = "port"
BAUDRATE = "baudrate"
STOPBITS = "stopbits"
BYTESIZE = "bytesize"
PARITY = "parity"
SERIAL_SCHEMA = {
vol.Required(CONF_BAUDRATE): cv.positive_int,
vol.Required(CONF_BYTESIZE): vol.Any(5, 6, 7, 8),
vol.Required(CONF_METHOD): vol.Any('rtu', 'ascii'),
vol.Required(CONF_PORT): cv.string,
vol.Required(CONF_PARITY): vol.Any('E', 'O', 'N'),
vol.Required(CONF_STOPBITS): vol.Any(1, 2),
vol.Required(CONF_TYPE): 'serial',
}
ETHERNET_SCHEMA = {
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.positive_int,
vol.Required(CONF_TYPE): vol.Any('tcp', 'udp'),
}
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Any(SERIAL_SCHEMA, ETHERNET_SCHEMA)
}, extra=vol.ALLOW_EXTRA)
# if MEDIUM == "tcp" or "udp"
HOST = "host"
IP_PORT = "port"
_LOGGER = logging.getLogger(__name__)
@ -38,36 +56,41 @@ ATTR_ADDRESS = "address"
ATTR_UNIT = "unit"
ATTR_VALUE = "value"
SERVICE_WRITE_REGISTER_SCHEMA = vol.Schema({
vol.Required(ATTR_UNIT): cv.positive_int,
vol.Required(ATTR_ADDRESS): cv.positive_int,
vol.Required(ATTR_VALUE): cv.positive_int
})
HUB = None
TYPE = None
def setup(hass, config):
"""Setup Modbus component."""
# Modbus connection type
# pylint: disable=global-statement, import-error
global TYPE
TYPE = config[DOMAIN][MEDIUM]
client_type = config[DOMAIN][CONF_TYPE]
# Connect to Modbus network
# pylint: disable=global-statement, import-error
if TYPE == "serial":
if client_type == "serial":
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
client = ModbusClient(method=config[DOMAIN][METHOD],
port=config[DOMAIN][SERIAL_PORT],
baudrate=config[DOMAIN][BAUDRATE],
stopbits=config[DOMAIN][STOPBITS],
bytesize=config[DOMAIN][BYTESIZE],
parity=config[DOMAIN][PARITY])
elif TYPE == "tcp":
client = ModbusClient(method=config[DOMAIN][CONF_METHOD],
port=config[DOMAIN][CONF_PORT],
baudrate=config[DOMAIN][CONF_BAUDRATE],
stopbits=config[DOMAIN][CONF_STOPBITS],
bytesize=config[DOMAIN][CONF_BYTESIZE],
parity=config[DOMAIN][CONF_PARITY])
elif client_type == "tcp":
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
client = ModbusClient(host=config[DOMAIN][HOST],
port=config[DOMAIN][IP_PORT])
elif TYPE == "udp":
client = ModbusClient(host=config[DOMAIN][CONF_HOST],
port=config[DOMAIN][CONF_PORT])
elif client_type == "udp":
from pymodbus.client.sync import ModbusUdpClient as ModbusClient
client = ModbusClient(host=config[DOMAIN][HOST],
port=config[DOMAIN][IP_PORT])
client = ModbusClient(host=config[DOMAIN][CONF_HOST],
port=config[DOMAIN][CONF_PORT])
else:
return False
@ -84,7 +107,8 @@ def setup(hass, config):
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_modbus)
# Register services for modbus
hass.services.register(DOMAIN, SERVICE_WRITE_REGISTER, write_register)
hass.services.register(DOMAIN, SERVICE_WRITE_REGISTER, write_register,
schema=SERVICE_WRITE_REGISTER_SCHEMA)
def write_register(service):
"""Write modbus registers."""
@ -128,39 +152,44 @@ class ModbusHub(object):
def read_coils(self, unit, address, count):
"""Read coils."""
with self._lock:
kwargs = {'unit': unit} if unit else {}
return self._client.read_coils(
address,
count,
unit=unit)
**kwargs)
def read_holding_registers(self, unit, address, count):
"""Read holding registers."""
with self._lock:
kwargs = {'unit': unit} if unit else {}
return self._client.read_holding_registers(
address,
count,
unit=unit)
**kwargs)
def write_coil(self, unit, address, value):
"""Write coil."""
with self._lock:
kwargs = {'unit': unit} if unit else {}
self._client.write_coil(
address,
value,
unit=unit)
**kwargs)
def write_register(self, unit, address, value):
"""Write register."""
with self._lock:
kwargs = {'unit': unit} if unit else {}
self._client.write_register(
address,
value,
unit=unit)
**kwargs)
def write_registers(self, unit, address, values):
"""Write registers."""
with self._lock:
kwargs = {'unit': unit} if unit else {}
self._client.write_registers(
address,
values,
unit=unit)
**kwargs)

View File

@ -1,100 +1,80 @@
"""
Support for Modbus sensors.
Support for Modbus Register sensors.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.modbus/
"""
import logging
import voluptuous as vol
import homeassistant.components.modbus as modbus
from homeassistant.const import (
STATE_OFF, STATE_ON, TEMP_CELSIUS, TEMP_FAHRENHEIT)
CONF_NAME, CONF_OFFSET, CONF_UNIT_OF_MEASUREMENT)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers import config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['modbus']
CONF_COUNT = "count"
CONF_PRECISION = "precision"
CONF_REGISTER = "register"
CONF_REGISTERS = "registers"
CONF_SCALE = "scale"
CONF_SLAVE = "slave"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_REGISTERS): [{
vol.Required(CONF_NAME): cv.string,
vol.Required(CONF_REGISTER): cv.positive_int,
vol.Optional(CONF_COUNT, default=1): cv.positive_int,
vol.Optional(CONF_OFFSET, default=0): vol.Coerce(float),
vol.Optional(CONF_PRECISION, default=0): cv.positive_int,
vol.Optional(CONF_SCALE, default=1): vol.Coerce(float),
vol.Optional(CONF_SLAVE): cv.positive_int,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string
}]
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Setup Modbus devices."""
"""Setup Modbus sensors."""
sensors = []
slave = config.get("slave", None)
if modbus.TYPE == "serial" and not slave:
_LOGGER.error("No slave number provided for serial Modbus")
return False
registers = config.get("registers")
if registers:
for regnum, register in registers.items():
if register.get("name"):
sensors.append(
ModbusSensor(register.get("name"),
slave,
regnum,
None,
register.get("unit"),
scale=register.get("scale", 1),
offset=register.get("offset", 0),
precision=register.get("precision", 0)))
if register.get("bits"):
bits = register.get("bits")
for bitnum, bit in bits.items():
if bit.get("name"):
sensors.append(ModbusSensor(bit.get("name"),
slave,
regnum,
bitnum))
coils = config.get("coils")
if coils:
for coilnum, coil in coils.items():
sensors.append(ModbusSensor(coil.get("name"),
slave,
coilnum,
coil=True))
for register in config.get(CONF_REGISTERS):
sensors.append(ModbusRegisterSensor(
register.get(CONF_NAME),
register.get(CONF_SLAVE),
register.get(CONF_REGISTER),
register.get(CONF_UNIT_OF_MEASUREMENT),
register.get(CONF_COUNT),
register.get(CONF_SCALE),
register.get(CONF_OFFSET),
register.get(CONF_PRECISION)))
add_devices(sensors)
class ModbusSensor(Entity):
"""Representation of a Modbus Sensor."""
class ModbusRegisterSensor(Entity):
"""Modbus resgister sensor."""
# pylint: disable=too-many-arguments, too-many-instance-attributes
def __init__(self, name, slave, register, bit=None, unit=None, coil=False,
scale=1, offset=0, precision=0):
"""Initialize the sensor."""
# pylint: disable=too-many-instance-attributes, too-many-arguments
def __init__(self, name, slave, register, unit_of_measurement, count,
scale, offset, precision):
"""Initialize the modbus register sensor."""
self._name = name
self.slave = int(slave) if slave else 1
self.register = int(register)
self.bit = int(bit) if bit else None
self._value = None
self._unit = unit
self._coil = coil
self._slave = int(slave) if slave else None
self._register = int(register)
self._unit_of_measurement = unit_of_measurement
self._count = int(count)
self._scale = scale
self._offset = offset
self._precision = precision
def __str__(self):
"""Return the name and the state of the sensor."""
return "%s: %s" % (self.name, self.state)
@property
def should_poll(self):
"""Polling needed."""
return True
@property
def unique_id(self):
"""Return a unique id."""
return "MODBUS-SENSOR-{}-{}-{}".format(self.slave,
self.register,
self.bit)
self._value = None
@property
def state(self):
"""Return the state of the sensor."""
if self.bit:
return STATE_ON if self._value else STATE_OFF
else:
return self._value
return self._value
@property
def name(self):
@ -103,28 +83,18 @@ class ModbusSensor(Entity):
@property
def unit_of_measurement(self):
"""Return the unit of measurement of this entity, if any."""
if self._unit == "C":
return TEMP_CELSIUS
elif self._unit == "F":
return TEMP_FAHRENHEIT
else:
return self._unit
"""Return the unit of measurement."""
return self._unit_of_measurement
def update(self):
"""Update the state of the sensor."""
if self._coil:
result = modbus.HUB.read_coils(self.slave, self.register, 1)
self._value = result.bits[0]
else:
result = modbus.HUB.read_holding_registers(
self.slave, self.register, 1)
val = 0
for i, res in enumerate(result.registers):
val += res * (2**(i*16))
if self.bit:
self._value = val & (0x0001 << self.bit)
else:
self._value = format(
self._scale * val + self._offset,
".{}f".format(self._precision))
result = modbus.HUB.read_holding_registers(
self._slave,
self._register,
self._count)
val = 0
for i, res in enumerate(result.registers):
val += res * (2**(i*16))
self._value = format(
self._scale * val + self._offset,
".{}f".format(self._precision))

View File

@ -5,74 +5,51 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/switch.modbus/
"""
import logging
import voluptuous as vol
import homeassistant.components.modbus as modbus
from homeassistant.const import CONF_NAME
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.helpers import config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['modbus']
CONF_COIL = "coil"
CONF_COILS = "coils"
CONF_SLAVE = "slave"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_COILS): [{
vol.Required(CONF_COIL): cv.positive_int,
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_SLAVE): cv.positive_int,
}]
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Read configuration and create Modbus devices."""
switches = []
slave = config.get("slave", None)
if modbus.TYPE == "serial" and not slave:
_LOGGER.error("No slave number provided for serial Modbus")
return False
registers = config.get("registers")
if registers:
for regnum, register in registers.items():
bits = register.get("bits")
for bitnum, bit in bits.items():
if bit.get("name"):
switches.append(ModbusSwitch(bit.get("name"),
slave,
regnum,
bitnum))
coils = config.get("coils")
if coils:
for coilnum, coil in coils.items():
switches.append(ModbusSwitch(coil.get("name"),
slave,
coilnum,
0,
coil=True))
for coil in config.get("coils"):
switches.append(ModbusCoilSwitch(
coil.get(CONF_NAME),
coil.get(CONF_SLAVE),
coil.get(CONF_COIL)))
add_devices(switches)
class ModbusSwitch(ToggleEntity):
class ModbusCoilSwitch(ToggleEntity):
"""Representation of a Modbus switch."""
# pylint: disable=too-many-arguments
def __init__(self, name, slave, register, bit, coil=False):
def __init__(self, name, slave, coil):
"""Initialize the switch."""
self._name = name
self.slave = int(slave) if slave else 1
self.register = int(register)
self.bit = int(bit)
self._coil = coil
self._slave = int(slave) if slave else None
self._coil = int(coil)
self._is_on = None
self.register_value = None
def __str__(self):
"""String representation of Modbus switch."""
return "%s: %s" % (self.name, self.state)
@property
def should_poll(self):
"""Poling needed.
Slaves are not allowed to initiate communication on Modbus networks.
"""
return True
@property
def unique_id(self):
"""Return a unique ID."""
return "MODBUS-SWITCH-{}-{}-{}".format(self.slave,
self.register,
self.bit)
@property
def is_on(self):
@ -86,39 +63,13 @@ class ModbusSwitch(ToggleEntity):
def turn_on(self, **kwargs):
"""Set switch on."""
if self.register_value is None:
self.update()
if self._coil:
modbus.HUB.write_coil(self.slave, self.register, True)
else:
val = self.register_value | (0x0001 << self.bit)
modbus.HUB.write_register(self.slave, self.register, val)
modbus.HUB.write_coil(self._slave, self._coil, True)
def turn_off(self, **kwargs):
"""Set switch off."""
if self.register_value is None:
self.update()
if self._coil:
modbus.HUB.write_coil(self.slave, self.register, False)
else:
val = self.register_value & ~(0x0001 << self.bit)
modbus.HUB.write_register(self.slave, self.register, val)
modbus.HUB.write_coil(self._slave, self._coil, False)
def update(self):
"""Update the state of the switch."""
if self._coil:
result = modbus.HUB.read_coils(self.slave, self.register, 1)
self.register_value = result.bits[0]
self._is_on = self.register_value
else:
result = modbus.HUB.read_holding_registers(
self.slave,
self.register,
1)
val = 0
for i, res in enumerate(result.registers):
val += res * (2**(i*16))
self.register_value = val
self._is_on = (val & (0x0001 << self.bit) > 0)
result = modbus.HUB.read_coils(self._slave, self._coil, 1)
self._is_on = bool(result.bits[0])