# core/homeassistant/components/sma/config_flow.py
#
# 253 lines
# 8.8 KiB
# Python

"""Config flow for the sma integration."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
import pysma
import voluptuous as vol
from yarl import URL
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import (
CONF_HOST,
CONF_MAC,
CONF_NAME,
CONF_PASSWORD,
CONF_SSL,
CONF_VERIFY_SSL,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
from .const import CONF_GROUP, DOMAIN, GROUPS
_LOGGER = logging.getLogger(__name__)
async def validate_input(
    hass: HomeAssistant,
    user_input: dict[str, Any],
    data: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Validate the user input allows us to connect.

    Logs in to the device, reads its device info and logs out again.

    Args:
        hass: Home Assistant instance, used to obtain the shared HTTP session.
        user_input: Submitted settings. Must contain CONF_VERIFY_SSL, CONF_SSL,
            CONF_PASSWORD and CONF_GROUP, and CONF_HOST when ``data`` is None.
        data: Optional flow data; when given, its CONF_HOST is used instead of
            the one in ``user_input``.

    Returns:
        The device info as returned by ``pysma.SMA.device_info()``.

    Raises:
        Any exception raised by pysma is propagated unchanged. The config flow
        in this module handles SmaConnectionException,
        SmaAuthenticationException and SmaReadException.
    """
    session = async_get_clientsession(hass, verify_ssl=user_input[CONF_VERIFY_SSL])

    protocol = "https" if user_input[CONF_SSL] else "http"
    host = data[CONF_HOST] if data is not None else user_input[CONF_HOST]
    url = URL.build(scheme=protocol, host=host)

    sma = pysma.SMA(
        session, str(url), user_input[CONF_PASSWORD], group=user_input[CONF_GROUP]
    )

    # new_session raises SmaAuthenticationException on failure
    await sma.new_session()
    try:
        device_info = await sma.device_info()
    finally:
        # Always log out again, even when reading the device info fails, so
        # the session opened above is not left behind on the device.
        await sma.close_session()

    return device_info
class SmaConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for SMA.

    Implements the manual (user) step, DHCP discovery with a confirmation
    step, and re-authentication.
    """

    VERSION = 1
    MINOR_VERSION = 2

    def __init__(self) -> None:
        """Initialize the flow with default connection settings."""
        # Entry data collected across the steps. CONF_HOST starts as
        # vol.UNDEFINED so the user form is rendered without a default host.
        self._data: dict[str, Any] = {
            CONF_HOST: vol.UNDEFINED,
            CONF_SSL: False,
            CONF_VERIFY_SSL: True,
            CONF_GROUP: GROUPS[0],
            CONF_PASSWORD: vol.UNDEFINED,
        }
        # Raw values taken from the DHCP discovery (host, MAC, hostname).
        self._discovery_data: dict[str, Any] = {}

    async def _handle_user_input(
        self, user_input: dict[str, Any], discovery: bool = False
    ) -> tuple[dict[str, str], dict[str, str]]:
        """Store the submitted settings and validate them against the device.

        Args:
            user_input: Submitted form values. Must contain CONF_SSL,
                CONF_VERIFY_SSL, CONF_GROUP and CONF_PASSWORD, plus CONF_HOST
                unless ``discovery`` is True.
            discovery: When True, the host already stored in ``self._data``
                (set by the DHCP step) is kept, because the discovery form
                does not ask for a host.

        Returns:
            A tuple ``(errors, device_info)``. ``errors`` is empty on success;
            ``device_info`` is empty when validation failed.
        """
        errors: dict[str, str] = {}
        device_info: dict[str, str] = {}

        if not discovery:
            self._data[CONF_HOST] = user_input[CONF_HOST]

        # These settings come from the form in every flow, including the
        # discovery confirmation, and end up in the created config entry.
        self._data[CONF_SSL] = user_input[CONF_SSL]
        self._data[CONF_VERIFY_SSL] = user_input[CONF_VERIFY_SSL]
        self._data[CONF_GROUP] = user_input[CONF_GROUP]
        self._data[CONF_PASSWORD] = user_input[CONF_PASSWORD]

        try:
            # Passing self._data makes validate_input use the stored host.
            device_info = await validate_input(
                self.hass, user_input=user_input, data=self._data
            )
        except pysma.exceptions.SmaConnectionException:
            errors["base"] = "cannot_connect"
        except pysma.exceptions.SmaAuthenticationException:
            errors["base"] = "invalid_auth"
        except pysma.exceptions.SmaReadException:
            errors["base"] = "cannot_retrieve_device_info"
        except Exception:
            # Flow boundary: log the traceback and show a generic error
            # instead of letting the flow crash.
            _LOGGER.exception("Unexpected exception")
            errors["base"] = "unknown"

        return errors, device_info

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """First step in config flow.

        Shows the connection form; once valid input is submitted, uses the
        device serial as unique ID and creates the entry.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            errors, device_info = await self._handle_user_input(user_input=user_input)

            if not errors:
                await self.async_set_unique_id(
                    str(device_info["serial"]), raise_on_progress=False
                )
                # An already configured device is updated with the new
                # settings and the flow aborts.
                self._abort_if_unique_id_configured(updates=self._data)

                return self.async_create_entry(
                    title=self._data[CONF_HOST], data=self._data
                )

        # First display, or redisplay with errors; previously entered values
        # (except the password) are offered as defaults.
        return self.async_show_form(
            step_id="user",
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_HOST, default=self._data[CONF_HOST]): cv.string,
                    vol.Optional(CONF_SSL, default=self._data[CONF_SSL]): cv.boolean,
                    vol.Optional(
                        CONF_VERIFY_SSL, default=self._data[CONF_VERIFY_SSL]
                    ): cv.boolean,
                    vol.Optional(CONF_GROUP, default=self._data[CONF_GROUP]): vol.In(
                        GROUPS
                    ),
                    vol.Required(CONF_PASSWORD): cv.string,
                }
            ),
            errors=errors,
        )

    async def async_step_reauth(
        self, entry_data: Mapping[str, Any]
    ) -> ConfigFlowResult:
        """Handle reauth on credential failure."""
        # entry_data is not needed here; the confirm step reads the entry
        # through _get_reauth_entry().
        return await self.async_step_reauth_confirm()

    async def async_step_reauth_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Ask for a new password and update the entry when it works."""
        errors: dict[str, str] = {}
        if user_input is not None:
            reauth_entry = self._get_reauth_entry()
            # Validate with the stored connection settings, replacing only
            # the password with the newly entered one.
            errors, _ = await self._handle_user_input(
                user_input={
                    **reauth_entry.data,
                    CONF_PASSWORD: user_input[CONF_PASSWORD],
                }
            )

            if not errors:
                return self.async_update_reload_and_abort(
                    reauth_entry,
                    data_updates={CONF_PASSWORD: user_input[CONF_PASSWORD]},
                )

        return self.async_show_form(
            step_id="reauth_confirm",
            data_schema=vol.Schema(
                {
                    vol.Required(CONF_PASSWORD): cv.string,
                }
            ),
            errors=errors,
        )

    async def async_step_dhcp(
        self, discovery_info: DhcpServiceInfo
    ) -> ConfigFlowResult:
        """Handle DHCP discovery.

        Adds the MAC address to an existing entry for the same host that has
        none yet; otherwise derives the unique ID from the hostname and asks
        the user to confirm the discovered device.
        """
        self._discovery_data[CONF_HOST] = discovery_info.ip
        self._discovery_data[CONF_MAC] = format_mac(discovery_info.macaddress)
        self._discovery_data[CONF_NAME] = discovery_info.hostname
        self._data[CONF_HOST] = discovery_info.ip
        # Already normalized by format_mac above.
        self._data[CONF_MAC] = self._discovery_data[CONF_MAC]

        _LOGGER.debug(
            "DHCP discovery detected SMA device: %s, IP: %s, MAC: %s",
            self._discovery_data[CONF_NAME],
            self._discovery_data[CONF_HOST],
            self._discovery_data[CONF_MAC],
        )

        existing_entries_with_host = [
            entry
            for entry in self._async_current_entries(include_ignore=False)
            if entry.data.get(CONF_HOST) == self._data[CONF_HOST]
            and not entry.data.get(CONF_MAC)
        ]

        # If we have an existing entry with the same host but no MAC address,
        # we update the entry with the MAC address and reload it.
        if existing_entries_with_host:
            entry = existing_entries_with_host[0]
            # Return the abort result so the flow stops here instead of
            # continuing to the confirmation form for a configured device.
            return self.async_update_reload_and_abort(
                entry,
                data_updates={CONF_MAC: self._data[CONF_MAC]},
                reason="already_configured",
            )

        # Finally, check if the hostname (which represents the SMA serial
        # number) is unique.
        # Example hostname: sma12345678-01
        # Remove the 'sma' prefix and strip everything after the dash
        # (including the dash).
        serial_number = (
            discovery_info.hostname.lower().removeprefix("sma").split("-", 1)[0]
        )

        await self.async_set_unique_id(serial_number)
        self._abort_if_unique_id_configured()

        return await self.async_step_discovery_confirm()

    async def async_step_discovery_confirm(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Confirm discovery.

        Asks for the remaining connection settings of the discovered host and
        creates the entry once they validate.
        """
        errors: dict[str, str] = {}
        if user_input is not None:
            # discovery=True keeps the discovered host; the form has no host
            # field.
            errors, _ = await self._handle_user_input(
                user_input=user_input, discovery=True
            )

            if not errors:
                return self.async_create_entry(
                    title=self._data[CONF_HOST], data=self._data
                )

        return self.async_show_form(
            step_id="discovery_confirm",
            data_schema=vol.Schema(
                {
                    vol.Optional(CONF_SSL, default=self._data[CONF_SSL]): cv.boolean,
                    vol.Optional(
                        CONF_VERIFY_SSL, default=self._data[CONF_VERIFY_SSL]
                    ): cv.boolean,
                    vol.Optional(CONF_GROUP, default=self._data[CONF_GROUP]): vol.In(
                        GROUPS
                    ),
                    vol.Required(CONF_PASSWORD): cv.string,
                }
            ),
            description_placeholders={CONF_HOST: self._data[CONF_HOST]},
            errors=errors,
        )