core/homeassistant/components/xiaomi_aqara/config_flow.py

228 lines
7.7 KiB
Python

"""Config flow to configure Xiaomi Aqara."""
import logging
from socket import gaierror
import voluptuous as vol
from xiaomi_gateway import MULTICAST_PORT, XiaomiGateway, XiaomiGatewayDiscovery
from homeassistant import config_entries
from homeassistant.const import CONF_HOST, CONF_MAC, CONF_NAME, CONF_PORT
from homeassistant.core import callback
from homeassistant.helpers.device_registry import format_mac
# pylint: disable=unused-import
from .const import (
CONF_INTERFACE,
CONF_KEY,
CONF_PROTOCOL,
CONF_SID,
DEFAULT_DISCOVERY_RETRY,
DOMAIN,
ZEROCONF_GATEWAY,
)
_LOGGER = logging.getLogger(__name__)
DEFAULT_GATEWAY_NAME = "Xiaomi Aqara Gateway"
DEFAULT_INTERFACE = "any"
GATEWAY_CONFIG = vol.Schema(
{vol.Optional(CONF_INTERFACE, default=DEFAULT_INTERFACE): str}
)
CONFIG_HOST = {
vol.Optional(CONF_HOST): str,
vol.Optional(CONF_MAC): str,
}
GATEWAY_CONFIG_HOST = GATEWAY_CONFIG.extend(CONFIG_HOST)
GATEWAY_SETTINGS = vol.Schema(
{
vol.Optional(CONF_KEY): vol.All(str, vol.Length(min=16, max=16)),
vol.Optional(CONF_NAME, default=DEFAULT_GATEWAY_NAME): str,
}
)
class XiaomiAqaraFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a Xiaomi Aqara config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH
def __init__(self):
"""Initialize."""
self.host = None
self.interface = DEFAULT_INTERFACE
self.sid = None
self.gateways = None
self.selected_gateway = None
@callback
def async_show_form_step_user(self, errors):
"""Show the form belonging to the user step."""
schema = GATEWAY_CONFIG
if (self.host is None and self.sid is None) or errors:
schema = GATEWAY_CONFIG_HOST
return self.async_show_form(step_id="user", data_schema=schema, errors=errors)
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
errors = {}
if user_input is None:
return self.async_show_form_step_user(errors)
self.interface = user_input[CONF_INTERFACE]
# allow optional manual setting of host and mac
if self.host is None and self.sid is None:
self.host = user_input.get(CONF_HOST)
mac_address = user_input.get(CONF_MAC)
# format sid from mac_address
if mac_address is not None:
self.sid = format_mac(mac_address).replace(":", "")
# if host is already known by zeroconf discovery or manual optional settings
if self.host is not None and self.sid is not None:
# Connect to Xiaomi Aqara Gateway
self.selected_gateway = await self.hass.async_add_executor_job(
XiaomiGateway,
self.host,
self.sid,
None,
DEFAULT_DISCOVERY_RETRY,
self.interface,
MULTICAST_PORT,
None,
)
if self.selected_gateway.connection_error:
errors[CONF_HOST] = "invalid_host"
if self.selected_gateway.mac_error:
errors[CONF_MAC] = "invalid_mac"
if errors:
return self.async_show_form_step_user(errors)
return await self.async_step_settings()
# Discover Xiaomi Aqara Gateways in the netwerk to get required SIDs.
xiaomi = XiaomiGatewayDiscovery(self.hass.add_job, [], self.interface)
try:
await self.hass.async_add_executor_job(xiaomi.discover_gateways)
except gaierror:
errors[CONF_INTERFACE] = "invalid_interface"
return self.async_show_form_step_user(errors)
self.gateways = xiaomi.gateways
if len(self.gateways) == 1:
self.selected_gateway = list(self.gateways.values())[0]
self.sid = self.selected_gateway.sid
return await self.async_step_settings()
if len(self.gateways) > 1:
return await self.async_step_select()
errors["base"] = "discovery_error"
return self.async_show_form_step_user(errors)
async def async_step_select(self, user_input=None):
"""Handle multiple aqara gateways found."""
errors = {}
if user_input is not None:
ip_adress = user_input["select_ip"]
self.selected_gateway = self.gateways[ip_adress]
self.sid = self.selected_gateway.sid
return await self.async_step_settings()
select_schema = vol.Schema(
{
vol.Required("select_ip"): vol.In(
[gateway.ip_adress for gateway in self.gateways.values()]
)
}
)
return self.async_show_form(
step_id="select", data_schema=select_schema, errors=errors
)
async def async_step_zeroconf(self, discovery_info):
"""Handle zeroconf discovery."""
name = discovery_info.get("name")
self.host = discovery_info.get("host")
mac_address = discovery_info.get("properties", {}).get("mac")
if not name or not self.host or not mac_address:
return self.async_abort(reason="not_xiaomi_aqara")
# Check if the discovered device is an xiaomi aqara gateway.
if not name.startswith(ZEROCONF_GATEWAY):
_LOGGER.debug(
"Xiaomi device '%s' discovered with host %s, not identified as xiaomi aqara gateway",
name,
self.host,
)
return self.async_abort(reason="not_xiaomi_aqara")
# format mac (include semicolns and make lowercase)
mac_address = format_mac(mac_address)
# format sid from mac_address
self.sid = mac_address.replace(":", "")
unique_id = mac_address
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured({CONF_HOST: self.host})
# pylint: disable=no-member # https://github.com/PyCQA/pylint/issues/3167
self.context.update({"title_placeholders": {"name": self.host}})
return await self.async_step_user()
async def async_step_settings(self, user_input=None):
"""Specify settings and connect aqara gateway."""
errors = {}
if user_input is not None:
# get all required data
name = user_input[CONF_NAME]
key = user_input.get(CONF_KEY)
ip_adress = self.selected_gateway.ip_adress
port = self.selected_gateway.port
protocol = self.selected_gateway.proto
if key is not None:
# validate key by issuing stop ringtone playback command.
self.selected_gateway.key = key
valid_key = self.selected_gateway.write_to_hub(self.sid, mid=10000)
else:
valid_key = True
if valid_key:
# format_mac, for a gateway the sid equels the mac address
mac_address = format_mac(self.sid)
# set unique_id
unique_id = mac_address
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=name,
data={
CONF_HOST: ip_adress,
CONF_PORT: port,
CONF_MAC: mac_address,
CONF_INTERFACE: self.interface,
CONF_PROTOCOL: protocol,
CONF_KEY: key,
CONF_SID: self.sid,
},
)
errors[CONF_KEY] = "invalid_key"
return self.async_show_form(
step_id="settings", data_schema=GATEWAY_SETTINGS, errors=errors
)