"""Support for LIFX.""" from __future__ import annotations import asyncio from collections.abc import Callable from functools import partial from typing import Any from aiolifx import products from aiolifx.aiolifx import Light from aiolifx.message import Message from awesomeversion import AwesomeVersion from homeassistant.components.light import ( ATTR_BRIGHTNESS, ATTR_BRIGHTNESS_PCT, ATTR_COLOR_NAME, ATTR_COLOR_TEMP, ATTR_COLOR_TEMP_KELVIN, ATTR_HS_COLOR, ATTR_KELVIN, ATTR_RGB_COLOR, ATTR_XY_COLOR, ) from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import device_registry as dr import homeassistant.util.color as color_util from .const import ( _LOGGER, DEFAULT_ATTEMPTS, DOMAIN, INFRARED_BRIGHTNESS_VALUES_MAP, OVERALL_TIMEOUT, ) FIX_MAC_FW = AwesomeVersion("3.70") @callback def async_entry_is_legacy(entry: ConfigEntry) -> bool: """Check if a config entry is the legacy shared one.""" return entry.unique_id is None or entry.unique_id == DOMAIN @callback def async_get_legacy_entry(hass: HomeAssistant) -> ConfigEntry | None: """Get the legacy config entry.""" for entry in hass.config_entries.async_entries(DOMAIN): if async_entry_is_legacy(entry): return entry return None def infrared_brightness_value_to_option(value: int) -> str | None: """Convert infrared brightness from value to option.""" return INFRARED_BRIGHTNESS_VALUES_MAP.get(value, None) def infrared_brightness_option_to_value(option: str) -> int | None: """Convert infrared brightness option to value.""" option_values = {v: k for k, v in INFRARED_BRIGHTNESS_VALUES_MAP.items()} return option_values.get(option, None) def convert_8_to_16(value: int) -> int: """Scale an 8 bit level into 16 bits.""" return (value << 8) | value def convert_16_to_8(value: int) -> int: """Scale a 16 bit level into 8 bits.""" return value >> 8 def lifx_features(bulb: Light) -> dict[str, Any]: """Return a feature map for this bulb, or a default map if unknown.""" features: dict[str, Any] = ( products.features_map.get(bulb.product) or products.features_map[1] ) return features def find_hsbk(hass: HomeAssistant, **kwargs: Any) -> list[float | int | None] | None: """Find the desired color from a number of possible inputs. Hue, Saturation, Brightness, Kelvin """ hue, saturation, brightness, kelvin = [None] * 4 if (color_name := kwargs.get(ATTR_COLOR_NAME)) is not None: try: hue, saturation = color_util.color_RGB_to_hs( *color_util.color_name_to_rgb(color_name) ) except ValueError: _LOGGER.warning( "Got unknown color %s, falling back to neutral white", color_name ) hue, saturation = (0, 0) if ATTR_HS_COLOR in kwargs: hue, saturation = kwargs[ATTR_HS_COLOR] elif ATTR_RGB_COLOR in kwargs: hue, saturation = color_util.color_RGB_to_hs(*kwargs[ATTR_RGB_COLOR]) elif ATTR_XY_COLOR in kwargs: hue, saturation = color_util.color_xy_to_hs(*kwargs[ATTR_XY_COLOR]) if hue is not None: assert saturation is not None hue = int(hue / 360 * 65535) saturation = int(saturation / 100 * 65535) kelvin = 3500 if ATTR_KELVIN in kwargs: _LOGGER.warning( "The 'kelvin' parameter is deprecated. Please use 'color_temp_kelvin' for" " all service calls" ) kelvin = kwargs.pop(ATTR_KELVIN) saturation = 0 if ATTR_COLOR_TEMP in kwargs: kelvin = color_util.color_temperature_mired_to_kelvin( kwargs.pop(ATTR_COLOR_TEMP) ) saturation = 0 if ATTR_COLOR_TEMP_KELVIN in kwargs: kelvin = kwargs.pop(ATTR_COLOR_TEMP_KELVIN) saturation = 0 if ATTR_BRIGHTNESS in kwargs: brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS]) if ATTR_BRIGHTNESS_PCT in kwargs: brightness = convert_8_to_16(round(255 * kwargs[ATTR_BRIGHTNESS_PCT] / 100)) hsbk = [hue, saturation, brightness, kelvin] return None if hsbk == [None] * 4 else hsbk def merge_hsbk( base: list[float | int | None], change: list[float | int | None] ) -> list[float | int | None]: """Copy change on top of base, except when None. Hue, Saturation, Brightness, Kelvin """ return [b if c is None else c for b, c in zip(base, change)] def _get_mac_offset(mac_addr: str, offset: int) -> str: octets = [int(octet, 16) for octet in mac_addr.split(":")] octets[5] = (octets[5] + offset) % 256 return ":".join(f"{octet:02x}" for octet in octets) def _off_by_one_mac(firmware: str) -> bool: """Check if the firmware version has the off by one mac.""" return bool(firmware and AwesomeVersion(firmware) >= FIX_MAC_FW) def get_real_mac_addr(mac_addr: str, firmware: str) -> str: """Increment the last byte of the mac address by one for FW>3.70.""" return _get_mac_offset(mac_addr, 1) if _off_by_one_mac(firmware) else mac_addr def formatted_serial(serial_number: str) -> str: """Format the serial number to match the HA device registry.""" return dr.format_mac(serial_number) def mac_matches_serial_number(mac_addr: str, serial_number: str) -> bool: """Check if a mac address matches the serial number.""" formatted_mac = dr.format_mac(mac_addr) return bool( formatted_serial(serial_number) == formatted_mac or _get_mac_offset(serial_number, 1) == formatted_mac ) async def async_execute_lifx(method: Callable) -> Message: """Execute a lifx callback method and wait for a response.""" return ( await async_multi_execute_lifx_with_retries( [method], DEFAULT_ATTEMPTS, OVERALL_TIMEOUT ) )[0] async def async_multi_execute_lifx_with_retries( methods: list[Callable], attempts: int, overall_timeout: int ) -> list[Message]: """Execute multiple lifx callback methods with retries and wait for a response. This functional will the overall timeout by the number of attempts and wait for each method to return a result. If we don't get a result within the split timeout, we will send all methods that did not generate a response again. If we don't get a result after all attempts, we will raise an asyncio.TimeoutError exception. """ loop = asyncio.get_running_loop() futures: list[asyncio.Future] = [loop.create_future() for _ in methods] def _callback( bulb: Light, message: Message | None, future: asyncio.Future[Message] ) -> None: if message and not future.done(): future.set_result(message) timeout_per_attempt = overall_timeout / attempts for _ in range(attempts): for idx, method in enumerate(methods): future = futures[idx] if not future.done(): method(callb=partial(_callback, future=future)) _, pending = await asyncio.wait(futures, timeout=timeout_per_attempt) if not pending: break results: list[Message] = [] failed: list[str] = [] for idx, future in enumerate(futures): if not future.done() or not (result := future.result()): method = methods[idx] failed.append(str(getattr(method, "__name__", method))) else: results.append(result) if failed: failed_methods = ", ".join(failed) raise asyncio.TimeoutError( f"{failed_methods} timed out after {attempts} attempts" ) return results