"""Component to allow selecting an option from a list as platforms.""" from __future__ import annotations from datetime import timedelta from functools import cached_property import logging from typing import Any, final import voluptuous as vol from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.exceptions import ServiceValidationError from homeassistant.helpers import config_validation as cv from homeassistant.helpers.config_validation import ( PLATFORM_SCHEMA, PLATFORM_SCHEMA_BASE, ) from homeassistant.helpers.entity import Entity, EntityDescription from homeassistant.helpers.entity_component import EntityComponent from homeassistant.helpers.typing import ConfigType from .const import ( ATTR_CYCLE, ATTR_OPTION, ATTR_OPTIONS, DOMAIN, SERVICE_SELECT_FIRST, SERVICE_SELECT_LAST, SERVICE_SELECT_NEXT, SERVICE_SELECT_OPTION, SERVICE_SELECT_PREVIOUS, ) SCAN_INTERVAL = timedelta(seconds=30) ENTITY_ID_FORMAT = DOMAIN + ".{}" MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10) _LOGGER = logging.getLogger(__name__) __all__ = [ "ATTR_CYCLE", "ATTR_OPTION", "ATTR_OPTIONS", "DOMAIN", "PLATFORM_SCHEMA_BASE", "PLATFORM_SCHEMA", "SelectEntity", "SelectEntityDescription", "SERVICE_SELECT_FIRST", "SERVICE_SELECT_LAST", "SERVICE_SELECT_NEXT", "SERVICE_SELECT_OPTION", "SERVICE_SELECT_PREVIOUS", ] # mypy: disallow-any-generics async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up Select entities.""" component = hass.data[DOMAIN] = EntityComponent[SelectEntity]( _LOGGER, DOMAIN, hass, SCAN_INTERVAL ) await component.async_setup(config) component.async_register_entity_service( SERVICE_SELECT_FIRST, {}, SelectEntity.async_first.__name__, ) component.async_register_entity_service( SERVICE_SELECT_LAST, {}, SelectEntity.async_last.__name__, ) component.async_register_entity_service( SERVICE_SELECT_NEXT, {vol.Optional(ATTR_CYCLE, default=True): bool}, SelectEntity.async_next.__name__, ) component.async_register_entity_service( SERVICE_SELECT_OPTION, {vol.Required(ATTR_OPTION): cv.string}, SelectEntity.async_handle_select_option.__name__, ) component.async_register_entity_service( SERVICE_SELECT_PREVIOUS, {vol.Optional(ATTR_CYCLE, default=True): bool}, SelectEntity.async_previous.__name__, ) return True async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up a config entry.""" component: EntityComponent[SelectEntity] = hass.data[DOMAIN] return await component.async_setup_entry(entry) async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" component: EntityComponent[SelectEntity] = hass.data[DOMAIN] return await component.async_unload_entry(entry) class SelectEntityDescription(EntityDescription, frozen_or_thawed=True): """A class that describes select entities.""" options: list[str] | None = None CACHED_PROPERTIES_WITH_ATTR_ = { "current_option", "options", } class SelectEntity(Entity, cached_properties=CACHED_PROPERTIES_WITH_ATTR_): """Representation of a Select entity.""" _entity_component_unrecorded_attributes = frozenset({ATTR_OPTIONS}) entity_description: SelectEntityDescription _attr_current_option: str | None _attr_options: list[str] _attr_state: None = None @property def capability_attributes(self) -> dict[str, Any]: """Return capability attributes.""" return { ATTR_OPTIONS: self.options, } @property @final def state(self) -> str | None: """Return the entity state.""" current_option = self.current_option if current_option is None or current_option not in self.options: return None return current_option @cached_property def options(self) -> list[str]: """Return a set of selectable options.""" if hasattr(self, "_attr_options"): return self._attr_options if ( hasattr(self, "entity_description") and self.entity_description.options is not None ): return self.entity_description.options raise AttributeError @cached_property def current_option(self) -> str | None: """Return the selected entity option to represent the entity state.""" return self._attr_current_option @final @callback def _valid_option_or_raise(self, option: str) -> None: """Raise ServiceValidationError on invalid option.""" options = self.options if not options or option not in options: friendly_options: str = ", ".join(options or []) raise ServiceValidationError( translation_domain=DOMAIN, translation_key="not_valid_option", translation_placeholders={ "entity_id": self.entity_id, "option": option, "options": friendly_options, }, ) @final async def async_handle_select_option(self, option: str) -> None: """Service call wrapper to set a new value.""" self._valid_option_or_raise(option) await self.async_select_option(option) def select_option(self, option: str) -> None: """Change the selected option.""" raise NotImplementedError async def async_select_option(self, option: str) -> None: """Change the selected option.""" await self.hass.async_add_executor_job(self.select_option, option) @final async def async_first(self) -> None: """Select first option.""" await self._async_select_index(0) @final async def async_last(self) -> None: """Select last option.""" await self._async_select_index(-1) @final async def async_next(self, cycle: bool) -> None: """Select next option. If there is no current option, first item is the next. """ if self.current_option is None: await self.async_first() return await self._async_offset_index(1, cycle) @final async def async_previous(self, cycle: bool) -> None: """Select previous option. If there is no current option, last item is the previous. """ if self.current_option is None: await self.async_last() return await self._async_offset_index(-1, cycle) @final async def _async_offset_index(self, offset: int, cycle: bool) -> None: """Offset current index.""" current_index = 0 current_option = self.current_option options = self.options if current_option is not None and current_option in self.options: current_index = self.options.index(current_option) new_index = current_index + offset if cycle: new_index = new_index % len(options) elif new_index < 0: new_index = 0 elif new_index >= len(options): new_index = len(options) - 1 await self.async_select_option(options[new_index]) @final async def _async_select_index(self, idx: int) -> None: """Select new option by index.""" options = self.options new_index = idx % len(options) await self.async_select_option(options[new_index])