""" Support for the LIFX platform that implements lights. For more details about this platform, please refer to the documentation at https://home-assistant.io/components/light.lifx/ """ import logging import asyncio import sys import math from os import path from functools import partial from datetime import timedelta import voluptuous as vol from homeassistant.components.light import ( Light, DOMAIN, PLATFORM_SCHEMA, LIGHT_TURN_ON_SCHEMA, ATTR_BRIGHTNESS, ATTR_BRIGHTNESS_PCT, ATTR_COLOR_NAME, ATTR_RGB_COLOR, ATTR_XY_COLOR, ATTR_COLOR_TEMP, ATTR_KELVIN, ATTR_TRANSITION, ATTR_EFFECT, SUPPORT_BRIGHTNESS, SUPPORT_COLOR_TEMP, SUPPORT_RGB_COLOR, SUPPORT_XY_COLOR, SUPPORT_TRANSITION, SUPPORT_EFFECT, VALID_BRIGHTNESS, VALID_BRIGHTNESS_PCT, preprocess_turn_on_alternatives) from homeassistant.config import load_yaml_config_file from homeassistant.const import ATTR_ENTITY_ID, EVENT_HOMEASSISTANT_STOP from homeassistant import util from homeassistant.core import callback from homeassistant.helpers.event import async_track_point_in_utc_time from homeassistant.helpers.service import extract_entity_ids import homeassistant.helpers.config_validation as cv import homeassistant.util.color as color_util _LOGGER = logging.getLogger(__name__) REQUIREMENTS = ['aiolifx==0.6.1', 'aiolifx_effects==0.1.2'] UDP_BROADCAST_PORT = 56700 DISCOVERY_INTERVAL = 60 MESSAGE_TIMEOUT = 1.0 MESSAGE_RETRIES = 8 UNAVAILABLE_GRACE = 90 CONF_SERVER = 'server' CONF_BROADCAST = 'broadcast' PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ vol.Optional(CONF_SERVER, default='0.0.0.0'): cv.string, vol.Optional(CONF_BROADCAST, default='255.255.255.255'): cv.string, }) SERVICE_LIFX_SET_STATE = 'lifx_set_state' ATTR_INFRARED = 'infrared' ATTR_ZONES = 'zones' ATTR_POWER = 'power' LIFX_SET_STATE_SCHEMA = LIGHT_TURN_ON_SCHEMA.extend({ ATTR_INFRARED: vol.All(vol.Coerce(int), vol.Clamp(min=0, max=255)), ATTR_ZONES: vol.All(cv.ensure_list, [cv.positive_int]), ATTR_POWER: cv.boolean, }) SERVICE_EFFECT_PULSE = 'lifx_effect_pulse' SERVICE_EFFECT_COLORLOOP = 'lifx_effect_colorloop' SERVICE_EFFECT_STOP = 'lifx_effect_stop' ATTR_POWER_ON = 'power_on' ATTR_MODE = 'mode' ATTR_PERIOD = 'period' ATTR_CYCLES = 'cycles' ATTR_SPREAD = 'spread' ATTR_CHANGE = 'change' PULSE_MODE_BLINK = 'blink' PULSE_MODE_BREATHE = 'breathe' PULSE_MODE_PING = 'ping' PULSE_MODE_STROBE = 'strobe' PULSE_MODE_SOLID = 'solid' PULSE_MODES = [PULSE_MODE_BLINK, PULSE_MODE_BREATHE, PULSE_MODE_PING, PULSE_MODE_STROBE, PULSE_MODE_SOLID] LIFX_EFFECT_SCHEMA = vol.Schema({ vol.Optional(ATTR_ENTITY_ID): cv.entity_ids, vol.Optional(ATTR_POWER_ON, default=True): cv.boolean, }) LIFX_EFFECT_PULSE_SCHEMA = LIFX_EFFECT_SCHEMA.extend({ ATTR_BRIGHTNESS: VALID_BRIGHTNESS, ATTR_BRIGHTNESS_PCT: VALID_BRIGHTNESS_PCT, ATTR_COLOR_NAME: cv.string, ATTR_RGB_COLOR: vol.All(vol.ExactSequence((cv.byte, cv.byte, cv.byte)), vol.Coerce(tuple)), ATTR_COLOR_TEMP: vol.All(vol.Coerce(int), vol.Range(min=1)), ATTR_KELVIN: vol.All(vol.Coerce(int), vol.Range(min=0)), ATTR_PERIOD: vol.All(vol.Coerce(float), vol.Range(min=0.05)), ATTR_CYCLES: vol.All(vol.Coerce(float), vol.Range(min=1)), ATTR_MODE: vol.In(PULSE_MODES), }) LIFX_EFFECT_COLORLOOP_SCHEMA = LIFX_EFFECT_SCHEMA.extend({ ATTR_BRIGHTNESS: VALID_BRIGHTNESS, ATTR_BRIGHTNESS_PCT: VALID_BRIGHTNESS_PCT, ATTR_PERIOD: vol.All(vol.Coerce(float), vol.Clamp(min=0.05)), ATTR_CHANGE: vol.All(vol.Coerce(float), vol.Clamp(min=0, max=360)), ATTR_SPREAD: vol.All(vol.Coerce(float), vol.Clamp(min=0, max=360)), ATTR_TRANSITION: vol.All(vol.Coerce(float), vol.Range(min=0)), }) LIFX_EFFECT_STOP_SCHEMA = vol.Schema({ vol.Optional(ATTR_ENTITY_ID): cv.entity_ids, }) def aiolifx(): """Return the aiolifx module.""" import aiolifx as aiolifx_module return aiolifx_module def aiolifx_effects(): """Return the aiolifx_effects module.""" import aiolifx_effects as aiolifx_effects_module return aiolifx_effects_module @asyncio.coroutine def async_setup_platform(hass, config, async_add_devices, discovery_info=None): """Set up the LIFX platform.""" if sys.platform == 'win32': _LOGGER.warning("The lifx platform is known to not work on Windows. " "Consider using the lifx_legacy platform instead") server_addr = config.get(CONF_SERVER) lifx_manager = LIFXManager(hass, async_add_devices) lifx_discovery = aiolifx().LifxDiscovery( hass.loop, lifx_manager, discovery_interval=DISCOVERY_INTERVAL, broadcast_ip=config.get(CONF_BROADCAST)) coro = hass.loop.create_datagram_endpoint( lambda: lifx_discovery, local_addr=(server_addr, UDP_BROADCAST_PORT)) hass.async_add_job(coro) @callback def cleanup(event): """Clean up resources.""" lifx_discovery.cleanup() hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, cleanup) return True def lifx_features(device): """Return a feature map for this device, or a default map if unknown.""" return aiolifx().products.features_map.get(device.product) or \ aiolifx().products.features_map.get(1) def find_hsbk(**kwargs): """Find the desired color from a number of possible inputs.""" hue, saturation, brightness, kelvin = [None]*4 preprocess_turn_on_alternatives(kwargs) if ATTR_RGB_COLOR in kwargs: hue, saturation, brightness = \ color_util.color_RGB_to_hsv(*kwargs[ATTR_RGB_COLOR]) saturation = convert_8_to_16(saturation) brightness = convert_8_to_16(brightness) kelvin = 3500 if ATTR_XY_COLOR in kwargs: hue, saturation = color_util.color_xy_to_hs(*kwargs[ATTR_XY_COLOR]) saturation = convert_8_to_16(saturation) kelvin = 3500 if ATTR_COLOR_TEMP in kwargs: kelvin = int(color_util.color_temperature_mired_to_kelvin( kwargs[ATTR_COLOR_TEMP])) saturation = 0 if ATTR_BRIGHTNESS in kwargs: brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS]) hsbk = [hue, saturation, brightness, kelvin] return None if hsbk == [None]*4 else hsbk def merge_hsbk(base, change): """Copy change on top of base, except when None.""" if change is None: return None return list(map(lambda x, y: y if y is not None else x, base, change)) class LIFXManager(object): """Representation of all known LIFX entities.""" def __init__(self, hass, async_add_devices): """Initialize the light.""" self.entities = {} self.hass = hass self.async_add_devices = async_add_devices self.effects_conductor = aiolifx_effects().Conductor(loop=hass.loop) descriptions = load_yaml_config_file( path.join(path.dirname(__file__), 'services.yaml')) self.register_set_state(descriptions) self.register_effects(descriptions) def register_set_state(self, descriptions): """Register the LIFX set_state service call.""" @asyncio.coroutine def async_service_handle(service): """Apply a service.""" tasks = [] for light in self.service_to_entities(service): if service.service == SERVICE_LIFX_SET_STATE: task = light.async_set_state(**service.data) tasks.append(self.hass.async_add_job(task)) if tasks: yield from asyncio.wait(tasks, loop=self.hass.loop) self.hass.services.async_register( DOMAIN, SERVICE_LIFX_SET_STATE, async_service_handle, descriptions.get(SERVICE_LIFX_SET_STATE), schema=LIFX_SET_STATE_SCHEMA) def register_effects(self, descriptions): """Register the LIFX effects as hass service calls.""" @asyncio.coroutine def async_service_handle(service): """Apply a service, i.e. start an effect.""" entities = self.service_to_entities(service) if entities: yield from self.start_effect( entities, service.service, **service.data) self.hass.services.async_register( DOMAIN, SERVICE_EFFECT_PULSE, async_service_handle, descriptions.get(SERVICE_EFFECT_PULSE), schema=LIFX_EFFECT_PULSE_SCHEMA) self.hass.services.async_register( DOMAIN, SERVICE_EFFECT_COLORLOOP, async_service_handle, descriptions.get(SERVICE_EFFECT_COLORLOOP), schema=LIFX_EFFECT_COLORLOOP_SCHEMA) self.hass.services.async_register( DOMAIN, SERVICE_EFFECT_STOP, async_service_handle, descriptions.get(SERVICE_EFFECT_STOP), schema=LIFX_EFFECT_STOP_SCHEMA) @asyncio.coroutine def start_effect(self, entities, service, **kwargs): """Start a light effect on entities.""" devices = list(map(lambda l: l.device, entities)) if service == SERVICE_EFFECT_PULSE: effect = aiolifx_effects().EffectPulse( power_on=kwargs.get(ATTR_POWER_ON), period=kwargs.get(ATTR_PERIOD), cycles=kwargs.get(ATTR_CYCLES), mode=kwargs.get(ATTR_MODE), hsbk=find_hsbk(**kwargs), ) yield from self.effects_conductor.start(effect, devices) elif service == SERVICE_EFFECT_COLORLOOP: preprocess_turn_on_alternatives(kwargs) brightness = None if ATTR_BRIGHTNESS in kwargs: brightness = convert_8_to_16(kwargs[ATTR_BRIGHTNESS]) effect = aiolifx_effects().EffectColorloop( power_on=kwargs.get(ATTR_POWER_ON), period=kwargs.get(ATTR_PERIOD), change=kwargs.get(ATTR_CHANGE), spread=kwargs.get(ATTR_SPREAD), transition=kwargs.get(ATTR_TRANSITION), brightness=brightness, ) yield from self.effects_conductor.start(effect, devices) elif service == SERVICE_EFFECT_STOP: yield from self.effects_conductor.stop(devices) def service_to_entities(self, service): """Return the known devices that a service call mentions.""" entity_ids = extract_entity_ids(self.hass, service) if entity_ids: entities = [entity for entity in self.entities.values() if entity.entity_id in entity_ids] else: entities = list(self.entities.values()) return entities @callback def register(self, device): """Handler for newly detected bulb.""" self.hass.async_add_job(self.async_register(device)) @asyncio.coroutine def async_register(self, device): """Handler for newly detected bulb.""" if device.mac_addr in self.entities: entity = self.entities[device.mac_addr] entity.registered = True _LOGGER.debug("%s register AGAIN", entity.who) yield from entity.update_hass() else: _LOGGER.debug("%s register NEW", device.ip_addr) # Read initial state ack = AwaitAioLIFX().wait version_resp = yield from ack(device.get_version) if version_resp: color_resp = yield from ack(device.get_color) if version_resp is None or color_resp is None: _LOGGER.error("Failed to initialize %s", device.ip_addr) else: device.timeout = MESSAGE_TIMEOUT device.retry_count = MESSAGE_RETRIES device.unregister_timeout = UNAVAILABLE_GRACE if lifx_features(device)["multizone"]: entity = LIFXStrip(device, self.effects_conductor) elif lifx_features(device)["color"]: entity = LIFXColor(device, self.effects_conductor) else: entity = LIFXWhite(device, self.effects_conductor) _LOGGER.debug("%s register READY", entity.who) self.entities[device.mac_addr] = entity self.async_add_devices([entity], True) @callback def unregister(self, device): """Handle disappearing bulbs.""" if device.mac_addr in self.entities: entity = self.entities[device.mac_addr] _LOGGER.debug("%s unregister", entity.who) entity.registered = False self.hass.async_add_job(entity.async_update_ha_state()) class AwaitAioLIFX: """Wait for an aiolifx callback and return the message.""" def __init__(self): """Initialize the wrapper.""" self.device = None self.message = None self.event = asyncio.Event() @callback def callback(self, device, message): """Handle responses.""" self.device = device self.message = message self.event.set() @asyncio.coroutine def wait(self, method): """Call an aiolifx method and wait for its response.""" self.device = None self.message = None self.event.clear() method(callb=self.callback) yield from self.event.wait() return self.message def convert_8_to_16(value): """Scale an 8 bit level into 16 bits.""" return (value << 8) | value def convert_16_to_8(value): """Scale a 16 bit level into 8 bits.""" return value >> 8 class LIFXLight(Light): """Representation of a LIFX light.""" def __init__(self, device, effects_conductor): """Initialize the light.""" self.device = device self.effects_conductor = effects_conductor self.registered = True self.postponed_update = None self.lock = asyncio.Lock() @property def available(self): """Return the availability of the device.""" return self.registered @property def name(self): """Return the name of the device.""" return self.device.label @property def who(self): """Return a string identifying the device.""" return "%s (%s)" % (self.device.ip_addr, self.name) @property def min_mireds(self): """Return the coldest color_temp that this light supports.""" kelvin = lifx_features(self.device)['max_kelvin'] return math.floor(color_util.color_temperature_kelvin_to_mired(kelvin)) @property def max_mireds(self): """Return the warmest color_temp that this light supports.""" kelvin = lifx_features(self.device)['min_kelvin'] return math.ceil(color_util.color_temperature_kelvin_to_mired(kelvin)) @property def supported_features(self): """Flag supported features.""" support = SUPPORT_BRIGHTNESS | SUPPORT_TRANSITION | SUPPORT_EFFECT device_features = lifx_features(self.device) if device_features['min_kelvin'] != device_features['max_kelvin']: support |= SUPPORT_COLOR_TEMP return support @property def brightness(self): """Return the brightness of this light between 0..255.""" brightness = convert_16_to_8(self.device.color[2]) _LOGGER.debug("brightness: %d", brightness) return brightness @property def color_temp(self): """Return the color temperature.""" kelvin = self.device.color[3] temperature = color_util.color_temperature_kelvin_to_mired(kelvin) _LOGGER.debug("color_temp: %d", temperature) return temperature @property def is_on(self): """Return true if device is on.""" return self.device.power_level != 0 @property def effect(self): """Return the name of the currently running effect.""" effect = self.effects_conductor.effect(self.device) if effect: return 'lifx_effect_' + effect.name return None @asyncio.coroutine def update_hass(self, now=None): """Request new status and push it to hass.""" self.postponed_update = None yield from self.async_update() yield from self.async_update_ha_state() @asyncio.coroutine def update_during_transition(self, when): """Update state at the start and end of a transition.""" if self.postponed_update: self.postponed_update() # Transition has started yield from self.update_hass() # Transition has ended if when > 0: self.postponed_update = async_track_point_in_utc_time( self.hass, self.update_hass, util.dt.utcnow() + timedelta(milliseconds=when)) @asyncio.coroutine def async_turn_on(self, **kwargs): """Turn the device on.""" kwargs[ATTR_POWER] = True self.hass.async_add_job(self.async_set_state(**kwargs)) @asyncio.coroutine def async_turn_off(self, **kwargs): """Turn the device off.""" kwargs[ATTR_POWER] = False self.hass.async_add_job(self.async_set_state(**kwargs)) @asyncio.coroutine def async_set_state(self, **kwargs): """Set a color on the light and turn it on/off.""" with (yield from self.lock): bulb = self.device yield from self.effects_conductor.stop([bulb]) if ATTR_EFFECT in kwargs: yield from self.default_effect(**kwargs) return if ATTR_INFRARED in kwargs: bulb.set_infrared(convert_8_to_16(kwargs[ATTR_INFRARED])) if ATTR_TRANSITION in kwargs: fade = int(kwargs[ATTR_TRANSITION] * 1000) else: fade = 0 # These are both False if ATTR_POWER is not set power_on = kwargs.get(ATTR_POWER, False) power_off = not kwargs.get(ATTR_POWER, True) hsbk = find_hsbk(**kwargs) # Send messages, waiting for ACK each time ack = AwaitAioLIFX().wait if not self.is_on: if power_off: yield from self.set_power(ack, False) if hsbk: yield from self.set_color(ack, hsbk, kwargs) if power_on: yield from self.set_power(ack, True, duration=fade) else: if power_on: yield from self.set_power(ack, True) if hsbk: yield from self.set_color(ack, hsbk, kwargs, duration=fade) if power_off: yield from self.set_power(ack, False, duration=fade) # Avoid state ping-pong by holding off updates as the state settles yield from asyncio.sleep(0.3) # Update when the transition starts and ends yield from self.update_during_transition(fade) @asyncio.coroutine def set_power(self, ack, pwr, duration=0): """Send a power change to the device.""" yield from ack(partial(self.device.set_power, pwr, duration=duration)) @asyncio.coroutine def set_color(self, ack, hsbk, kwargs, duration=0): """Send a color change to the device.""" hsbk = merge_hsbk(self.device.color, hsbk) yield from ack(partial(self.device.set_color, hsbk, duration=duration)) @asyncio.coroutine def default_effect(self, **kwargs): """Start an effect with default parameters.""" service = kwargs[ATTR_EFFECT] data = { ATTR_ENTITY_ID: self.entity_id, } yield from self.hass.services.async_call(DOMAIN, service, data) @asyncio.coroutine def async_update(self): """Update bulb status.""" _LOGGER.debug("%s async_update", self.who) if self.available and not self.lock.locked(): yield from AwaitAioLIFX().wait(self.device.get_color) class LIFXWhite(LIFXLight): """Representation of a white-only LIFX light.""" @property def effect_list(self): """Return the list of supported effects for this light.""" return [ SERVICE_EFFECT_PULSE, SERVICE_EFFECT_STOP, ] class LIFXColor(LIFXLight): """Representation of a color LIFX light.""" @property def supported_features(self): """Flag supported features.""" support = super().supported_features support |= SUPPORT_RGB_COLOR | SUPPORT_XY_COLOR return support @property def effect_list(self): """Return the list of supported effects for this light.""" return [ SERVICE_EFFECT_COLORLOOP, SERVICE_EFFECT_PULSE, SERVICE_EFFECT_STOP, ] @property def rgb_color(self): """Return the RGB value.""" hue, sat, bri, _ = self.device.color return color_util.color_hsv_to_RGB( hue, convert_16_to_8(sat), convert_16_to_8(bri)) class LIFXStrip(LIFXColor): """Representation of a LIFX light strip with multiple zones.""" @asyncio.coroutine def set_color(self, ack, hsbk, kwargs, duration=0): """Send a color change to the device.""" bulb = self.device num_zones = len(bulb.color_zones) zones = kwargs.get(ATTR_ZONES) if zones is None: # Fast track: setting all zones to the same brightness and color # can be treated as a single-zone bulb. if hsbk[2] is not None and hsbk[3] is not None: yield from super().set_color(ack, hsbk, kwargs, duration) return zones = list(range(0, num_zones)) else: zones = list(filter(lambda x: x < num_zones, set(zones))) # Zone brightness is not reported when powered off if not self.is_on and hsbk[2] is None: yield from self.set_power(ack, True) yield from asyncio.sleep(0.3) yield from self.update_color_zones() yield from self.set_power(ack, False) yield from asyncio.sleep(0.3) # Send new color to each zone for index, zone in enumerate(zones): zone_hsbk = merge_hsbk(bulb.color_zones[zone], hsbk) apply = 1 if (index == len(zones)-1) else 0 set_zone = partial(bulb.set_color_zones, start_index=zone, end_index=zone, color=zone_hsbk, duration=duration, apply=apply) yield from ack(set_zone) @asyncio.coroutine def async_update(self): """Update strip status.""" if self.available and not self.lock.locked(): yield from super().async_update() yield from self.update_color_zones() @asyncio.coroutine def update_color_zones(self): """Get updated color information for each zone.""" zone = 0 top = 1 while self.available and zone < top: # Each get_color_zones can update 8 zones at once resp = yield from AwaitAioLIFX().wait(partial( self.device.get_color_zones, start_index=zone)) if resp: zone += 8 top = resp.count