core/homeassistant/components/lifx/util.py

240 lines
7.5 KiB
Python

"""Support for LIFX."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from functools import partial
from typing import TYPE_CHECKING, Any
from aiolifx import products
from aiolifx.aiolifx import Light
from aiolifx.message import Message
from awesomeversion import AwesomeVersion
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_BRIGHTNESS_PCT,
ATTR_COLOR_NAME,
ATTR_COLOR_TEMP_KELVIN,
ATTR_HS_COLOR,
ATTR_RGB_COLOR,
ATTR_XY_COLOR,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.util import color as color_util
from .const import (
_ATTR_COLOR_TEMP,
_LOGGER,
DEFAULT_ATTEMPTS,
DOMAIN,
INFRARED_BRIGHTNESS_VALUES_MAP,
OVERALL_TIMEOUT,
)
if TYPE_CHECKING:
from .coordinator import LIFXConfigEntry
# Firmware version threshold used by _off_by_one_mac(): firmware at or above
# this version is treated as having the off by one mac.
FIX_MAC_FW = AwesomeVersion("3.70")
@callback
def async_entry_is_legacy(entry: LIFXConfigEntry) -> bool:
    """Return True when the entry is the legacy shared config entry.

    An entry counts as legacy when it has no unique id, or when its unique
    id equals the integration domain.
    """
    unique_id = entry.unique_id
    if unique_id is None:
        return True
    return unique_id == DOMAIN
@callback
def async_get_legacy_entry(hass: HomeAssistant) -> LIFXConfigEntry | None:
    """Return the first legacy config entry of this domain, or None."""
    entries = hass.config_entries.async_entries(DOMAIN)
    return next((entry for entry in entries if async_entry_is_legacy(entry)), None)
def infrared_brightness_value_to_option(value: int) -> str | None:
    """Look up the option for an infrared brightness value.

    Returns None when the value has no entry in the values map.
    """
    option = INFRARED_BRIGHTNESS_VALUES_MAP.get(value)
    return option
def infrared_brightness_option_to_value(option: str) -> int | None:
    """Look up the infrared brightness value for an option.

    The values map is inverted on each call; None is returned when the
    option is not one of its values.
    """
    value_by_option = {}
    for value, known_option in INFRARED_BRIGHTNESS_VALUES_MAP.items():
        # Later entries overwrite earlier ones that share the same option.
        value_by_option[known_option] = value
    return value_by_option.get(option)
def convert_8_to_16(value: int) -> int:
    """Scale an 8 bit level into 16 bits.

    The level is copied into both the high and the low byte, so 0 stays 0
    and 255 becomes 65535.
    """
    high_byte = value << 8
    return high_byte | value
def convert_16_to_8(value: int) -> int:
    """Scale a 16 bit level into 8 bits by dropping the low byte."""
    low_byte_bits = 8
    return value >> low_byte_bits
def lifx_features(bulb: Light) -> dict[str, Any]:
    """Return the feature map for the bulb's product.

    Falls back to the map stored under product id 1 when the bulb's product
    has no entry, or an empty one, in the features map.
    """
    product_features: dict[str, Any] | None = products.features_map.get(bulb.product)
    if product_features:
        return product_features
    fallback: dict[str, Any] = products.features_map[1]
    return fallback
def find_hsbk(hass: HomeAssistant, **kwargs: Any) -> list[float | int | None] | None:
    """Find the desired color from a number of possible inputs.

    Returns a [hue, saturation, brightness, kelvin] list in which components
    that were not requested are None, or None when nothing was requested at
    all. A color name is applied first and is overridden by an explicit HS,
    RGB or XY color (checked in that order). A color temperature forces the
    saturation to 0, and a percentage brightness overrides an 8 bit one.
    """
    hue, saturation, brightness, kelvin = [None] * 4

    color_name = kwargs.get(ATTR_COLOR_NAME)
    if color_name is not None:
        try:
            named_rgb = color_util.color_name_to_rgb(color_name)
            hue, saturation = color_util.color_RGB_to_hs(*named_rgb)
        except ValueError:
            _LOGGER.warning(
                "Got unknown color %s, falling back to neutral white", color_name
            )
            hue, saturation = (0, 0)

    # An explicit color replaces whatever the color name produced.
    if ATTR_HS_COLOR in kwargs:
        hue, saturation = kwargs[ATTR_HS_COLOR]
    elif ATTR_RGB_COLOR in kwargs:
        rgb = kwargs[ATTR_RGB_COLOR]
        hue, saturation = color_util.color_RGB_to_hs(*rgb)
    elif ATTR_XY_COLOR in kwargs:
        xy = kwargs[ATTR_XY_COLOR]
        hue, saturation = color_util.color_xy_to_hs(*xy)

    if hue is not None:
        assert saturation is not None
        # Rescale hue from a 0-360 range and saturation from a 0-100 range
        # to 0-65535, and pair the color with a fixed 3500 kelvin.
        hue = int(hue / 360 * 65535)
        saturation = int(saturation / 100 * 65535)
        kelvin = 3500

    if ATTR_COLOR_TEMP_KELVIN in kwargs:
        kelvin = kwargs.pop(ATTR_COLOR_TEMP_KELVIN)
        saturation = 0
    elif _ATTR_COLOR_TEMP in kwargs:
        # added in 2025.1, can be removed in 2026.1
        _LOGGER.warning(
            "The 'color_temp' parameter is deprecated. Please use 'color_temp_kelvin' for"
            " all service calls"
        )
        mired = kwargs.pop(_ATTR_COLOR_TEMP)
        kelvin = color_util.color_temperature_mired_to_kelvin(mired)
        saturation = 0

    if ATTR_BRIGHTNESS in kwargs:
        brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS])

    # Checked last so the percentage wins when both brightness forms are given.
    if ATTR_BRIGHTNESS_PCT in kwargs:
        level_8_bit = round(255 * kwargs[ATTR_BRIGHTNESS_PCT] / 100)
        brightness = convert_8_to_16(level_8_bit)

    hsbk = [hue, saturation, brightness, kelvin]
    if hsbk == [None] * 4:
        return None
    return hsbk
def merge_hsbk(
base: list[float | int | None], change: list[float | int | None]
) -> list[float | int | None]:
"""Copy change on top of base, except when None.
Hue, Saturation, Brightness, Kelvin
"""
return [b if c is None else c for b, c in zip(base, change, strict=False)]
def _get_mac_offset(mac_addr: str, offset: int) -> str:
octets = [int(octet, 16) for octet in mac_addr.split(":")]
octets[5] = (octets[5] + offset) % 256
return ":".join(f"{octet:02x}" for octet in octets)
def _off_by_one_mac(firmware: str) -> bool:
    """Check if the firmware version has the off by one mac.

    An empty firmware string never matches; otherwise the version must be
    at least FIX_MAC_FW.
    """
    if not firmware:
        return False
    return bool(AwesomeVersion(firmware) >= FIX_MAC_FW)
def get_real_mac_addr(mac_addr: str, firmware: str) -> str:
    """Return the real mac address for the given firmware.

    When the firmware has the off by one mac (3.70 or newer), the last byte
    of the address is incremented by one; otherwise the address is returned
    unchanged.
    """
    if _off_by_one_mac(firmware):
        return _get_mac_offset(mac_addr, 1)
    return mac_addr
def formatted_serial(serial_number: str) -> str:
    """Return the serial number in the device registry's mac format."""
    registry_format = dr.format_mac(serial_number)
    return registry_format
