211 lines
7.3 KiB
Python
211 lines
7.3 KiB
Python
"""Collection of useful functions for the HomeKit component."""
|
|
from collections import OrderedDict, namedtuple
|
|
import logging
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant.components import fan, media_player, sensor
|
|
from homeassistant.const import (
|
|
ATTR_CODE, ATTR_SUPPORTED_FEATURES, CONF_NAME, CONF_TYPE, TEMP_CELSIUS)
|
|
from homeassistant.core import split_entity_id
|
|
import homeassistant.helpers.config_validation as cv
|
|
import homeassistant.util.temperature as temp_util
|
|
|
|
from .const import (
|
|
CONF_FEATURE, CONF_FEATURE_LIST, CONF_LINKED_BATTERY_SENSOR,
|
|
CONF_LOW_BATTERY_THRESHOLD, DEFAULT_LOW_BATTERY_THRESHOLD, FEATURE_ON_OFF,
|
|
FEATURE_PLAY_PAUSE, FEATURE_PLAY_STOP, FEATURE_TOGGLE_MUTE,
|
|
HOMEKIT_NOTIFY_ID, TYPE_FAUCET, TYPE_OUTLET, TYPE_SHOWER, TYPE_SPRINKLER,
|
|
TYPE_SWITCH, TYPE_VALVE)
|
|
|
|
_LOGGER = logging.getLogger(__name__)


# Options shared by the per-entity schemas below: an optional name, an
# optional linked battery sensor (must be an entity of the sensor domain)
# and a low-battery threshold that falls back to the component default.
BASIC_INFO_SCHEMA = vol.Schema({
    vol.Optional(CONF_NAME): cv.string,
    vol.Optional(CONF_LINKED_BATTERY_SENSOR):
        cv.entity_domain(sensor.DOMAIN),
    vol.Optional(
        CONF_LOW_BATTERY_THRESHOLD,
        default=DEFAULT_LOW_BATTERY_THRESHOLD,
    ): cv.positive_int,
})

# Basic options plus a feature list; the value is passed through
# cv.ensure_list.
FEATURE_SCHEMA = BASIC_INFO_SCHEMA.extend({
    vol.Optional(CONF_FEATURE_LIST, default=None): cv.ensure_list,
})

# Basic options plus an optional code, which may be None or a string.
CODE_SCHEMA = BASIC_INFO_SCHEMA.extend({
    vol.Optional(ATTR_CODE, default=None): vol.Any(None, cv.string),
})

# A single feature entry: the feature name is required and must be one of
# the four known feature constants.
MEDIA_PLAYER_SCHEMA = vol.Schema({
    vol.Required(CONF_FEATURE): vol.All(
        cv.string,
        vol.In((
            FEATURE_ON_OFF,
            FEATURE_PLAY_PAUSE,
            FEATURE_PLAY_STOP,
            FEATURE_TOGGLE_MUTE,
        )),
    ),
})

# Basic options plus a type that defaults to TYPE_SWITCH and must be one of
# the listed type constants.
SWITCH_TYPE_SCHEMA = BASIC_INFO_SCHEMA.extend({
    vol.Optional(CONF_TYPE, default=TYPE_SWITCH): vol.All(
        cv.string,
        vol.In((
            TYPE_FAUCET,
            TYPE_OUTLET,
            TYPE_SHOWER,
            TYPE_SPRINKLER,
            TYPE_SWITCH,
            TYPE_VALVE,
        )),
    ),
})
|
|
|
|
|
|
def validate_entity_config(values):
    """Validate the per-entity configuration mapping.

    ``values`` must be a dict of entity id -> configuration dict. Each
    entity id is passed through ``cv.entity_id`` and each configuration is
    validated with the schema that matches the entity's domain:

    * ``alarm_control_panel`` / ``lock``: ``CODE_SCHEMA``
    * media player: ``FEATURE_SCHEMA``; the feature list is additionally
      converted into a dict keyed by feature name
    * ``switch``: ``SWITCH_TYPE_SCHEMA``
    * anything else: ``BASIC_INFO_SCHEMA``

    Returns a new dict keyed by the value returned from ``cv.entity_id``.

    Raises ``vol.Invalid`` if ``values`` or one of the configurations is not
    a dict, or if a media player lists the same feature twice. Errors raised
    by the schemas themselves are not caught here.
    """
    if not isinstance(values, dict):
        raise vol.Invalid('expected a dictionary')

    entities = {}
    for entity_id, config in values.items():
        entity = cv.entity_id(entity_id)
        domain, _ = split_entity_id(entity)

        if not isinstance(config, dict):
            raise vol.Invalid(
                'The configuration for {} must be a dictionary.'
                .format(entity))

        if domain in ('alarm_control_panel', 'lock'):
            config = CODE_SCHEMA(config)
        elif domain == media_player.const.DOMAIN:
            config = FEATURE_SCHEMA(config)
            config[CONF_FEATURE_LIST] = _media_player_feature_map(
                entity, config[CONF_FEATURE_LIST])
        elif domain == 'switch':
            config = SWITCH_TYPE_SCHEMA(config)
        else:
            config = BASIC_INFO_SCHEMA(config)

        entities[entity] = config
    return entities


