core/homeassistant/components/qbus/cover.py

194 lines
6.8 KiB
Python

"""Support for Qbus cover."""
from typing import Any
from qbusmqttapi.const import (
KEY_PROPERTIES_SHUTTER_POSITION,
KEY_PROPERTIES_SLAT_POSITION,
)
from qbusmqttapi.discovery import QbusMqttOutput
from qbusmqttapi.state import QbusMqttShutterState, StateType
from homeassistant.components.cover import (
ATTR_POSITION,
ATTR_TILT_POSITION,
CoverDeviceClass,
CoverEntity,
CoverEntityFeature,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import QbusConfigEntry
from .entity import QbusEntity, add_new_outputs
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,
entry: QbusConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up cover entities."""
coordinator = entry.runtime_data
added_outputs: list[QbusMqttOutput] = []
def _check_outputs() -> None:
add_new_outputs(
coordinator,
added_outputs,
lambda output: output.type == "shutter",
QbusCover,
async_add_entities,
)
_check_outputs()
entry.async_on_unload(coordinator.async_add_listener(_check_outputs))
class QbusCover(QbusEntity, CoverEntity):
"""Representation of a Qbus cover entity."""
_state_cls = QbusMqttShutterState
_attr_name = None
_attr_supported_features: CoverEntityFeature
_attr_device_class = CoverDeviceClass.BLIND
def __init__(self, mqtt_output: QbusMqttOutput) -> None:
"""Initialize cover entity."""
super().__init__(mqtt_output)
self._attr_assumed_state = False
self._attr_current_cover_position = 0
self._attr_current_cover_tilt_position = 0
self._attr_is_closed = True
self._attr_supported_features = (
CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
)
if "shutterStop" in mqtt_output.actions:
self._attr_supported_features |= CoverEntityFeature.STOP
self._attr_assumed_state = True
if KEY_PROPERTIES_SHUTTER_POSITION in mqtt_output.properties:
self._attr_supported_features |= CoverEntityFeature.SET_POSITION
if KEY_PROPERTIES_SLAT_POSITION in mqtt_output.properties:
self._attr_supported_features |= CoverEntityFeature.SET_TILT_POSITION
self._attr_supported_features |= CoverEntityFeature.OPEN_TILT
self._attr_supported_features |= CoverEntityFeature.CLOSE_TILT
self._target_shutter_position: int | None = None
self._target_slat_position: int | None = None
self._target_state: str | None = None
self._previous_state: str | None = None
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover."""
state = QbusMqttShutterState(id=self._mqtt_output.id, type=StateType.STATE)
if self._attr_supported_features & CoverEntityFeature.SET_POSITION:
state.write_position(100)
else:
state.write_state("up")
await self._async_publish_output_state(state)
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the cover."""
state = QbusMqttShutterState(id=self._mqtt_output.id, type=StateType.STATE)
if self._attr_supported_features & CoverEntityFeature.SET_POSITION:
state.write_position(0)
if self._attr_supported_features & CoverEntityFeature.SET_TILT_POSITION:
state.write_slat_position(0)
else:
state.write_state("down")
await self._async_publish_output_state(state)
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the cover."""
state = QbusMqttShutterState(id=self._mqtt_output.id, type=StateType.STATE)
state.write_state("stop")
await self._async_publish_output_state(state)
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
state = QbusMqttShutterState(id=self._mqtt_output.id, type=StateType.STATE)
state.write_position(int(kwargs[ATTR_POSITION]))
await self._async_publish_output_state(state)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
state = QbusMqttShutterState(id=self._mqtt_output.id, type=StateType.STATE)
state.write_slat_position(50)
await self._async_publish_output_state(state)
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
state = QbusMqttShutterState(id=self._mqtt_output.id, type=StateType.STATE)
state.write_slat_position(0)
await self._async_publish_output_state(state)
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
state = QbusMqttShutterState(id=self._mqtt_output.id, type=StateType.STATE)
state.write_slat_position(int(kwargs[ATTR_TILT_POSITION]))
await self._async_publish_output_state(state)
async def _handle_state_received(self, state: QbusMqttShutterState) -> None:
output_state = state.read_state()
shutter_position = state.read_position()
slat_position = state.read_slat_position()
if output_state is not None:
self._previous_state = self._target_state
self._target_state = output_state
if shutter_position is not None:
self._target_shutter_position = shutter_position
if slat_position is not None:
self._target_slat_position = slat_position
self._update_is_closed()
self._update_cover_position()
self._update_tilt_position()
def _update_is_closed(self) -> None:
if self._attr_supported_features & CoverEntityFeature.SET_POSITION:
if self._attr_supported_features & CoverEntityFeature.SET_TILT_POSITION:
self._attr_is_closed = (
self._target_shutter_position == 0
and self._target_slat_position in (0, 100)
)
else:
self._attr_is_closed = self._target_shutter_position == 0
else:
self._attr_is_closed = (
self._previous_state == "down" and self._target_state == "stop"
)
def _update_cover_position(self) -> None:
self._attr_current_cover_position = (
self._target_shutter_position
if self._attr_supported_features & CoverEntityFeature.SET_POSITION
else None
)
def _update_tilt_position(self) -> None:
self._attr_current_cover_tilt_position = (
self._target_slat_position
if self._attr_supported_features & CoverEntityFeature.SET_TILT_POSITION
else None
)