def mac_matches_serial_number(mac_addr: str, serial_number: str) -> bool:
    """Check if a mac address matches the serial number.

    Both values are normalized with the device registry mac formatter first.
    The mac matches when it equals the serial number itself, or the serial
    number with its last byte incremented by one.
    """
    formatted_mac = dr.format_mac(mac_addr)
    serial = formatted_serial(serial_number)
    if serial == formatted_mac:
        return True
    # Apply the offset to the normalized serial, so a serial that is not
    # already colon separated is handled the same way as in the check above.
    return _get_mac_offset(serial, 1) == formatted_mac
async def async_execute_lifx(method: Callable) -> Message:
    """Execute a single lifx callback method and return its response.

    Uses the default number of attempts and the overall timeout of the
    integration.
    """
    responses = await async_multi_execute_lifx_with_retries(
        [method], DEFAULT_ATTEMPTS, OVERALL_TIMEOUT
    )
    return responses[0]
async def async_multi_execute_lifx_with_retries(
methods: list[Callable], attempts: int, overall_timeout: int
) -> list[Message]:
"""Execute multiple lifx callback methods with retries and wait for a response.
This functional will the overall timeout by the number of attempts and
wait for each method to return a result. If we don't get a result
within the split timeout, we will send all methods that did not generate
a response again.
If we don't get a result after all attempts, we will raise an
TimeoutError exception.
"""
loop = asyncio.get_running_loop()
futures: list[asyncio.Future] = [loop.create_future() for _ in methods]
def _callback(
bulb: Light, message: Message | None, future: asyncio.Future[Message]
) -> None:
if message and not future.done():
future.set_result(message)
timeout_per_attempt = overall_timeout / attempts
for _ in range(attempts):
for idx, method in enumerate(methods):
future = futures[idx]
if not future.done():
method(callb=partial(_callback, future=future))
_, pending = await asyncio.wait(futures, timeout=timeout_per_attempt)
if not pending:
break
results: list[Message] = []
failed: list[str] = []
for idx, future in enumerate(futures):
if not future.done() or not (result := future.result()):
method = methods[idx]
failed.append(str(getattr(method, "__name__", method)))
else:
results.append(result)
if failed:
failed_methods = ", ".join(failed)
raise TimeoutError(f"{failed_methods} timed out after {attempts} attempts")
return results