core/homeassistant/components/velbus/cover.py

152 lines
4.5 KiB
Python
Raw Normal View History

"""Support for Velbus covers."""
import logging
import time
import voluptuous as vol
from homeassistant.components.cover import (
PLATFORM_SCHEMA, SUPPORT_CLOSE, SUPPORT_OPEN, SUPPORT_STOP, CoverDevice)
from homeassistant.const import CONF_COVERS, CONF_NAME
import homeassistant.helpers.config_validation as cv
from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
# Options accepted for a single cover entry: the module address, the two
# relay channels used for opening and closing, and a display name.
COVER_SCHEMA = vol.Schema(
    {
        vol.Required('module'): cv.positive_int,
        vol.Required('open_channel'): cv.positive_int,
        vol.Required('close_channel'): cv.positive_int,
        vol.Required(CONF_NAME): cv.string,
    }
)

# Platform configuration: a mapping of slug -> cover entry under CONF_COVERS.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_COVERS): cv.schema_with_slug_keys(COVER_SCHEMA),
    }
)
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Build a VelbusCover for each configured cover and register them.

    The covers are read from ``config[CONF_COVERS]`` (an empty mapping when
    absent) and all share the object stored in ``hass.data[DOMAIN]``.

    Returns False after logging an error when no cover was configured;
    otherwise hands the covers to ``add_entities`` and returns None.
    """
    configured = config.get(CONF_COVERS, {})
    velbus = hass.data[DOMAIN]

    # The slug key doubles as the display name when CONF_NAME is missing.
    covers = [
        VelbusCover(
            velbus,
            cover_conf.get(CONF_NAME, slug),
            cover_conf.get('module'),
            cover_conf.get('open_channel'),
            cover_conf.get('close_channel'),
        )
        for slug, cover_conf in configured.items()
    ]

    if not covers:
        _LOGGER.error("No covers added")
        return False

    add_entities(covers)
class VelbusCover(CoverDevice):
    """Representation of a Velbus cover driven by two relay channels."""

    def __init__(self, velbus, name, module, open_channel, close_channel):
        """Remember the bus handle, display name and relay wiring.

        velbus: object used to subscribe to and send bus messages.
        name: name reported for this cover.
        module: module address the relay messages are sent to.
        open_channel / close_channel: relay channels for each direction.
        """
        self._velbus = velbus
        self._name = name
        self._module = module
        self._open_channel = open_channel
        self._close_channel = close_channel
        # Relay states stay None until a status message has been received.
        self._open_channel_state = None
        self._close_channel_state = None

    async def async_added_to_hass(self):
        """Subscribe to bus messages and request the current relay status."""

        def _subscribe_and_query():
            """Register the message callback, then ask for a status report."""
            self._velbus.subscribe(self._on_message)
            self.get_status()

        await self.hass.async_add_job(_subscribe_and_query)

    def _on_message(self, message):
        """Record relay status updates addressed to this cover's channels."""
        import velbus

        relevant = (
            isinstance(message, velbus.RelayStatusMessage)
            and message.address == self._module
        )
        if not relevant:
            return

        # Both checks run independently, so each matching channel triggers
        # its own state update.
        if message.channel == self._close_channel:
            self._close_channel_state = message.is_on()
            self.schedule_update_ha_state()
        if message.channel == self._open_channel:
            self._open_channel_state = message.is_on()
            self.schedule_update_ha_state()

    @property
    def supported_features(self):
        """Return the feature flags: open, close and stop."""
        return SUPPORT_OPEN | SUPPORT_CLOSE | SUPPORT_STOP

    @property
    def should_poll(self):
        """Return False: state is updated from received bus messages."""
        return False

    @property
    def name(self):
        """Return the name given to this cover."""
        return self._name

    @property
    def is_closed(self):
        """Return the last reported state of the close relay channel.

        None until a status message for that channel has been received.
        """
        return self._close_channel_state

    @property
    def current_cover_position(self):
        """Return the cover position, which is always unknown (None)."""
        return None

    def _send_relay_command(self, message, channel):
        """Address a relay message to this module and channel and send it."""
        message.set_defaults(self._module)
        message.relay_channels = [channel]
        self._velbus.send(message)

    def _relay_off(self, channel):
        """Send a switch-relay-off message for the given channel."""
        import velbus

        self._send_relay_command(velbus.SwitchRelayOffMessage(), channel)

    def _relay_on(self, channel):
        """Send a switch-relay-on message for the given channel."""
        import velbus

        self._send_relay_command(velbus.SwitchRelayOnMessage(), channel)

    def _off_then(self, off_channel, follow_up, follow_up_channel):
        """Switch one relay off, pause 0.3 s, then act on the other relay."""
        self._relay_off(off_channel)
        time.sleep(0.3)
        follow_up(follow_up_channel)

    def open_cover(self, **kwargs):
        """Open the cover: close relay off, then open relay on."""
        self._off_then(self._close_channel, self._relay_on, self._open_channel)

    def close_cover(self, **kwargs):
        """Close the cover: open relay off, then close relay on."""
        self._off_then(self._open_channel, self._relay_on, self._close_channel)

    def stop_cover(self, **kwargs):
        """Stop the cover by switching both relays off in turn."""
        self._off_then(self._open_channel, self._relay_off, self._close_channel)

    def get_status(self):
        """Send a status request for the open and close channels."""
        import velbus

        request = velbus.ModuleStatusRequestMessage()
        request.set_defaults(self._module)
        request.channels = [self._open_channel, self._close_channel]
        self._velbus.send(request)