core/homeassistant/components/fibaro/__init__.py

499 lines
18 KiB
Python

"""Support for the Fibaro devices."""
from collections import defaultdict
import logging
from typing import Optional
from fiblary3.client.v4.client import Client as FibaroClient, StateHandler
import voluptuous as vol
from homeassistant.const import (
ATTR_ARMED,
ATTR_BATTERY_LEVEL,
CONF_DEVICE_CLASS,
CONF_EXCLUDE,
CONF_ICON,
CONF_PASSWORD,
CONF_URL,
CONF_USERNAME,
CONF_WHITE_VALUE,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.util import convert, slugify
# Module-level logger shared by the controller and the entity base class.
_LOGGER = logging.getLogger(__name__)

# Extra state attribute names exposed by FibaroDevice.device_state_attributes.
ATTR_CURRENT_ENERGY_KWH = "current_energy_kwh"
ATTR_CURRENT_POWER_W = "current_power_w"

# Configuration keys specific to this integration.
CONF_COLOR = "color"
CONF_DEVICE_CONFIG = "device_config"
CONF_DIMMING = "dimming"
CONF_GATEWAYS = "gateways"
CONF_PLUGINS = "plugins"
CONF_RESET_COLOR = "reset_color"

DOMAIN = "fibaro"

# Keys under which setup() stores its state in hass.data.
FIBARO_CONTROLLERS = "fibaro_controllers"
FIBARO_DEVICES = "fibaro_devices"

# Platforms that setup() loads through discovery once a hub is connected.
FIBARO_COMPONENTS = [
    "binary_sensor",
    "climate",
    "cover",
    "light",
    "scene",
    "sensor",
    "switch",
]

# Lookup from a Fibaro device "type"/"baseType" string to the platform name
# used for it; consulted by FibaroController._map_device_to_type.
FIBARO_TYPEMAP = {
    "com.fibaro.multilevelSensor": "sensor",
    "com.fibaro.binarySwitch": "switch",
    "com.fibaro.multilevelSwitch": "switch",
    "com.fibaro.FGD212": "light",
    "com.fibaro.FGR": "cover",
    "com.fibaro.doorSensor": "binary_sensor",
    "com.fibaro.doorWindowSensor": "binary_sensor",
    "com.fibaro.FGMS001": "binary_sensor",
    "com.fibaro.heatDetector": "binary_sensor",
    "com.fibaro.lifeDangerSensor": "binary_sensor",
    "com.fibaro.smokeSensor": "binary_sensor",
    "com.fibaro.remoteSwitch": "switch",
    "com.fibaro.sensor": "sensor",
    "com.fibaro.colorController": "light",
    "com.fibaro.securitySensor": "binary_sensor",
    "com.fibaro.hvac": "climate",
    "com.fibaro.setpoint": "climate",
    "com.fibaro.FGT001": "climate",
    "com.fibaro.thermostatDanfoss": "climate",
}

# Per-device overrides; every key is optional.
DEVICE_CONFIG_SCHEMA_ENTRY = vol.Schema(
    {
        vol.Optional(CONF_DIMMING): cv.boolean,
        vol.Optional(CONF_COLOR): cv.boolean,
        vol.Optional(CONF_WHITE_VALUE): cv.boolean,
        vol.Optional(CONF_RESET_COLOR): cv.boolean,
        vol.Optional(CONF_DEVICE_CLASS): cv.string,
        vol.Optional(CONF_ICON): cv.string,
    }
)

# List of string ids; used for the gateway's "exclude" option, whose entries
# FibaroController compares against each device's ha_id.
FIBARO_ID_LIST_SCHEMA = vol.Schema([cv.string])

# Schema for one gateway (hub) entry: credentials and URL are required,
# plugin import defaults to off, and unknown extra keys are allowed.
GATEWAY_CONFIG = vol.Schema(
    {
        vol.Required(CONF_PASSWORD): cv.string,
        vol.Required(CONF_USERNAME): cv.string,
        vol.Required(CONF_URL): cv.url,
        vol.Optional(CONF_PLUGINS, default=False): cv.boolean,
        vol.Optional(CONF_EXCLUDE, default=[]): FIBARO_ID_LIST_SCHEMA,
        # Maps a string key (looked up by device ha_id in the controller)
        # to that device's override entry.
        vol.Optional(CONF_DEVICE_CONFIG, default={}): vol.Schema(
            {cv.string: DEVICE_CONFIG_SCHEMA_ENTRY}
        ),
    },
    extra=vol.ALLOW_EXTRA,
)

# Top-level schema: the "fibaro" domain holds a required list of gateways
# (a single mapping is wrapped into a list by cv.ensure_list).
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {vol.Required(CONF_GATEWAYS): vol.All(cv.ensure_list, [GATEWAY_CONFIG])}
        )
    },
    extra=vol.ALLOW_EXTRA,
)
class FibaroController:
    """Manage the connection to one Fibaro hub and its devices and scenes.

    The controller logs in through the fiblary client, builds lookup maps for
    rooms, devices and scenes, groups devices by Home Assistant platform in
    ``fibaro_devices`` and dispatches state changes reported by the hub to
    the callbacks registered per device id.
    """

    def __init__(self, config):
        """Initialize the Fibaro controller.

        ``config`` is one gateway configuration mapping; it must provide
        CONF_URL, CONF_USERNAME, CONF_PASSWORD, CONF_PLUGINS,
        CONF_DEVICE_CONFIG and CONF_EXCLUDE.
        """
        self._client = FibaroClient(
            config[CONF_URL], config[CONF_USERNAME], config[CONF_PASSWORD]
        )
        self._scene_map = None
        # Whether to import devices from plugins
        self._import_plugins = config[CONF_PLUGINS]
        self._device_config = config[CONF_DEVICE_CONFIG]
        self._room_map = None  # Mapping roomId to room object
        self._device_map = None  # Mapping deviceId to device object
        self.fibaro_devices = None  # List of devices by type
        self._callbacks = {}  # Update value callbacks by deviceId
        self._state_handler = None  # Fiblary's StateHandler object
        self._excluded_devices = config[CONF_EXCLUDE]
        self.hub_serial = None  # Unique serial number of the hub

    def connect(self):
        """Start the communication with the Fibaro controller.

        Returns True once login succeeded and rooms, devices and scenes have
        been read. Returns False, after logging an error, when the client
        raises AssertionError or the login is rejected.
        """
        try:
            login = self._client.login.get()
            info = self._client.info.get()
            self.hub_serial = slugify(info.serialNumber)
        except AssertionError:
            _LOGGER.error("Can't connect to Fibaro HC. Please check URL.")
            return False
        if login is None or login.status is False:
            _LOGGER.error(
                "Invalid login for Fibaro HC. Please check username and password"
            )
            return False

        self._room_map = {room.id: room for room in self._client.rooms.list()}
        # Devices must be read before scenes: _read_devices creates the
        # fibaro_devices mapping that _read_scenes appends to.
        self._read_devices()
        self._read_scenes()
        return True

    def enable_state_handler(self):
        """Start StateHandler thread for monitoring updates."""
        self._state_handler = StateHandler(self._client, self._on_state_change)

    def disable_state_handler(self):
        """Stop StateHandler thread used for monitoring updates.

        Does nothing when no handler is active, so it is safe to call before
        enable_state_handler() or more than once.
        """
        if self._state_handler is None:
            return
        self._state_handler.stop()
        self._state_handler = None

    def _on_state_change(self, state):
        """Handle change report received from the HomeCenter.

        Each entry of ``state["changes"]`` carries a device ``id`` plus
        changed property values. Known properties are written into the
        cached device object; afterwards the registered callback of every
        affected device is invoked once.
        """
        callback_set = set()
        for change in state.get("changes", []):
            try:
                dev_id = change.pop("id")
                if dev_id not in self._device_map:
                    continue
                device = self._device_map[dev_id]
                for property_name, value in change.items():
                    if property_name == "log":
                        if value and value != "transfer OK":
                            _LOGGER.debug("LOG %s: %s", device.friendly_name, value)
                        continue
                    if property_name == "logTemp":
                        continue
                    if property_name in device.properties:
                        device.properties[property_name] = value
                        _LOGGER.debug(
                            "<- %s.%s = %s", device.ha_id, property_name, str(value)
                        )
                    else:
                        _LOGGER.warning("%s.%s not found", device.ha_id, property_name)
                    if dev_id in self._callbacks:
                        callback_set.add(dev_id)
            except (ValueError, KeyError) as err:
                # Best effort: a malformed entry must not stop the remaining
                # changes from being processed, but leave a trace of it.
                _LOGGER.debug("Ignoring unprocessable change report: %r", err)
        for item in callback_set:
            self._callbacks[item]()

    def register(self, device_id, callback):
        """Register device with a callback for updates.

        A later registration for the same ``device_id`` replaces the
        previous callback.
        """
        self._callbacks[device_id] = callback

    def get_children(self, device_id):
        """Get a list of child devices (those whose parentId is device_id)."""
        return [
            device
            for device in self._device_map.values()
            if device.parentId == device_id
        ]

    def get_siblings(self, device_id):
        """Get the siblings of a device.

        The result is every mapped device sharing the parent of
        ``device_id``, which includes the device itself.
        """
        return self.get_children(self._device_map[device_id].parentId)

    @staticmethod
    def _map_device_to_type(device):
        """Map device to HA device type.

        Returns the platform name, or None when the device cannot be mapped.
        """
        # Use our lookup table to identify device type
        device_type = None
        if "type" in device:
            device_type = FIBARO_TYPEMAP.get(device.type)
        if device_type is None and "baseType" in device:
            device_type = FIBARO_TYPEMAP.get(device.baseType)

        # We can also identify device type by its capabilities
        if device_type is None:
            if "setBrightness" in device.actions:
                device_type = "light"
            elif "turnOn" in device.actions:
                device_type = "switch"
            elif "open" in device.actions:
                device_type = "cover"
            elif "value" in device.properties:
                if device.properties.value in ("true", "false"):
                    device_type = "binary_sensor"
                else:
                    device_type = "sensor"

        # Switches that control lights should show up as lights
        if (
            device_type == "switch"
            and device.properties.get("isLight", "false") == "true"
        ):
            device_type = "light"
        return device_type

    def _room_name_for(self, room_id):
        """Return the display name of a room; id 0 maps to "Unknown".

        Raises KeyError when ``room_id`` is not in the room map.
        """
        if room_id == 0:
            return "Unknown"
        return self._room_map[room_id].name

    def _read_scenes(self):
        """Read the scene list and register every visible scene.

        Scenes are annotated with naming attributes, stored in the scene map
        and appended to ``fibaro_devices["scene"]``. A scene that refers to
        a room missing from the room map is skipped, matching how
        _read_devices treats such devices.
        """
        scenes = self._client.scenes.list()
        self._scene_map = {}
        for device in scenes:
            if not device.visible:
                continue
            device.fibaro_controller = self
            try:
                room_name = self._room_name_for(device.roomID)
            except KeyError as err:
                _LOGGER.debug("Skipping scene with unknown room: %r", err)
                continue
            device.room_name = room_name
            device.friendly_name = f"{room_name} {device.name}"
            device.ha_id = "scene_{}_{}_{}".format(
                slugify(room_name), slugify(device.name), device.id
            )
            device.unique_id_str = f"{self.hub_serial}.scene.{device.id}"
            self._scene_map[device.id] = device
            self.fibaro_devices["scene"].append(device)

    def _read_devices(self):
        """Read and process the device list.

        Rebuilds the device map and the per-platform ``fibaro_devices``
        mapping. Devices that are disabled, excluded by ha_id, or plugin
        devices while plugin import is off get no mapped type and are left
        out. A device whose processing raises KeyError or ValueError is
        skipped.
        """
        devices = self._client.devices.list()
        self._device_map = {}
        self.fibaro_devices = defaultdict(list)
        last_climate_parent = None
        for device in devices:
            try:
                device.fibaro_controller = self
                room_name = self._room_name_for(device.roomID)
                device.room_name = room_name
                device.friendly_name = room_name + " " + device.name
                device.ha_id = "{}_{}_{}".format(
                    slugify(room_name), slugify(device.name), device.id
                )
                if (
                    device.enabled
                    and (
                        "isPlugin" not in device
                        or (not device.isPlugin or self._import_plugins)
                    )
                    and device.ha_id not in self._excluded_devices
                ):
                    device.mapped_type = self._map_device_to_type(device)
                    device.device_config = self._device_config.get(device.ha_id, {})
                else:
                    device.mapped_type = None
                dtype = device.mapped_type
                if dtype:
                    device.unique_id_str = f"{self.hub_serial}.{device.id}"
                    self._device_map[device.id] = device
                    if dtype != "climate":
                        self.fibaro_devices[dtype].append(device)
                    # if a sibling of this has been added, skip this one
                    # otherwise add the first visible device in the group
                    # which is a hack, but solves a problem with FGT having
                    # hidden compatibility devices before the real device
                    elif last_climate_parent != device.parentId and device.visible:
                        self.fibaro_devices[dtype].append(device)
                        last_climate_parent = device.parentId
                _LOGGER.debug(
                    "%s (%s, %s) -> %s %s",
                    device.ha_id,
                    device.type,
                    device.baseType,
                    dtype,
                    str(device),
                )
            except (KeyError, ValueError) as err:
                # Best effort: one broken device must not prevent the rest
                # of the hub from loading, but leave a trace of the skip.
                _LOGGER.debug("Skipping device that could not be processed: %r", err)
