core/homeassistant/components/risco/config_flow.py

258 lines
8.5 KiB
Python
Raw Normal View History

"""Config flow for Risco integration."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
from pyrisco import CannotConnectError, RiscoCloud, RiscoLocal, UnauthorizedError
import voluptuous as vol
from homeassistant import config_entries, core
2020-08-22 17:15:03 +00:00
from homeassistant.const import (
CONF_HOST,
2020-08-22 17:15:03 +00:00
CONF_PASSWORD,
CONF_PIN,
CONF_PORT,
2020-08-22 17:15:03 +00:00
CONF_SCAN_INTERVAL,
CONF_TYPE,
2020-08-22 17:15:03 +00:00
CONF_USERNAME,
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_CUSTOM_BYPASS,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_ARMED_NIGHT,
2020-08-22 17:15:03 +00:00
)
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import (
CONF_CODE_ARM_REQUIRED,
CONF_CODE_DISARM_REQUIRED,
CONF_HA_STATES_TO_RISCO,
CONF_RISCO_STATES_TO_HA,
DEFAULT_OPTIONS,
DOMAIN,
RISCO_STATES,
TYPE_LOCAL,
)
_LOGGER = logging.getLogger(__name__)
CLOUD_SCHEMA = vol.Schema(
{
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
vol.Required(CONF_PIN): str,
}
)
LOCAL_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PORT, default=1000): int,
vol.Required(CONF_PIN): str,
}
)
HA_STATES = [
STATE_ALARM_ARMED_AWAY,
STATE_ALARM_ARMED_HOME,
STATE_ALARM_ARMED_NIGHT,
STATE_ALARM_ARMED_CUSTOM_BYPASS,
]
async def validate_cloud_input(hass: core.HomeAssistant, data) -> dict[str, str]:
"""Validate the user input allows us to connect to Risco Cloud.
Data has the keys from CLOUD_SCHEMA with values provided by the user.
"""
risco = RiscoCloud(data[CONF_USERNAME], data[CONF_PASSWORD], data[CONF_PIN])
try:
await risco.login(async_get_clientsession(hass))
finally:
await risco.close()
return {"title": risco.site_name}
async def validate_local_input(
hass: core.HomeAssistant, data: Mapping[str, str]
) -> dict[str, str]:
"""Validate the user input allows us to connect to a local panel.
Data has the keys from LOCAL_SCHEMA with values provided by the user.
"""
risco = RiscoLocal(data[CONF_HOST], data[CONF_PORT], data[CONF_PIN])
await risco.connect()
site_id = risco.id
await risco.disconnect()
return {"title": site_id}
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Risco."""
VERSION = 1
def __init__(self) -> None:
"""Init the config flow."""
self._reauth_entry: config_entries.ConfigEntry | None = None
2020-08-22 17:15:03 +00:00
@staticmethod
@core.callback
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> RiscoOptionsFlowHandler:
2020-08-22 17:15:03 +00:00
"""Define the config flow to handle options."""
return RiscoOptionsFlowHandler(config_entry)
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
return self.async_show_menu(
step_id="user",
menu_options=["cloud", "local"],
)
async def async_step_cloud(self, user_input=None):
"""Configure a cloud based alarm."""
errors = {}
if user_input is not None:
if not self._reauth_entry:
await self.async_set_unique_id(user_input[CONF_USERNAME])
self._abort_if_unique_id_configured()
2020-08-22 13:54:12 +00:00
try:
info = await validate_cloud_input(self.hass, user_input)
except CannotConnectError:
errors["base"] = "cannot_connect"
except UnauthorizedError:
errors["base"] = "invalid_auth"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
2020-08-22 13:54:12 +00:00
else:
if not self._reauth_entry:
return self.async_create_entry(title=info["title"], data=user_input)
self.hass.config_entries.async_update_entry(
self._reauth_entry,
data=user_input,
unique_id=user_input[CONF_USERNAME],
)
await self.hass.config_entries.async_reload(self._reauth_entry.entry_id)
return self.async_abort(reason="reauth_successful")
return self.async_show_form(
step_id="cloud", data_schema=CLOUD_SCHEMA, errors=errors
)
async def async_step_reauth(self, entry_data: Mapping[str, Any]) -> FlowResult:
"""Handle configuration by re-auth."""
self._reauth_entry = await self.async_set_unique_id(entry_data[CONF_USERNAME])
return await self.async_step_cloud()
async def async_step_local(self, user_input=None):
"""Configure a local based alarm."""
errors = {}
if user_input is not None:
try:
info = await validate_local_input(self.hass, user_input)
except CannotConnectError:
_LOGGER.exception("Cannot connect")
errors["base"] = "cannot_connect"
except UnauthorizedError:
errors["base"] = "invalid_auth"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(info["title"])
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=info["title"], data={**user_input, **{CONF_TYPE: TYPE_LOCAL}}
)
return self.async_show_form(
step_id="local", data_schema=LOCAL_SCHEMA, errors=errors
)
2020-08-22 17:15:03 +00:00
class RiscoOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle a Risco options flow."""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
2020-08-22 17:15:03 +00:00
"""Initialize."""
self.config_entry = config_entry
self._data = {**DEFAULT_OPTIONS, **config_entry.options}
2020-08-22 17:15:03 +00:00
def _options_schema(self):
return vol.Schema(
{
vol.Required(
CONF_SCAN_INTERVAL, default=self._data[CONF_SCAN_INTERVAL]
): int,
vol.Required(
CONF_CODE_ARM_REQUIRED, default=self._data[CONF_CODE_ARM_REQUIRED]
): bool,
vol.Required(
CONF_CODE_DISARM_REQUIRED,
default=self._data[CONF_CODE_DISARM_REQUIRED],
): bool,
}
)
async def async_step_init(self, user_input=None):
"""Manage the options."""
if user_input is not None:
self._data = {**self._data, **user_input}
return await self.async_step_risco_to_ha()
2020-08-22 17:15:03 +00:00
return self.async_show_form(step_id="init", data_schema=self._options_schema())
async def async_step_risco_to_ha(self, user_input=None):
"""Map Risco states to HA states."""
if user_input is not None:
self._data[CONF_RISCO_STATES_TO_HA] = user_input
return await self.async_step_ha_to_risco()
risco_to_ha = self._data[CONF_RISCO_STATES_TO_HA]
options = vol.Schema(
{
vol.Required(risco_state, default=risco_to_ha[risco_state]): vol.In(
HA_STATES
)
for risco_state in RISCO_STATES
}
)
return self.async_show_form(step_id="risco_to_ha", data_schema=options)
async def async_step_ha_to_risco(self, user_input=None):
"""Map HA states to Risco states."""
if user_input is not None:
self._data[CONF_HA_STATES_TO_RISCO] = user_input
return self.async_create_entry(title="", data=self._data)
options = {}
risco_to_ha = self._data[CONF_RISCO_STATES_TO_HA]
# we iterate over HA_STATES, instead of set(self._risco_to_ha.values())
# to ensure a consistent order
for ha_state in HA_STATES:
if ha_state not in risco_to_ha.values():
continue
values = [
risco_state
for risco_state in RISCO_STATES
if risco_to_ha[risco_state] == ha_state
]
current = self._data[CONF_HA_STATES_TO_RISCO].get(ha_state)
if current not in values:
current = values[0]
options[vol.Required(ha_state, default=current)] = vol.In(values)
return self.async_show_form(
step_id="ha_to_risco", data_schema=vol.Schema(options)
)