core/homeassistant/components/homekit/util.py

358 lines
11 KiB
Python

"""Collection of useful functions for the HomeKit component."""
from collections import OrderedDict, namedtuple
import io
import logging
import os
import secrets
import socket
import pyqrcode
import voluptuous as vol
from homeassistant.components import fan, media_player, sensor
from homeassistant.const import (
ATTR_CODE,
ATTR_SUPPORTED_FEATURES,
CONF_NAME,
CONF_TYPE,
TEMP_CELSIUS,
)
from homeassistant.core import HomeAssistant, split_entity_id
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.storage import STORAGE_DIR
import homeassistant.util.temperature as temp_util
from .const import (
CONF_FEATURE,
CONF_FEATURE_LIST,
CONF_LINKED_BATTERY_SENSOR,
CONF_LOW_BATTERY_THRESHOLD,
DEFAULT_LOW_BATTERY_THRESHOLD,
DOMAIN,
FEATURE_ON_OFF,
FEATURE_PLAY_PAUSE,
FEATURE_PLAY_STOP,
FEATURE_TOGGLE_MUTE,
HOMEKIT_FILE,
HOMEKIT_PAIRING_QR,
HOMEKIT_PAIRING_QR_SECRET,
TYPE_FAUCET,
TYPE_OUTLET,
TYPE_SHOWER,
TYPE_SPRINKLER,
TYPE_SWITCH,
TYPE_VALVE,
)
_LOGGER = logging.getLogger(__name__)
MAX_PORT = 65535
BASIC_INFO_SCHEMA = vol.Schema(
{
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_LINKED_BATTERY_SENSOR): cv.entity_domain(sensor.DOMAIN),
vol.Optional(
CONF_LOW_BATTERY_THRESHOLD, default=DEFAULT_LOW_BATTERY_THRESHOLD
): cv.positive_int,
}
)
FEATURE_SCHEMA = BASIC_INFO_SCHEMA.extend(
{vol.Optional(CONF_FEATURE_LIST, default=None): cv.ensure_list}
)
CODE_SCHEMA = BASIC_INFO_SCHEMA.extend(
{vol.Optional(ATTR_CODE, default=None): vol.Any(None, cv.string)}
)
MEDIA_PLAYER_SCHEMA = vol.Schema(
{
vol.Required(CONF_FEATURE): vol.All(
cv.string,
vol.In(
(
FEATURE_ON_OFF,
FEATURE_PLAY_PAUSE,
FEATURE_PLAY_STOP,
FEATURE_TOGGLE_MUTE,
)
),
)
}
)
SWITCH_TYPE_SCHEMA = BASIC_INFO_SCHEMA.extend(
{
vol.Optional(CONF_TYPE, default=TYPE_SWITCH): vol.All(
cv.string,
vol.In(
(
TYPE_FAUCET,
TYPE_OUTLET,
TYPE_SHOWER,
TYPE_SPRINKLER,
TYPE_SWITCH,
TYPE_VALVE,
)
),
)
}
)
def validate_entity_config(values):
"""Validate config entry for CONF_ENTITY."""
if not isinstance(values, dict):
raise vol.Invalid("expected a dictionary")
entities = {}
for entity_id, config in values.items():
entity = cv.entity_id(entity_id)
domain, _ = split_entity_id(entity)
if not isinstance(config, dict):
raise vol.Invalid(f"The configuration for {entity} must be a dictionary.")
if domain in ("alarm_control_panel", "lock"):
config = CODE_SCHEMA(config)
elif domain == media_player.const.DOMAIN:
config = FEATURE_SCHEMA(config)
feature_list = {}
for feature in config[CONF_FEATURE_LIST]:
params = MEDIA_PLAYER_SCHEMA(feature)
key = params.pop(CONF_FEATURE)
if key in feature_list:
raise vol.Invalid(f"A feature can be added only once for {entity}")
feature_list[key] = params
config[CONF_FEATURE_LIST] = feature_list
elif domain == "switch":
config = SWITCH_TYPE_SCHEMA(config)
else:
config = BASIC_INFO_SCHEMA(config)
entities[entity] = config
return entities
def validate_media_player_features(state, feature_list):
"""Validate features for media players."""
features = state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
supported_modes = []
if features & (
media_player.const.SUPPORT_TURN_ON | media_player.const.SUPPORT_TURN_OFF
):
supported_modes.append(FEATURE_ON_OFF)
if features & (media_player.const.SUPPORT_PLAY | media_player.const.SUPPORT_PAUSE):
supported_modes.append(FEATURE_PLAY_PAUSE)
if features & (media_player.const.SUPPORT_PLAY | media_player.const.SUPPORT_STOP):
supported_modes.append(FEATURE_PLAY_STOP)
if features & media_player.const.SUPPORT_VOLUME_MUTE:
supported_modes.append(FEATURE_TOGGLE_MUTE)
error_list = []
for feature in feature_list:
if feature not in supported_modes:
error_list.append(feature)
if error_list:
_LOGGER.error("%s does not support features: %s", state.entity_id, error_list)
return False
return True
SpeedRange = namedtuple("SpeedRange", ("start", "target"))
SpeedRange.__doc__ += """ Maps Home Assistant speed \
values to percentage based HomeKit speeds.
start: Start of the range (inclusive).
target: Percentage to use to determine HomeKit percentages \
from HomeAssistant speed.
"""
class HomeKitSpeedMapping:
"""Supports conversion between Home Assistant and HomeKit fan speeds."""
def __init__(self, speed_list):
"""Initialize a new SpeedMapping object."""
if speed_list[0] != fan.SPEED_OFF:
_LOGGER.warning(
"%s does not contain the speed setting "
"%s as its first element. "
"Assuming that %s is equivalent to 'off'.",
speed_list,
fan.SPEED_OFF,
speed_list[0],
)
self.speed_ranges = OrderedDict()
list_size = len(speed_list)
for index, speed in enumerate(speed_list):
# By dividing by list_size -1 the following
# desired attributes hold true:
# * index = 0 => 0%, equal to "off"
# * index = len(speed_list) - 1 => 100 %
# * all other indices are equally distributed
target = index * 100 / (list_size - 1)
start = index * 100 / list_size
self.speed_ranges[speed] = SpeedRange(start, target)
def speed_to_homekit(self, speed):
"""Map Home Assistant speed state to HomeKit speed."""
if speed is None:
return None
speed_range = self.speed_ranges[speed]
return round(speed_range.target)
def speed_to_states(self, speed):
"""Map HomeKit speed to Home Assistant speed state."""
for state, speed_range in reversed(self.speed_ranges.items()):
if speed_range.start <= speed:
return state
return list(self.speed_ranges.keys())[0]
def show_setup_message(hass, entry_id, bridge_name, pincode, uri):
"""Display persistent notification with setup information."""
pin = pincode.decode()
_LOGGER.info("Pincode: %s", pin)
buffer = io.BytesIO()
url = pyqrcode.create(uri)
url.svg(buffer, scale=5)
pairing_secret = secrets.token_hex(32)
hass.data[DOMAIN][entry_id][HOMEKIT_PAIRING_QR] = buffer.getvalue()
hass.data[DOMAIN][entry_id][HOMEKIT_PAIRING_QR_SECRET] = pairing_secret
message = (
f"To set up {bridge_name} in the Home App, "
f"scan the QR code or enter the following code:\n"
f"### {pin}\n"
f"![image](/api/homekit/pairingqr?{entry_id}-{pairing_secret})"
)
hass.components.persistent_notification.create(
message, "HomeKit Bridge Setup", entry_id
)
def dismiss_setup_message(hass, entry_id):
"""Dismiss persistent notification and remove QR code."""
hass.components.persistent_notification.dismiss(entry_id)
def convert_to_float(state):
"""Return float of state, catch errors."""
try:
return float(state)
except (ValueError, TypeError):
return None
def temperature_to_homekit(temperature, unit):
"""Convert temperature to Celsius for HomeKit."""
return round(temp_util.convert(temperature, unit, TEMP_CELSIUS), 1)
def temperature_to_states(temperature, unit):
"""Convert temperature back from Celsius to Home Assistant unit."""
return round(temp_util.convert(temperature, TEMP_CELSIUS, unit) * 2) / 2
def density_to_air_quality(density):
"""Map PM2.5 density to HomeKit AirQuality level."""
if density <= 35:
return 1
if density <= 75:
return 2
if density <= 115:
return 3
if density <= 150:
return 4
return 5
def get_persist_filename_for_entry_id(entry_id: str):
"""Determine the filename of the homekit state file."""
return f"{DOMAIN}.{entry_id}.state"
def get_aid_storage_filename_for_entry_id(entry_id: str):
"""Determine the ilename of homekit aid storage file."""
return f"{DOMAIN}.{entry_id}.aids"
def get_persist_fullpath_for_entry_id(hass: HomeAssistant, entry_id: str):
"""Determine the path to the homekit state file."""
return hass.config.path(STORAGE_DIR, get_persist_filename_for_entry_id(entry_id))
def get_aid_storage_fullpath_for_entry_id(hass: HomeAssistant, entry_id: str):
"""Determine the path to the homekit aid storage file."""
return hass.config.path(
STORAGE_DIR, get_aid_storage_filename_for_entry_id(entry_id)
)
def migrate_filesystem_state_data_for_primary_imported_entry_id(
hass: HomeAssistant, entry_id: str
):
"""Migrate the old paths to the storage directory."""
legacy_persist_file_path = hass.config.path(HOMEKIT_FILE)
if os.path.exists(legacy_persist_file_path):
os.rename(
legacy_persist_file_path, get_persist_fullpath_for_entry_id(hass, entry_id)
)
legacy_aid_storage_path = hass.config.path(STORAGE_DIR, "homekit.aids")
if os.path.exists(legacy_aid_storage_path):
os.rename(
legacy_aid_storage_path,
get_aid_storage_fullpath_for_entry_id(hass, entry_id),
)
def remove_state_files_for_entry_id(hass: HomeAssistant, entry_id: str):
"""Remove the state files from disk."""
persist_file_path = get_persist_fullpath_for_entry_id(hass, entry_id)
aid_storage_path = get_aid_storage_fullpath_for_entry_id(hass, entry_id)
os.unlink(persist_file_path)
if os.path.exists(aid_storage_path):
os.unlink(aid_storage_path)
return True
def _get_test_socket():
"""Create a socket to test binding ports."""
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.setblocking(False)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return test_socket
def port_is_available(port: int):
"""Check to see if a port is available."""
test_socket = _get_test_socket()
try:
test_socket.bind(("", port))
except OSError:
return False
return True
def find_next_available_port(start_port: int):
"""Find the next available port starting with the given port."""
test_socket = _get_test_socket()
for port in range(start_port, MAX_PORT):
try:
test_socket.bind(("", port))
return port
except OSError:
if port == MAX_PORT:
raise
continue