def _media_player_feature_map(entity, features):
    """Return a dict of feature name -> remaining options for ``entity``.

    Every entry of ``features`` is validated with ``MEDIA_PLAYER_SCHEMA``.
    Raises ``vol.Invalid`` when a feature name occurs more than once.
    """
    feature_map = {}
    for feature in features:
        params = MEDIA_PLAYER_SCHEMA(feature)
        # The feature name becomes the key; whatever is left in params is
        # stored as that feature's value.
        key = params.pop(CONF_FEATURE)
        if key in feature_map:
            raise vol.Invalid('A feature can be added only once for {}'
                              .format(entity))
        feature_map[key] = params
    return feature_map
|
|
|
|
|
|
def validate_media_player_features(state, feature_list):
    """Check that a media player supports every requested feature.

    The supported-features bitmask is read from ``state.attributes`` and
    treated as 0 when missing. Returns True if each entry of
    ``feature_list`` is supported; otherwise the unsupported entries are
    logged as an error and False is returned.
    """
    supported_mask = state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
    mp_const = media_player.const

    # A feature counts as available when the entity sets at least one of the
    # flags in the paired mask.
    mode_masks = (
        (FEATURE_ON_OFF,
         mp_const.SUPPORT_TURN_ON | mp_const.SUPPORT_TURN_OFF),
        (FEATURE_PLAY_PAUSE,
         mp_const.SUPPORT_PLAY | mp_const.SUPPORT_PAUSE),
        (FEATURE_PLAY_STOP,
         mp_const.SUPPORT_PLAY | mp_const.SUPPORT_STOP),
        (FEATURE_TOGGLE_MUTE,
         mp_const.SUPPORT_VOLUME_MUTE),
    )
    available = [mode for mode, mask in mode_masks if supported_mask & mask]

    unsupported = [name for name in feature_list if name not in available]
    if not unsupported:
        return True

    _LOGGER.error('%s does not support features: %s',
                  state.entity_id, unsupported)
    return False
