core/homeassistant/components/knx/cover.py

231 lines
8.5 KiB
Python

"""Support for KNX/IP covers."""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
from typing import Any
from xknx import XKNX
from xknx.devices import Cover as XknxCover, Device as XknxDevice
from xknx.telegram.address import parse_device_group_address
from homeassistant.components.cover import (
ATTR_POSITION,
ATTR_TILT_POSITION,
DEVICE_CLASS_BLIND,
SUPPORT_CLOSE,
SUPPORT_CLOSE_TILT,
SUPPORT_OPEN,
SUPPORT_OPEN_TILT,
SUPPORT_SET_POSITION,
SUPPORT_SET_TILT_POSITION,
SUPPORT_STOP,
SUPPORT_STOP_TILT,
CoverEntity,
)
from homeassistant.const import CONF_DEVICE_CLASS, CONF_NAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_utc_time_change
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import DOMAIN
from .knx_entity import KnxEntity
from .schema import CoverSchema
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up cover(s) for KNX platform."""
if not discovery_info or not discovery_info["platform_config"]:
return
platform_config = discovery_info["platform_config"]
xknx: XKNX = hass.data[DOMAIN].xknx
_async_migrate_unique_id(hass, platform_config)
async_add_entities(
KNXCover(xknx, entity_config) for entity_config in platform_config
)
@callback
def _async_migrate_unique_id(
hass: HomeAssistant, platform_config: list[ConfigType]
) -> None:
"""Change unique_ids used in 2021.4 to include position_target GA."""
entity_registry = er.async_get(hass)
for entity_config in platform_config:
# normalize group address strings - ga_updown was the old uid but is optional
updown_addresses = entity_config.get(CoverSchema.CONF_MOVE_LONG_ADDRESS)
if updown_addresses is None:
continue
ga_updown = parse_device_group_address(updown_addresses[0])
old_uid = str(ga_updown)
entity_id = entity_registry.async_get_entity_id("cover", DOMAIN, old_uid)
if entity_id is None:
continue
position_target_addresses = entity_config.get(CoverSchema.CONF_POSITION_ADDRESS)
ga_position_target = (
parse_device_group_address(position_target_addresses[0])
if position_target_addresses is not None
else None
)
new_uid = f"{ga_updown}_{ga_position_target}"
entity_registry.async_update_entity(entity_id, new_unique_id=new_uid)
class KNXCover(KnxEntity, CoverEntity):
"""Representation of a KNX cover."""
_device: XknxCover
def __init__(self, xknx: XKNX, config: ConfigType) -> None:
"""Initialize the cover."""
super().__init__(
device=XknxCover(
xknx,
name=config[CONF_NAME],
group_address_long=config.get(CoverSchema.CONF_MOVE_LONG_ADDRESS),
group_address_short=config.get(CoverSchema.CONF_MOVE_SHORT_ADDRESS),
group_address_stop=config.get(CoverSchema.CONF_STOP_ADDRESS),
group_address_position_state=config.get(
CoverSchema.CONF_POSITION_STATE_ADDRESS
),
group_address_angle=config.get(CoverSchema.CONF_ANGLE_ADDRESS),
group_address_angle_state=config.get(
CoverSchema.CONF_ANGLE_STATE_ADDRESS
),
group_address_position=config.get(CoverSchema.CONF_POSITION_ADDRESS),
travel_time_down=config[CoverSchema.CONF_TRAVELLING_TIME_DOWN],
travel_time_up=config[CoverSchema.CONF_TRAVELLING_TIME_UP],
invert_position=config[CoverSchema.CONF_INVERT_POSITION],
invert_angle=config[CoverSchema.CONF_INVERT_ANGLE],
)
)
self._unsubscribe_auto_updater: Callable[[], None] | None = None
self._attr_device_class = config.get(CONF_DEVICE_CLASS) or (
DEVICE_CLASS_BLIND if self._device.supports_angle else None
)
self._attr_supported_features = (
SUPPORT_CLOSE | SUPPORT_OPEN | SUPPORT_SET_POSITION
)
if self._device.supports_stop:
self._attr_supported_features |= SUPPORT_STOP | SUPPORT_STOP_TILT
if self._device.supports_angle:
self._attr_supported_features |= SUPPORT_SET_TILT_POSITION
if self._device.step.writable:
self._attr_supported_features |= (
SUPPORT_CLOSE_TILT | SUPPORT_OPEN_TILT | SUPPORT_STOP_TILT
)
self._attr_unique_id = (
f"{self._device.updown.group_address}_"
f"{self._device.position_target.group_address}"
)
@callback
async def after_update_callback(self, device: XknxDevice) -> None:
"""Call after device was updated."""
self.async_write_ha_state()
if self._device.is_traveling():
self.start_auto_updater()
@property
def current_cover_position(self) -> int | None:
"""Return the current position of the cover.
None is unknown, 0 is closed, 100 is fully open.
"""
# In KNX 0 is open, 100 is closed.
pos = self._device.current_position()
return 100 - pos if pos is not None else None
@property
def is_closed(self) -> bool | None:
"""Return if the cover is closed."""
# state shall be "unknown" when xknx travelcalculator is not initialized
if self._device.current_position() is None:
return None
return self._device.is_closed()
@property
def is_opening(self) -> bool:
"""Return if the cover is opening or not."""
return self._device.is_opening()
@property
def is_closing(self) -> bool:
"""Return if the cover is closing or not."""
return self._device.is_closing()
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the cover."""
await self._device.set_down()
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover."""
await self._device.set_up()
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
knx_position = 100 - kwargs[ATTR_POSITION]
await self._device.set_position(knx_position)
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self._device.stop()
self.stop_auto_updater()
@property
def current_cover_tilt_position(self) -> int | None:
"""Return current tilt position of cover."""
if not self._device.supports_angle:
return None
ang = self._device.current_angle()
return 100 - ang if ang is not None else None
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
knx_tilt_position = 100 - kwargs[ATTR_TILT_POSITION]
await self._device.set_angle(knx_tilt_position)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
await self._device.set_short_up()
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
await self._device.set_short_down()
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover tilt."""
await self._device.stop()
self.stop_auto_updater()
def start_auto_updater(self) -> None:
"""Start the autoupdater to update Home Assistant while cover is moving."""
if self._unsubscribe_auto_updater is None:
self._unsubscribe_auto_updater = async_track_utc_time_change(
self.hass, self.auto_updater_hook
)
def stop_auto_updater(self) -> None:
"""Stop the autoupdater."""
if self._unsubscribe_auto_updater is not None:
self._unsubscribe_auto_updater()
self._unsubscribe_auto_updater = None
@callback
def auto_updater_hook(self, now: datetime) -> None:
"""Call for the autoupdater."""
self.async_write_ha_state()
if self._device.position_reached():
self.hass.async_create_task(self._device.auto_stop_if_necessary())
self.stop_auto_updater()