"""Support for Template fans.""" from __future__ import annotations from collections.abc import Generator, Sequence import logging from typing import TYPE_CHECKING, Any import voluptuous as vol from homeassistant.components.fan import ( ATTR_DIRECTION, ATTR_OSCILLATING, ATTR_PERCENTAGE, ATTR_PRESET_MODE, DIRECTION_FORWARD, DIRECTION_REVERSE, DOMAIN as FAN_DOMAIN, ENTITY_ID_FORMAT, FanEntity, FanEntityFeature, ) from homeassistant.const import ( CONF_ENTITY_ID, CONF_FRIENDLY_NAME, CONF_NAME, CONF_STATE, CONF_UNIQUE_ID, CONF_VALUE_TEMPLATE, STATE_ON, STATE_UNAVAILABLE, STATE_UNKNOWN, ) from homeassistant.core import HomeAssistant, callback from homeassistant.exceptions import TemplateError from homeassistant.helpers import config_validation as cv, template from homeassistant.helpers.entity import async_generate_entity_id from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from .const import CONF_OBJECT_ID, DOMAIN from .coordinator import TriggerUpdateCoordinator from .entity import AbstractTemplateEntity from .template_entity import ( LEGACY_FIELDS as TEMPLATE_ENTITY_LEGACY_FIELDS, TEMPLATE_ENTITY_AVAILABILITY_SCHEMA_LEGACY, TemplateEntity, make_template_entity_common_modern_schema, rewrite_common_legacy_to_modern_conf, ) from .trigger_entity import TriggerEntity _LOGGER = logging.getLogger(__name__) CONF_FANS = "fans" CONF_SPEED_COUNT = "speed_count" CONF_PRESET_MODES = "preset_modes" CONF_PERCENTAGE_TEMPLATE = "percentage_template" CONF_PRESET_MODE_TEMPLATE = "preset_mode_template" CONF_OSCILLATING_TEMPLATE = "oscillating_template" CONF_DIRECTION_TEMPLATE = "direction_template" CONF_ON_ACTION = "turn_on" CONF_OFF_ACTION = "turn_off" CONF_SET_PERCENTAGE_ACTION = "set_percentage" CONF_SET_OSCILLATING_ACTION = "set_oscillating" CONF_SET_DIRECTION_ACTION = "set_direction" CONF_SET_PRESET_MODE_ACTION = "set_preset_mode" _VALID_DIRECTIONS = [DIRECTION_FORWARD, DIRECTION_REVERSE] CONF_DIRECTION = "direction" CONF_OSCILLATING = "oscillating" CONF_PERCENTAGE = "percentage" CONF_PRESET_MODE = "preset_mode" LEGACY_FIELDS = TEMPLATE_ENTITY_LEGACY_FIELDS | { CONF_DIRECTION_TEMPLATE: CONF_DIRECTION, CONF_OSCILLATING_TEMPLATE: CONF_OSCILLATING, CONF_PERCENTAGE_TEMPLATE: CONF_PERCENTAGE, CONF_PRESET_MODE_TEMPLATE: CONF_PRESET_MODE, CONF_VALUE_TEMPLATE: CONF_STATE, } DEFAULT_NAME = "Template Fan" FAN_SCHEMA = vol.All( vol.Schema( { vol.Optional(CONF_DIRECTION): cv.template, vol.Required(CONF_OFF_ACTION): cv.SCRIPT_SCHEMA, vol.Required(CONF_ON_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_OSCILLATING): cv.template, vol.Optional(CONF_PERCENTAGE): cv.template, vol.Optional(CONF_PRESET_MODE): cv.template, vol.Optional(CONF_PRESET_MODES): cv.ensure_list, vol.Optional(CONF_SET_DIRECTION_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SET_OSCILLATING_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SET_PERCENTAGE_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SET_PRESET_MODE_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SPEED_COUNT): vol.Coerce(int), vol.Optional(CONF_STATE): cv.template, } ).extend(make_template_entity_common_modern_schema(DEFAULT_NAME).schema) ) LEGACY_FAN_SCHEMA = vol.All( cv.deprecated(CONF_ENTITY_ID), vol.Schema( { vol.Optional(CONF_DIRECTION_TEMPLATE): cv.template, vol.Optional(CONF_ENTITY_ID): cv.entity_ids, vol.Optional(CONF_FRIENDLY_NAME): cv.string, vol.Required(CONF_OFF_ACTION): cv.SCRIPT_SCHEMA, vol.Required(CONF_ON_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_OSCILLATING_TEMPLATE): cv.template, vol.Optional(CONF_PERCENTAGE_TEMPLATE): cv.template, vol.Optional(CONF_PRESET_MODE_TEMPLATE): cv.template, vol.Optional(CONF_PRESET_MODES): cv.ensure_list, vol.Optional(CONF_SET_DIRECTION_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SET_OSCILLATING_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SET_PERCENTAGE_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SET_PRESET_MODE_ACTION): cv.SCRIPT_SCHEMA, vol.Optional(CONF_SPEED_COUNT): vol.Coerce(int), vol.Optional(CONF_UNIQUE_ID): cv.string, vol.Optional(CONF_VALUE_TEMPLATE): cv.template, } ).extend(TEMPLATE_ENTITY_AVAILABILITY_SCHEMA_LEGACY.schema), ) PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend( {vol.Required(CONF_FANS): cv.schema_with_slug_keys(LEGACY_FAN_SCHEMA)} ) def rewrite_legacy_to_modern_conf( hass: HomeAssistant, config: dict[str, dict] ) -> list[dict]: """Rewrite legacy fan configuration definitions to modern ones.""" fans = [] for object_id, entity_conf in config.items(): entity_conf = {**entity_conf, CONF_OBJECT_ID: object_id} entity_conf = rewrite_common_legacy_to_modern_conf( hass, entity_conf, LEGACY_FIELDS ) if CONF_NAME not in entity_conf: entity_conf[CONF_NAME] = template.Template(object_id, hass) fans.append(entity_conf) return fans @callback def _async_create_template_tracking_entities( async_add_entities: AddEntitiesCallback, hass: HomeAssistant, definitions: list[dict], unique_id_prefix: str | None, ) -> None: """Create the template fans.""" fans = [] for entity_conf in definitions: unique_id = entity_conf.get(CONF_UNIQUE_ID) if unique_id and unique_id_prefix: unique_id = f"{unique_id_prefix}-{unique_id}" fans.append( TemplateFan( hass, entity_conf, unique_id, ) ) async_add_entities(fans) async def async_setup_platform( hass: HomeAssistant, config: ConfigType, async_add_entities: AddEntitiesCallback, discovery_info: DiscoveryInfoType | None = None, ) -> None: """Set up the template fans.""" if discovery_info is None: _async_create_template_tracking_entities( async_add_entities, hass, rewrite_legacy_to_modern_conf(hass, config[CONF_FANS]), None, ) return if "coordinator" in discovery_info: async_add_entities( TriggerFanEntity(hass, discovery_info["coordinator"], config) for config in discovery_info["entities"] ) return _async_create_template_tracking_entities( async_add_entities, hass, discovery_info["entities"], discovery_info["unique_id"], ) class AbstractTemplateFan(AbstractTemplateEntity, FanEntity): """Representation of a template fan features.""" # The super init is not called because TemplateEntity and TriggerEntity will call AbstractTemplateEntity.__init__. # This ensures that the __init__ on AbstractTemplateEntity is not called twice. def __init__(self, config: dict[str, Any]) -> None: # pylint: disable=super-init-not-called """Initialize the features.""" self._template = config.get(CONF_STATE) self._percentage_template = config.get(CONF_PERCENTAGE) self._preset_mode_template = config.get(CONF_PRESET_MODE) self._oscillating_template = config.get(CONF_OSCILLATING) self._direction_template = config.get(CONF_DIRECTION) self._state: bool | None = False self._percentage: int | None = None self._preset_mode: str | None = None self._oscillating: bool | None = None self._direction: str | None = None # Number of valid speeds self._speed_count = config.get(CONF_SPEED_COUNT) # List of valid preset modes self._preset_modes: list[str] | None = config.get(CONF_PRESET_MODES) self._attr_assumed_state = self._template is None self._attr_supported_features |= ( FanEntityFeature.TURN_OFF | FanEntityFeature.TURN_ON ) def _iterate_scripts( self, config: dict[str, Any] ) -> Generator[tuple[str, Sequence[dict[str, Any]], FanEntityFeature | int]]: for action_id, supported_feature in ( (CONF_ON_ACTION, 0), (CONF_OFF_ACTION, 0), (CONF_SET_PERCENTAGE_ACTION, FanEntityFeature.SET_SPEED), (CONF_SET_PRESET_MODE_ACTION, FanEntityFeature.PRESET_MODE), (CONF_SET_OSCILLATING_ACTION, FanEntityFeature.OSCILLATE), (CONF_SET_DIRECTION_ACTION, FanEntityFeature.DIRECTION), ): if (action_config := config.get(action_id)) is not None: yield (action_id, action_config, supported_feature) @property def speed_count(self) -> int: """Return the number of speeds the fan supports.""" return self._speed_count or 100 @property def preset_modes(self) -> list[str] | None: """Get the list of available preset modes.""" return self._preset_modes @property def is_on(self) -> bool | None: """Return true if device is on.""" return self._state @property def preset_mode(self) -> str | None: """Return the current preset mode.""" return self._preset_mode @property def percentage(self) -> int | None: """Return the current speed percentage.""" return self._percentage @property def oscillating(self) -> bool | None: """Return the oscillation state.""" return self._oscillating @property def current_direction(self) -> str | None: """Return the oscillation state.""" return self._direction def _handle_state(self, result) -> None: if isinstance(result, bool): self._state = result return if isinstance(result, str): self._state = result.lower() in ("true", STATE_ON) return self._state = False @callback def _update_percentage(self, percentage): # Validate percentage try: percentage = int(float(percentage)) except (ValueError, TypeError): _LOGGER.error( "Received invalid percentage: %s for entity %s", percentage, self.entity_id, ) self._percentage = 0 return if 0 <= percentage <= 100: self._percentage = percentage else: _LOGGER.error( "Received invalid percentage: %s for entity %s", percentage, self.entity_id, ) self._percentage = 0 @callback def _update_preset_mode(self, preset_mode): # Validate preset mode preset_mode = str(preset_mode) if self.preset_modes and preset_mode in self.preset_modes: self._preset_mode = preset_mode elif preset_mode in (STATE_UNAVAILABLE, STATE_UNKNOWN): self._preset_mode = None else: _LOGGER.error( "Received invalid preset_mode: %s for entity %s. Expected: %s", preset_mode, self.entity_id, self.preset_mode, ) self._preset_mode = None @callback def _update_oscillating(self, oscillating): # Validate osc if oscillating == "True" or oscillating is True: self._oscillating = True elif oscillating == "False" or oscillating is False: self._oscillating = False elif oscillating in (STATE_UNAVAILABLE, STATE_UNKNOWN): self._oscillating = None else: _LOGGER.error( "Received invalid oscillating: %s for entity %s. Expected: True/False", oscillating, self.entity_id, ) self._oscillating = None @callback def _update_direction(self, direction): # Validate direction if direction in _VALID_DIRECTIONS: self._direction = direction elif direction in (STATE_UNAVAILABLE, STATE_UNKNOWN): self._direction = None else: _LOGGER.error( "Received invalid direction: %s for entity %s. Expected: %s", direction, self.entity_id, ", ".join(_VALID_DIRECTIONS), ) self._direction = None async def async_turn_on( self, percentage: int | None = None, preset_mode: str | None = None, **kwargs: Any, ) -> None: """Turn on the fan.""" await self.async_run_script( self._action_scripts[CONF_ON_ACTION], run_variables={ ATTR_PERCENTAGE: percentage, ATTR_PRESET_MODE: preset_mode, }, context=self._context, ) if preset_mode is not None: await self.async_set_preset_mode(preset_mode) if percentage is not None: await self.async_set_percentage(percentage) if self._template is None: self._state = True self.async_write_ha_state() async def async_turn_off(self, **kwargs: Any) -> None: """Turn off the fan.""" await self.async_run_script( self._action_scripts[CONF_OFF_ACTION], context=self._context ) if self._template is None: self._state = False self.async_write_ha_state() async def async_set_percentage(self, percentage: int) -> None: """Set the percentage speed of the fan.""" self._percentage = percentage if script := self._action_scripts.get(CONF_SET_PERCENTAGE_ACTION): await self.async_run_script( script, run_variables={ATTR_PERCENTAGE: self._percentage}, context=self._context, ) if self._template is None: self._state = percentage != 0 if self._template is None or self._percentage_template is None: self.async_write_ha_state() async def async_set_preset_mode(self, preset_mode: str) -> None: """Set the preset_mode of the fan.""" self._preset_mode = preset_mode if script := self._action_scripts.get(CONF_SET_PRESET_MODE_ACTION): await self.async_run_script( script, run_variables={ATTR_PRESET_MODE: self._preset_mode}, context=self._context, ) if self._template is None: self._state = True if self._template is None or self._preset_mode_template is None: self.async_write_ha_state() async def async_oscillate(self, oscillating: bool) -> None: """Set oscillation of the fan.""" self._oscillating = oscillating if ( script := self._action_scripts.get(CONF_SET_OSCILLATING_ACTION) ) is not None: await self.async_run_script( script, run_variables={ATTR_OSCILLATING: self.oscillating}, context=self._context, ) if self._oscillating_template is None: self.async_write_ha_state() async def async_set_direction(self, direction: str) -> None: """Set the direction of the fan.""" if direction in _VALID_DIRECTIONS: self._direction = direction if ( script := self._action_scripts.get(CONF_SET_DIRECTION_ACTION) ) is not None: await self.async_run_script( script, run_variables={ATTR_DIRECTION: direction}, context=self._context, ) if self._direction_template is None: self.async_write_ha_state() else: _LOGGER.error( "Received invalid direction: %s for entity %s. Expected: %s", direction, self.entity_id, ", ".join(_VALID_DIRECTIONS), ) class TemplateFan(TemplateEntity, AbstractTemplateFan): """A template fan component.""" _attr_should_poll = False def __init__( self, hass: HomeAssistant, config: dict[str, Any], unique_id, ) -> None: """Initialize the fan.""" TemplateEntity.__init__(self, hass, config=config, unique_id=unique_id) AbstractTemplateFan.__init__(self, config) if (object_id := config.get(CONF_OBJECT_ID)) is not None: self.entity_id = async_generate_entity_id( ENTITY_ID_FORMAT, object_id, hass=hass ) name = self._attr_name if TYPE_CHECKING: assert name is not None for action_id, action_config, supported_feature in self._iterate_scripts( config ): self.add_script(action_id, action_config, name, DOMAIN) self._attr_supported_features |= supported_feature @callback def _update_state(self, result): super()._update_state(result) if isinstance(result, TemplateError): self._state = None return self._handle_state(result) @callback def _async_setup_templates(self) -> None: """Set up templates.""" if self._template: self.add_template_attribute( "_state", self._template, None, self._update_state ) if self._preset_mode_template is not None: self.add_template_attribute( "_preset_mode", self._preset_mode_template, None, self._update_preset_mode, none_on_template_error=True, ) if self._percentage_template is not None: self.add_template_attribute( "_percentage", self._percentage_template, None, self._update_percentage, none_on_template_error=True, ) if self._oscillating_template is not None: self.add_template_attribute( "_oscillating", self._oscillating_template, None, self._update_oscillating, none_on_template_error=True, ) if self._direction_template is not None: self.add_template_attribute( "_direction", self._direction_template, None, self._update_direction, none_on_template_error=True, ) super()._async_setup_templates() class TriggerFanEntity(TriggerEntity, AbstractTemplateFan): """Fan entity based on trigger data.""" domain = FAN_DOMAIN def __init__( self, hass: HomeAssistant, coordinator: TriggerUpdateCoordinator, config: ConfigType, ) -> None: """Initialize the entity.""" TriggerEntity.__init__(self, hass, coordinator, config) AbstractTemplateFan.__init__(self, config) self._attr_name = name = self._rendered.get(CONF_NAME, DEFAULT_NAME) for action_id, action_config, supported_feature in self._iterate_scripts( config ): self.add_script(action_id, action_config, name, DOMAIN) self._attr_supported_features |= supported_feature for key in ( CONF_STATE, CONF_PRESET_MODE, CONF_PERCENTAGE, CONF_OSCILLATING, CONF_DIRECTION, ): if isinstance(config.get(key), template.Template): self._to_render_simple.append(key) self._parse_result.add(key) @callback def _handle_coordinator_update(self) -> None: """Handle update of the data.""" self._process_data() if not self.available: self.async_write_ha_state() return write_ha_state = False for key, updater in ( (CONF_STATE, self._handle_state), (CONF_PRESET_MODE, self._update_preset_mode), (CONF_PERCENTAGE, self._update_percentage), (CONF_OSCILLATING, self._update_oscillating), (CONF_DIRECTION, self._update_direction), ): if (rendered := self._rendered.get(key)) is not None: updater(rendered) write_ha_state = True if len(self._rendered) > 0: # In case any non optimistic template write_ha_state = True if write_ha_state: self.async_set_context(self.coordinator.data["context"]) self.async_write_ha_state()