Improve TDBU motion blinds control (#44500)

* improve TDBU motion blinds control

* Simplify service registration
pull/44600/head
starkillerOG 2020-12-28 22:09:53 +01:00 committed by GitHub
parent 0bc04a6501
commit a22d9e54db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 131 additions and 10 deletions

View File

@ -5,28 +5,65 @@ import logging
from socket import timeout
from motionblinds import MotionMulticast
import voluptuous as vol
from homeassistant import config_entries, core
from homeassistant.const import CONF_API_KEY, CONF_HOST, EVENT_HOMEASSISTANT_STOP
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_API_KEY,
CONF_HOST,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import (
ATTR_ABSOLUTE_POSITION,
ATTR_WIDTH,
DOMAIN,
KEY_COORDINATOR,
KEY_GATEWAY,
KEY_MULTICAST_LISTENER,
MANUFACTURER,
MOTION_PLATFORMS,
SERVICE_SET_ABSOLUTE_POSITION,
)
from .gateway import ConnectMotionGateway
_LOGGER = logging.getLogger(__name__)
CALL_SCHEMA = vol.Schema({vol.Required(ATTR_ENTITY_ID): cv.comp_entity_ids})
async def async_setup(hass: core.HomeAssistant, config: dict):
SET_ABSOLUTE_POSITION_SCHEMA = CALL_SCHEMA.extend(
{
vol.Required(ATTR_ABSOLUTE_POSITION): vol.All(
cv.positive_int, vol.Range(max=100)
),
vol.Optional(ATTR_WIDTH): vol.All(cv.positive_int, vol.Range(max=100)),
}
)
SERVICE_TO_METHOD = {
SERVICE_SET_ABSOLUTE_POSITION: {
"schema": SET_ABSOLUTE_POSITION_SCHEMA,
}
}
def setup(hass: core.HomeAssistant, config: dict):
"""Set up the Motion Blinds component."""
def service_handler(service):
data = service.data.copy()
data["method"] = service.service
dispatcher_send(hass, DOMAIN, data)
for service in SERVICE_TO_METHOD:
schema = SERVICE_TO_METHOD[service]["schema"]
hass.services.register(DOMAIN, service, service_handler, schema=schema)
return True

View File

@ -8,3 +8,8 @@ MOTION_PLATFORMS = ["cover", "sensor"]
KEY_GATEWAY = "gateway"
KEY_COORDINATOR = "coordinator"
KEY_MULTICAST_LISTENER = "multicast_listener"
ATTR_WIDTH = "width"
ATTR_ABSOLUTE_POSITION = "absolute_position"
SERVICE_SET_ABSOLUTE_POSITION = "set_absolute_position"

View File

@ -15,10 +15,19 @@ from homeassistant.components.cover import (
DEVICE_CLASS_SHUTTER,
CoverEntity,
)
from homeassistant.const import ATTR_ENTITY_ID, ENTITY_MATCH_ALL, ENTITY_MATCH_NONE
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN, KEY_COORDINATOR, KEY_GATEWAY, MANUFACTURER
from .const import (
ATTR_ABSOLUTE_POSITION,
ATTR_WIDTH,
DOMAIN,
KEY_COORDINATOR,
KEY_GATEWAY,
MANUFACTURER,
)
_LOGGER = logging.getLogger(__name__)
@ -85,6 +94,15 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
"Bottom",
)
)
entities.append(
MotionTDBUDevice(
coordinator,
blind,
TDBU_DEVICE_MAP[blind.type],
config_entry,
"Combined",
)
)
else:
_LOGGER.warning("Blind type '%s' not yet supported", blind.blind_type)
@ -158,10 +176,28 @@ class MotionPositionDevice(CoordinatorEntity, CoverEntity):
self.schedule_update_ha_state(force_refresh=False)
async def async_added_to_hass(self):
"""Subscribe to multicast pushes."""
"""Subscribe to multicast pushes and register signal handler."""
self._blind.Register_callback(self.unique_id, self._push_callback)
self.async_on_remove(
async_dispatcher_connect(self.hass, DOMAIN, self.signal_handler)
)
await super().async_added_to_hass()
def signal_handler(self, data):
"""Handle domain-specific signal by calling appropriate method."""
entity_ids = data[ATTR_ENTITY_ID]
if entity_ids == ENTITY_MATCH_NONE:
return
if entity_ids == ENTITY_MATCH_ALL or self.entity_id in entity_ids:
params = {
key: value
for key, value in data.items()
if key not in ["entity_id", "method"]
}
getattr(self, data["method"])(**params)
async def async_will_remove_from_hass(self):
"""Unsubscribe when removed."""
self._blind.Remove_callback(self.unique_id)
@ -180,6 +216,11 @@ class MotionPositionDevice(CoordinatorEntity, CoverEntity):
position = kwargs[ATTR_POSITION]
self._blind.Set_position(100 - position)
def set_absolute_position(self, **kwargs):
"""Move the cover to a specific absolute position (see TDBU)."""
position = kwargs[ATTR_ABSOLUTE_POSITION]
self._blind.Set_position(100 - position)
def stop_cover(self, **kwargs):
"""Stop the cover."""
self._blind.Stop()
@ -226,7 +267,7 @@ class MotionTDBUDevice(MotionPositionDevice):
self._motor = motor
self._motor_key = motor[0]
if self._motor not in ["Bottom", "Top"]:
if self._motor not in ["Bottom", "Top", "Combined"]:
_LOGGER.error("Unknown motor '%s'", self._motor)
@property
@ -246,10 +287,10 @@ class MotionTDBUDevice(MotionPositionDevice):
None is unknown, 0 is open, 100 is closed.
"""
if self._blind.position is None:
if self._blind.scaled_position is None:
return None
return 100 - self._blind.position[self._motor_key]
return 100 - self._blind.scaled_position[self._motor_key]
@property
def is_closed(self):
@ -257,8 +298,23 @@ class MotionTDBUDevice(MotionPositionDevice):
if self._blind.position is None:
return None
if self._motor == "Combined":
return self._blind.width == 100
return self._blind.position[self._motor_key] == 100
@property
def device_state_attributes(self):
"""Return device specific state attributes."""
attributes = {}
if self._blind.position is not None:
attributes[ATTR_ABSOLUTE_POSITION] = (
100 - self._blind.position[self._motor_key]
)
if self._blind.width is not None:
attributes[ATTR_WIDTH] = self._blind.width
return attributes
def open_cover(self, **kwargs):
"""Open the cover."""
self._blind.Open(motor=self._motor_key)
@ -268,9 +324,18 @@ class MotionTDBUDevice(MotionPositionDevice):
self._blind.Close(motor=self._motor_key)
def set_cover_position(self, **kwargs):
"""Move the cover to a specific position."""
"""Move the cover to a specific scaled position."""
position = kwargs[ATTR_POSITION]
self._blind.Set_position(100 - position, motor=self._motor_key)
self._blind.Set_scaled_position(100 - position, motor=self._motor_key)
def set_absolute_position(self, **kwargs):
"""Move the cover to a specific absolute position."""
position = kwargs[ATTR_ABSOLUTE_POSITION]
target_width = kwargs.get(ATTR_WIDTH, None)
self._blind.Set_position(
100 - position, motor=self._motor_key, width=target_width
)
def stop_cover(self, **kwargs):
"""Stop the cover."""

View File

@ -0,0 +1,14 @@
# Describes the format for available motion blinds services
set_absolute_position:
description: "Set the absolute position of the cover."
fields:
entity_id:
description: Name of the motion blind cover entity to control.
example: "cover.TopDownBottomUp-Bottom-0001"
absolute_position:
description: Absolute position to move to.
example: 70
width:
description: Optionally specify the width that is covered, only for TDBU Combined entities.
example: 30