def setup(hass, base_config):
    """Set up the Fibaro Component.

    Connects to every configured gateway, collects the devices of all hubs
    that connected into ``hass.data`` grouped by platform, then loads the
    platforms and starts state monitoring. Returns True when at least one
    hub connected, otherwise False.
    """
    gateway_configs = base_config[DOMAIN][CONF_GATEWAYS]
    hubs = hass.data[FIBARO_CONTROLLERS] = {}

    def stop_fibaro(event):
        """Stop Fibaro Thread."""
        _LOGGER.info("Shutting down Fibaro connection")
        for hub in hass.data[FIBARO_CONTROLLERS].values():
            hub.disable_state_handler()

    # Start every platform with an empty device list.
    devices_by_platform = hass.data[FIBARO_DEVICES] = {}
    for platform in FIBARO_COMPONENTS:
        devices_by_platform[platform] = []

    for gateway_config in gateway_configs:
        hub = FibaroController(gateway_config)
        if not hub.connect():
            # Hubs that failed to connect are left out entirely.
            continue
        hubs[hub.hub_serial] = hub
        for platform in FIBARO_COMPONENTS:
            devices_by_platform[platform].extend(hub.fibaro_devices[platform])

    if not hubs:
        return False

    for platform in FIBARO_COMPONENTS:
        discovery.load_platform(hass, platform, DOMAIN, {}, base_config)
    for hub in hubs.values():
        hub.enable_state_handler()
    # Handlers are stopped again when Home Assistant shuts down.
    hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_fibaro)
    return True