|
|
|
|
|
|
# Pair of percentages describing one Home Assistant speed; the text below is
# appended to the docstring that namedtuple generates.
SpeedRange = namedtuple('SpeedRange', 'start target')
SpeedRange.__doc__ += (
    " Maps Home Assistant speed "
    "values to percentage based HomeKit speeds.\n"
    "start: Start of the range (inclusive).\n"
    "target: Percentage to use to determine HomeKit percentages "
    "from HomeAssistant speed.\n"
)
|
|
|
|
|
|
class HomeKitSpeedMapping:
    """Supports conversion between Home Assistant and HomeKit fan speeds.

    ``speed_ranges`` is an ordered mapping of each Home Assistant speed to a
    ``SpeedRange`` holding the percentage at which that speed starts and the
    percentage reported for it.
    """

    def __init__(self, speed_list):
        """Initialize a new SpeedMapping object.

        ``speed_list`` is the ordered list of Home Assistant speeds. It must
        be non-empty, because its first element is inspected. That first
        element is treated as "off"; a warning is logged when it differs
        from ``fan.SPEED_OFF``.
        """
        if speed_list[0] != fan.SPEED_OFF:
            _LOGGER.warning("%s does not contain the speed setting "
                            "%s as its first element. "
                            "Assuming that %s is equivalent to 'off'.",
                            speed_list, fan.SPEED_OFF, speed_list[0])
        self.speed_ranges = OrderedDict()
        list_size = len(speed_list)
        for index, speed in enumerate(speed_list):
            start, target = self._range_bounds(index, list_size)
            self.speed_ranges[speed] = SpeedRange(start, target)

    @staticmethod
    def _range_bounds(index, list_size):
        """Return the (start, target) percentages for the speed at index."""
        # By dividing by list_size - 1 the following
        # desired attributes hold true:
        # * index = 0 => 0%, equal to "off"
        # * index = len(speed_list) - 1 => 100 %
        # * all other indices are equally distributed
        # A one-element list would make that divisor zero; clamp it so the
        # single speed (treated as "off") maps to 0% instead of raising
        # ZeroDivisionError.
        steps = max(list_size - 1, 1)
        target = index * 100 / steps
        start = index * 100 / list_size
        return start, target

    def speed_to_homekit(self, speed):
        """Map Home Assistant speed state to HomeKit speed.

        Returns None when ``speed`` is None, otherwise the target percentage
        stored for ``speed``. A speed that is not in the mapping raises
        KeyError.
        """
        if speed is None:
            return None
        speed_range = self.speed_ranges[speed]
        return speed_range.target

    def speed_to_states(self, speed):
        """Map HomeKit speed to Home Assistant speed state.

        Walks the speeds from last to first and returns the first one whose
        range start is less than or equal to ``speed``. If none matches, the
        first speed of the mapping is returned.
        """
        for state, speed_range in reversed(self.speed_ranges.items()):
            if speed_range.start <= speed:
                return state
        return list(self.speed_ranges.keys())[0]
|
|
|
|
|
|
def show_setup_message(hass, pincode):
    """Log the setup pincode and show it in a persistent notification.

    ``pincode`` is decoded with ``.decode()`` before use. The notification
    is created under ``HOMEKIT_NOTIFY_ID`` with the title 'HomeKit Setup'.
    """
    code = pincode.decode()
    _LOGGER.info('Pincode: %s', code)

    template = ('To set up Home Assistant in the Home App, enter the '
                'following code:\n### {}')
    message = template.format(code)

    notifications = hass.components.persistent_notification
    notifications.create(message, 'HomeKit Setup', HOMEKIT_NOTIFY_ID)
|
|
|
|
|
|
def dismiss_setup_message(hass):
    """Dismiss the persistent notification stored under HOMEKIT_NOTIFY_ID."""
    notifications = hass.components.persistent_notification
    notifications.dismiss(HOMEKIT_NOTIFY_ID)
|
|
|
|
|
|
def convert_to_float(state):
    """Return ``state`` converted to float, or None if it cannot be."""
    try:
        number = float(state)
    except (TypeError, ValueError):
        # Anything float() rejects is reported as None, not raised.
        number = None
    return number
|
|
|
|
|
|
def temperature_to_homekit(temperature, unit):
    """Convert a temperature from ``unit`` to Celsius for HomeKit.

    The converted value is rounded to a multiple of 0.5.
    """
    celsius = temp_util.convert(temperature, unit, TEMP_CELSIUS)
    # Round in half-degree steps: double, round to an integer, halve again.
    half_steps = round(celsius * 2)
    return half_steps / 2
|
|
|
|
|
|
def temperature_to_states(temperature, unit):
    """Convert a Celsius temperature back to the Home Assistant ``unit``.

    The converted value is rounded to a multiple of 0.5.
    """
    converted = temp_util.convert(temperature, TEMP_CELSIUS, unit)
    # Round in half-degree steps: double, round to an integer, halve again.
    half_steps = round(converted * 2)
    return half_steps / 2
|
|
|
|
|
|
def density_to_air_quality(density):
    """Map PM2.5 density to a HomeKit AirQuality level from 1 to 5.

    Levels 1-4 cover densities up to and including 35, 75, 115 and 150
    respectively; anything above 150 is level 5.
    """
    upper_bounds = (35, 75, 115, 150)
    for level, upper_bound in enumerate(upper_bounds, start=1):
        if density <= upper_bound:
            return level
    return 5
|