254 lines
8.5 KiB
Python
254 lines
8.5 KiB
Python
"""Reproduce an Cover state."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import Coroutine, Iterable
|
|
from functools import partial
|
|
import logging
|
|
from typing import Any, Final
|
|
|
|
from homeassistant.const import (
|
|
ATTR_ENTITY_ID,
|
|
ATTR_SUPPORTED_FEATURES,
|
|
SERVICE_CLOSE_COVER,
|
|
SERVICE_CLOSE_COVER_TILT,
|
|
SERVICE_OPEN_COVER,
|
|
SERVICE_OPEN_COVER_TILT,
|
|
SERVICE_SET_COVER_POSITION,
|
|
SERVICE_SET_COVER_TILT_POSITION,
|
|
)
|
|
from homeassistant.core import Context, HomeAssistant, ServiceResponse, State
|
|
from homeassistant.util.enum import try_parse_enum
|
|
|
|
from . import (
|
|
ATTR_CURRENT_POSITION,
|
|
ATTR_CURRENT_TILT_POSITION,
|
|
ATTR_POSITION,
|
|
ATTR_TILT_POSITION,
|
|
DOMAIN,
|
|
CoverEntityFeature,
|
|
CoverState,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
OPENING_STATES = {CoverState.OPENING, CoverState.OPEN}
|
|
CLOSING_STATES = {CoverState.CLOSING, CoverState.CLOSED}
|
|
VALID_STATES: set[CoverState] = OPENING_STATES | CLOSING_STATES
|
|
|
|
FULL_OPEN: Final = 100
|
|
FULL_CLOSE: Final = 0
|
|
|
|
|
|
def _determine_features(current_attrs: dict[str, Any]) -> CoverEntityFeature:
|
|
"""Determine supported features based on current attributes."""
|
|
features = CoverEntityFeature(0)
|
|
if ATTR_CURRENT_POSITION in current_attrs:
|
|
features |= (
|
|
CoverEntityFeature.SET_POSITION
|
|
| CoverEntityFeature.OPEN
|
|
| CoverEntityFeature.CLOSE
|
|
)
|
|
if ATTR_CURRENT_TILT_POSITION in current_attrs:
|
|
features |= (
|
|
CoverEntityFeature.SET_TILT_POSITION
|
|
| CoverEntityFeature.OPEN_TILT
|
|
| CoverEntityFeature.CLOSE_TILT
|
|
)
|
|
if features == CoverEntityFeature(0):
|
|
features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
|
|
return features
|
|
|
|
|
|
async def _async_set_position(
|
|
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
|
|
service_data: dict[str, Any],
|
|
features: CoverEntityFeature,
|
|
target_position: int,
|
|
) -> bool:
|
|
"""Set the position of the cover.
|
|
|
|
Returns True if the position was set, False if there is no
|
|
supported method for setting the position.
|
|
"""
|
|
if CoverEntityFeature.SET_POSITION in features:
|
|
await service_call(
|
|
SERVICE_SET_COVER_POSITION, service_data | {ATTR_POSITION: target_position}
|
|
)
|
|
elif target_position == FULL_CLOSE and CoverEntityFeature.CLOSE in features:
|
|
await service_call(SERVICE_CLOSE_COVER, service_data)
|
|
elif target_position == FULL_OPEN and CoverEntityFeature.OPEN in features:
|
|
await service_call(SERVICE_OPEN_COVER, service_data)
|
|
else:
|
|
# Requested a position but the cover doesn't support it
|
|
return False
|
|
return True
|
|
|
|
|
|
async def _async_set_tilt_position(
|
|
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
|
|
service_data: dict[str, Any],
|
|
features: CoverEntityFeature,
|
|
target_tilt_position: int,
|
|
) -> bool:
|
|
"""Set the tilt position of the cover.
|
|
|
|
Returns True if the tilt position was set, False if there is no
|
|
supported method for setting the tilt position.
|
|
"""
|
|
if CoverEntityFeature.SET_TILT_POSITION in features:
|
|
await service_call(
|
|
SERVICE_SET_COVER_TILT_POSITION,
|
|
service_data | {ATTR_TILT_POSITION: target_tilt_position},
|
|
)
|
|
elif (
|
|
target_tilt_position == FULL_CLOSE and CoverEntityFeature.CLOSE_TILT in features
|
|
):
|
|
await service_call(SERVICE_CLOSE_COVER_TILT, service_data)
|
|
elif target_tilt_position == FULL_OPEN and CoverEntityFeature.OPEN_TILT in features:
|
|
await service_call(SERVICE_OPEN_COVER_TILT, service_data)
|
|
else:
|
|
# Requested a tilt position but the cover doesn't support it
|
|
return False
|
|
return True
|
|
|
|
|
|
async def _async_close_cover(
|
|
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
|
|
service_data: dict[str, Any],
|
|
features: CoverEntityFeature,
|
|
set_position: bool,
|
|
set_tilt: bool,
|
|
) -> None:
|
|
"""Close the cover if it was not closed by setting the position."""
|
|
if not set_position:
|
|
if CoverEntityFeature.CLOSE in features:
|
|
await service_call(SERVICE_CLOSE_COVER, service_data)
|
|
elif CoverEntityFeature.SET_POSITION in features:
|
|
await service_call(
|
|
SERVICE_SET_COVER_POSITION, service_data | {ATTR_POSITION: FULL_CLOSE}
|
|
)
|
|
if not set_tilt:
|
|
if CoverEntityFeature.CLOSE_TILT in features:
|
|
await service_call(SERVICE_CLOSE_COVER_TILT, service_data)
|
|
elif CoverEntityFeature.SET_TILT_POSITION in features:
|
|
await service_call(
|
|
SERVICE_SET_COVER_TILT_POSITION,
|
|
service_data | {ATTR_TILT_POSITION: FULL_CLOSE},
|
|
)
|
|
|
|
|
|
async def _async_open_cover(
|
|
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
|
|
service_data: dict[str, Any],
|
|
features: CoverEntityFeature,
|
|
set_position: bool,
|
|
set_tilt: bool,
|
|
) -> None:
|
|
"""Open the cover if it was not opened by setting the position."""
|
|
if not set_position:
|
|
if CoverEntityFeature.OPEN in features:
|
|
await service_call(SERVICE_OPEN_COVER, service_data)
|
|
elif CoverEntityFeature.SET_POSITION in features:
|
|
await service_call(
|
|
SERVICE_SET_COVER_POSITION, service_data | {ATTR_POSITION: FULL_OPEN}
|
|
)
|
|
if not set_tilt:
|
|
if CoverEntityFeature.OPEN_TILT in features:
|
|
await service_call(SERVICE_OPEN_COVER_TILT, service_data)
|
|
elif CoverEntityFeature.SET_TILT_POSITION in features:
|
|
await service_call(
|
|
SERVICE_SET_COVER_TILT_POSITION,
|
|
service_data | {ATTR_TILT_POSITION: FULL_OPEN},
|
|
)
|
|
|
|
|
|
async def _async_reproduce_state(
|
|
hass: HomeAssistant,
|
|
state: State,
|
|
*,
|
|
context: Context | None = None,
|
|
reproduce_options: dict[str, Any] | None = None,
|
|
) -> None:
|
|
"""Reproduce a single state."""
|
|
entity_id = state.entity_id
|
|
if (cur_state := hass.states.get(entity_id)) is None:
|
|
_LOGGER.warning("Unable to find entity %s", entity_id)
|
|
return
|
|
|
|
if (target_state := state.state) not in VALID_STATES:
|
|
_LOGGER.warning("Invalid state specified for %s: %s", entity_id, target_state)
|
|
return
|
|
|
|
current_attrs = cur_state.attributes
|
|
target_attrs = state.attributes
|
|
|
|
current_position: int | None = current_attrs.get(ATTR_CURRENT_POSITION)
|
|
target_position: int | None = target_attrs.get(ATTR_CURRENT_POSITION)
|
|
position_matches = current_position == target_position
|
|
|
|
current_tilt_position: int | None = current_attrs.get(ATTR_CURRENT_TILT_POSITION)
|
|
target_tilt_position: int | None = target_attrs.get(ATTR_CURRENT_TILT_POSITION)
|
|
tilt_position_matches = current_tilt_position == target_tilt_position
|
|
|
|
state_matches = cur_state.state == target_state
|
|
# Return if we are already at the right state.
|
|
if state_matches and position_matches and tilt_position_matches:
|
|
return
|
|
|
|
features = try_parse_enum(
|
|
CoverEntityFeature, current_attrs.get(ATTR_SUPPORTED_FEATURES)
|
|
)
|
|
if features is None:
|
|
# Backwards compatibility for integrations that
|
|
# don't set supported features since it previously
|
|
# worked without it.
|
|
_LOGGER.warning("Supported features is not set for %s", entity_id)
|
|
features = _determine_features(current_attrs)
|
|
|
|
service_call = partial(
|
|
hass.services.async_call,
|
|
DOMAIN,
|
|
context=context,
|
|
blocking=True,
|
|
)
|
|
service_data = {ATTR_ENTITY_ID: entity_id}
|
|
|
|
set_position = target_position is not None and await _async_set_position(
|
|
service_call, service_data, features, target_position
|
|
)
|
|
set_tilt = target_tilt_position is not None and await _async_set_tilt_position(
|
|
service_call, service_data, features, target_tilt_position
|
|
)
|
|
|
|
if target_state in CLOSING_STATES:
|
|
await _async_close_cover(
|
|
service_call, service_data, features, set_position, set_tilt
|
|
)
|
|
|
|
elif target_state in OPENING_STATES:
|
|
await _async_open_cover(
|
|
service_call, service_data, features, set_position, set_tilt
|
|
)
|
|
|
|
|
|
async def async_reproduce_states(
|
|
hass: HomeAssistant,
|
|
states: Iterable[State],
|
|
*,
|
|
context: Context | None = None,
|
|
reproduce_options: dict[str, Any] | None = None,
|
|
) -> None:
|
|
"""Reproduce Cover states."""
|
|
# Reproduce states in parallel.
|
|
await asyncio.gather(
|
|
*(
|
|
_async_reproduce_state(
|
|
hass, state, context=context, reproduce_options=reproduce_options
|
|
)
|
|
for state in states
|
|
)
|
|
)
|