class FibaroDevice(Entity):
    """Representation of a Fibaro device entity.

    Base class wrapping one device object produced by FibaroController. It
    offers helpers to read cached property values and to invoke actions on
    the hub, and it registers for push updates instead of polling.
    """

    def __init__(self, fibaro_device):
        """Initialize the device from a controller-annotated device object."""
        self.fibaro_device = fibaro_device
        self.controller = fibaro_device.fibaro_controller
        self._name = fibaro_device.friendly_name
        self.ha_id = fibaro_device.ha_id

    async def async_added_to_hass(self):
        """Call when entity is added to hass.

        Registers this entity's update callback with the controller under
        the Fibaro device id.
        """
        self.controller.register(self.fibaro_device.id, self._update_callback)

    def _update_callback(self):
        """Update the state."""
        self.schedule_update_ha_state(True)

    @property
    def level(self):
        """Get the level of Fibaro device, or None if it has no "value"."""
        if "value" in self.fibaro_device.properties:
            return self.fibaro_device.properties.value
        return None

    @property
    def level2(self):
        """Get the tilt level of Fibaro device, or None if it has no "value2"."""
        if "value2" in self.fibaro_device.properties:
            return self.fibaro_device.properties.value2
        return None

    def dont_know_message(self, action):
        """Make a warning in case we don't know how to perform an action.

        ``action`` is the name of the command the device does not offer.
        """
        _LOGGER.warning(
            "Not sure how to %s: %s (available actions: %s)",
            action,
            str(self.ha_id),
            str(self.fibaro_device.actions),
        )

    def set_level(self, level):
        """Set the level of Fibaro device.

        Also mirrors the new level into the cached "value" and "brightness"
        properties when the device has them.
        """
        self.action("setValue", level)
        if "value" in self.fibaro_device.properties:
            self.fibaro_device.properties.value = level
        if "brightness" in self.fibaro_device.properties:
            self.fibaro_device.properties.brightness = level

    def set_level2(self, level):
        """Set the level2 of Fibaro device and mirror it into "value2"."""
        self.action("setValue2", level)
        if "value2" in self.fibaro_device.properties:
            self.fibaro_device.properties.value2 = level

    def call_turn_on(self):
        """Turn on the Fibaro device."""
        self.action("turnOn")

    def call_turn_off(self):
        """Turn off the Fibaro device."""
        self.action("turnOff")

    def call_set_color(self, red, green, blue, white):
        """Set the color of Fibaro device.

        Each channel is clamped to the range 0..255 and truncated to int
        before being cached and sent to the hub.
        """
        red = int(max(0, min(255, red)))
        green = int(max(0, min(255, green)))
        blue = int(max(0, min(255, blue)))
        white = int(max(0, min(255, white)))
        color_str = f"{red},{green},{blue},{white}"
        self.fibaro_device.properties.color = color_str
        self.action("setColor", str(red), str(green), str(blue), str(white))

    def action(self, cmd, *args):
        """Perform an action on the Fibaro HC.

        Calls the device method named ``cmd`` with ``args`` when the device
        lists that action; otherwise logs a warning.
        """
        if cmd in self.fibaro_device.actions:
            getattr(self.fibaro_device, cmd)(*args)
            _LOGGER.debug("-> %s.%s%s called", str(self.ha_id), str(cmd), str(args))
        else:
            self.dont_know_message(cmd)

    @property
    def hidden(self) -> bool:
        """Return True if the entity should be hidden from UIs."""
        return self.fibaro_device.visible is False

    @property
    def current_power_w(self):
        """Return the current power usage in W.

        Returns None when the device has no "power" property or its value
        is empty/falsy.
        """
        if "power" in self.fibaro_device.properties:
            power = self.fibaro_device.properties.power
            if power:
                return convert(power, float, 0.0)
        return None

    @property
    def current_binary_state(self):
        """Return the current binary state.

        "true"/"false" map directly; any other value counts as on when it
        is numerically greater than zero. Values that are not numbers are
        treated as off instead of raising from the property getter.
        """
        value = self.fibaro_device.properties.value
        if value == "false":
            return False
        if value == "true":
            return True
        try:
            # float() also accepts fractional readings such as "0.5",
            # which int() rejects with ValueError.
            return float(value) > 0
        except (TypeError, ValueError):
            return False

    @property
    def unique_id(self) -> str:
        """Return a unique ID."""
        return self.fibaro_device.unique_id_str

    @property
    def name(self) -> Optional[str]:
        """Return the name of the device."""
        return self._name

    @property
    def should_poll(self):
        """Get polling requirement from fibaro device."""
        # Updates are pushed through the controller callback instead.
        return False

    def update(self):
        """Call to update state."""

    @property
    def device_state_attributes(self):
        """Return the state attributes of the device.

        Battery level, armed state, power and energy are added according to
        the interfaces the device reports; "fibaro_id" is always included.
        """
        attr = {}

        try:
            if "battery" in self.fibaro_device.interfaces:
                attr[ATTR_BATTERY_LEVEL] = int(
                    self.fibaro_device.properties.batteryLevel
                )
            if "fibaroAlarmArm" in self.fibaro_device.interfaces:
                attr[ATTR_ARMED] = bool(self.fibaro_device.properties.armed)
            if "power" in self.fibaro_device.interfaces:
                attr[ATTR_CURRENT_POWER_W] = convert(
                    self.fibaro_device.properties.power, float, 0.0
                )
            if "energy" in self.fibaro_device.interfaces:
                attr[ATTR_CURRENT_ENERGY_KWH] = convert(
                    self.fibaro_device.properties.energy, float, 0.0
                )
        except (ValueError, KeyError):
            # Best effort: the first unreadable attribute ends collection
            # here; attributes gathered so far are still returned.
            pass

        attr["fibaro_id"] = self.fibaro_device.id
        return attr