core/homeassistant/components/cover/reproduce_state.py

254 lines
8.5 KiB
Python

"""Reproduce an Cover state."""
from __future__ import annotations
import asyncio
from collections.abc import Coroutine, Iterable
from functools import partial
import logging
from typing import Any, Final
from homeassistant.const import (
ATTR_ENTITY_ID,
ATTR_SUPPORTED_FEATURES,
SERVICE_CLOSE_COVER,
SERVICE_CLOSE_COVER_TILT,
SERVICE_OPEN_COVER,
SERVICE_OPEN_COVER_TILT,
SERVICE_SET_COVER_POSITION,
SERVICE_SET_COVER_TILT_POSITION,
)
from homeassistant.core import Context, HomeAssistant, ServiceResponse, State
from homeassistant.util.enum import try_parse_enum
from . import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
ATTR_POSITION,
ATTR_TILT_POSITION,
DOMAIN,
CoverEntityFeature,
CoverState,
)
_LOGGER = logging.getLogger(__name__)
OPENING_STATES = {CoverState.OPENING, CoverState.OPEN}
CLOSING_STATES = {CoverState.CLOSING, CoverState.CLOSED}
VALID_STATES: set[CoverState] = OPENING_STATES | CLOSING_STATES
FULL_OPEN: Final = 100
FULL_CLOSE: Final = 0
def _determine_features(current_attrs: dict[str, Any]) -> CoverEntityFeature:
"""Determine supported features based on current attributes."""
features = CoverEntityFeature(0)
if ATTR_CURRENT_POSITION in current_attrs:
features |= (
CoverEntityFeature.SET_POSITION
| CoverEntityFeature.OPEN
| CoverEntityFeature.CLOSE
)
if ATTR_CURRENT_TILT_POSITION in current_attrs:
features |= (
CoverEntityFeature.SET_TILT_POSITION
| CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
)
if features == CoverEntityFeature(0):
features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
return features
async def _async_set_position(
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
service_data: dict[str, Any],
features: CoverEntityFeature,
target_position: int,
) -> bool:
"""Set the position of the cover.
Returns True if the position was set, False if there is no
supported method for setting the position.
"""
if CoverEntityFeature.SET_POSITION in features:
await service_call(
SERVICE_SET_COVER_POSITION, service_data | {ATTR_POSITION: target_position}
)
elif target_position == FULL_CLOSE and CoverEntityFeature.CLOSE in features:
await service_call(SERVICE_CLOSE_COVER, service_data)
elif target_position == FULL_OPEN and CoverEntityFeature.OPEN in features:
await service_call(SERVICE_OPEN_COVER, service_data)
else:
# Requested a position but the cover doesn't support it
return False
return True
async def _async_set_tilt_position(
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
service_data: dict[str, Any],
features: CoverEntityFeature,
target_tilt_position: int,
) -> bool:
"""Set the tilt position of the cover.
Returns True if the tilt position was set, False if there is no
supported method for setting the tilt position.
"""
if CoverEntityFeature.SET_TILT_POSITION in features:
await service_call(
SERVICE_SET_COVER_TILT_POSITION,
service_data | {ATTR_TILT_POSITION: target_tilt_position},
)
elif (
target_tilt_position == FULL_CLOSE and CoverEntityFeature.CLOSE_TILT in features
):
await service_call(SERVICE_CLOSE_COVER_TILT, service_data)
elif target_tilt_position == FULL_OPEN and CoverEntityFeature.OPEN_TILT in features:
await service_call(SERVICE_OPEN_COVER_TILT, service_data)
else:
# Requested a tilt position but the cover doesn't support it
return False
return True
async def _async_close_cover(
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
service_data: dict[str, Any],
features: CoverEntityFeature,
set_position: bool,
set_tilt: bool,
) -> None:
"""Close the cover if it was not closed by setting the position."""
if not set_position:
if CoverEntityFeature.CLOSE in features:
await service_call(SERVICE_CLOSE_COVER, service_data)
elif CoverEntityFeature.SET_POSITION in features:
await service_call(
SERVICE_SET_COVER_POSITION, service_data | {ATTR_POSITION: FULL_CLOSE}
)
if not set_tilt:
if CoverEntityFeature.CLOSE_TILT in features:
await service_call(SERVICE_CLOSE_COVER_TILT, service_data)
elif CoverEntityFeature.SET_TILT_POSITION in features:
await service_call(
SERVICE_SET_COVER_TILT_POSITION,
service_data | {ATTR_TILT_POSITION: FULL_CLOSE},
)
async def _async_open_cover(
service_call: partial[Coroutine[Any, Any, ServiceResponse]],
service_data: dict[str, Any],
features: CoverEntityFeature,
set_position: bool,
set_tilt: bool,
) -> None:
"""Open the cover if it was not opened by setting the position."""
if not set_position:
if CoverEntityFeature.OPEN in features:
await service_call(SERVICE_OPEN_COVER, service_data)
elif CoverEntityFeature.SET_POSITION in features:
await service_call(
SERVICE_SET_COVER_POSITION, service_data | {ATTR_POSITION: FULL_OPEN}
)
if not set_tilt:
if CoverEntityFeature.OPEN_TILT in features:
await service_call(SERVICE_OPEN_COVER_TILT, service_data)
elif CoverEntityFeature.SET_TILT_POSITION in features:
await service_call(
SERVICE_SET_COVER_TILT_POSITION,
service_data | {ATTR_TILT_POSITION: FULL_OPEN},
)
async def _async_reproduce_state(
hass: HomeAssistant,
state: State,
*,
context: Context | None = None,
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce a single state."""
entity_id = state.entity_id
if (cur_state := hass.states.get(entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", entity_id)
return
if (target_state := state.state) not in VALID_STATES:
_LOGGER.warning("Invalid state specified for %s: %s", entity_id, target_state)
return
current_attrs = cur_state.attributes
target_attrs = state.attributes
current_position: int | None = current_attrs.get(ATTR_CURRENT_POSITION)
target_position: int | None = target_attrs.get(ATTR_CURRENT_POSITION)
position_matches = current_position == target_position
current_tilt_position: int | None = current_attrs.get(ATTR_CURRENT_TILT_POSITION)
target_tilt_position: int | None = target_attrs.get(ATTR_CURRENT_TILT_POSITION)
tilt_position_matches = current_tilt_position == target_tilt_position
state_matches = cur_state.state == target_state
# Return if we are already at the right state.
if state_matches and position_matches and tilt_position_matches:
return
features = try_parse_enum(
CoverEntityFeature, current_attrs.get(ATTR_SUPPORTED_FEATURES)
)
if features is None:
# Backwards compatibility for integrations that
# don't set supported features since it previously
# worked without it.
_LOGGER.warning("Supported features is not set for %s", entity_id)
features = _determine_features(current_attrs)
service_call = partial(
hass.services.async_call,
DOMAIN,
context=context,
blocking=True,
)
service_data = {ATTR_ENTITY_ID: entity_id}
set_position = target_position is not None and await _async_set_position(
service_call, service_data, features, target_position
)
set_tilt = target_tilt_position is not None and await _async_set_tilt_position(
service_call, service_data, features, target_tilt_position
)
if target_state in CLOSING_STATES:
await _async_close_cover(
service_call, service_data, features, set_position, set_tilt
)
elif target_state in OPENING_STATES:
await _async_open_cover(
service_call, service_data, features, set_position, set_tilt
)
async def async_reproduce_states(
hass: HomeAssistant,
states: Iterable[State],
*,
context: Context | None = None,
reproduce_options: dict[str, Any] | None = None,
) -> None:
"""Reproduce Cover states."""
# Reproduce states in parallel.
await asyncio.gather(
*(
_async_reproduce_state(
hass, state, context=context, reproduce_options=reproduce_options
)
for state in states
)
)