Improve config flow type hints (part 2) (#124344)

pull/124582/head
epenet 2024-08-25 18:36:44 +02:00 committed by GitHub
parent 58b7711bdd
commit ebc49d938a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 79 additions and 53 deletions

View File

@ -75,11 +75,10 @@ class AuroraABBConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialise the config flow."""
self.config = None
self._com_ports_list: list[str] | None = None
self._default_com_port = None
self._default_com_port: str | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None

View File

@ -22,11 +22,11 @@ class AussieBroadbandConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize the config flow."""
self.data: dict = {}
self.options: dict = {CONF_SERVICES: []}
self.services: list[dict[str]] = []
self.services: list[dict[str, Any]] = []
self.client: AussieBB | None = None
self._reauth_username: str | None = None

View File

@ -35,15 +35,11 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
def host_port(data):
"""Return a list with host and port."""
return (data[CONF_HOST], data[CONF_PORT])
def create_schema(previous_input=None):
"""Create a schema with given values as default."""
if previous_input is not None:
host, port = host_port(previous_input)
host = previous_input[CONF_HOST]
port = previous_input[CONF_PORT]
else:
host = DEFAULT_HOST
port = DEFAULT_PORT
@ -70,9 +66,9 @@ class BleBoxConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize the BleBox config flow."""
self.device_config = {}
self.device_config: dict[str, Any] = {}
def handle_step_exception(
self, step, exception, schema, host, port, message_id, log_fn
@ -146,7 +142,9 @@ class BleBoxConfigFlow(ConfigFlow, domain=DOMAIN):
},
)
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle initial user-triggered config step."""
hass = self.hass
schema = create_schema(user_input)
@ -159,14 +157,14 @@ class BleBoxConfigFlow(ConfigFlow, domain=DOMAIN):
description_placeholders={},
)
addr = host_port(user_input)
host = user_input[CONF_HOST]
port = user_input[CONF_PORT]
username = user_input.get(CONF_USERNAME)
password = user_input.get(CONF_PASSWORD)
for entry in self._async_current_entries():
if addr == host_port(entry.data):
host, port = addr
if host == entry.data[CONF_HOST] and port == entry.data[CONF_PORT]:
return self.async_abort(
reason=ADDRESS_ALREADY_CONFIGURED,
description_placeholders={"address": f"{host}:{port}"},
@ -174,27 +172,35 @@ class BleBoxConfigFlow(ConfigFlow, domain=DOMAIN):
websession = get_maybe_authenticated_session(hass, password, username)
api_host = ApiHost(*addr, DEFAULT_SETUP_TIMEOUT, websession, hass.loop, _LOGGER)
api_host = ApiHost(
host, port, DEFAULT_SETUP_TIMEOUT, websession, hass.loop, _LOGGER
)
try:
product = await Box.async_from_host(api_host)
except UnsupportedBoxVersion as ex:
return self.handle_step_exception(
"user", ex, schema, *addr, UNSUPPORTED_VERSION, _LOGGER.debug
"user",
ex,
schema,
host,
port,
UNSUPPORTED_VERSION,
_LOGGER.debug,
)
except UnauthorizedRequest as ex:
return self.handle_step_exception(
"user", ex, schema, *addr, CANNOT_CONNECT, _LOGGER.error
"user", ex, schema, host, port, CANNOT_CONNECT, _LOGGER.error
)
except Error as ex:
return self.handle_step_exception(
"user", ex, schema, *addr, CANNOT_CONNECT, _LOGGER.warning
"user", ex, schema, host, port, CANNOT_CONNECT, _LOGGER.warning
)
except RuntimeError as ex:
return self.handle_step_exception(
"user", ex, schema, *addr, UNKNOWN, _LOGGER.error
"user", ex, schema, host, port, UNKNOWN, _LOGGER.error
)
# Check if configured but IP changed since

View File

@ -5,7 +5,7 @@ import errno
from functools import partial
import logging
import socket
from typing import Any
from typing import TYPE_CHECKING, Any
import broadlink as blk
from broadlink.exceptions import (
@ -39,9 +39,11 @@ class BroadlinkFlowHandler(ConfigFlow, domain=DOMAIN):
def __init__(self) -> None:
"""Initialize the Broadlink flow."""
self.device = None
self.device: blk.Device | None = None
async def async_set_device(self, device, raise_on_progress=True):
async def async_set_device(
self, device: blk.Device, raise_on_progress: bool = True
) -> None:
"""Define a device for the config flow."""
if device.type not in DEVICE_TYPES:
_LOGGER.error(
@ -90,7 +92,9 @@ class BroadlinkFlowHandler(ConfigFlow, domain=DOMAIN):
await self.async_set_device(device)
return await self.async_step_auth()
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
errors = {}
@ -127,6 +131,8 @@ class BroadlinkFlowHandler(ConfigFlow, domain=DOMAIN):
)
return await self.async_step_auth()
if TYPE_CHECKING:
assert self.device
if device.mac == self.device.mac:
await self.async_set_device(device, raise_on_progress=False)
return await self.async_step_auth()

View File

@ -29,11 +29,11 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize flow."""
self._ignore_cec = set()
self._known_hosts = set()
self._wanted_uuid = set()
self._ignore_cec = set[str]()
self._known_hosts = set[str]()
self._wanted_uuid = set[str]()
@staticmethod
@callback
@ -43,7 +43,9 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
"""Get the options flow for this handler."""
return CastOptionsFlowHandler(config_entry)
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")

View File

@ -40,9 +40,9 @@ class DirecTVConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Set up the instance."""
self.discovery_info = {}
self.discovery_info: dict[str, Any] = {}
async def async_step_user(
self, user_input: dict[str, Any] | None = None

View File

@ -1,9 +1,11 @@
"""Config flow to configure flood monitoring gauges."""
from typing import Any
from aioeafm import get_stations
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
@ -14,21 +16,23 @@ class UKFloodsFlowHandler(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Handle a UK Floods config flow."""
self.stations = {}
self.stations: dict[str, str] = {}
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow start."""
errors = {}
errors: dict[str, str] = {}
if user_input is not None:
station = self.stations[user_input["station"]]
await self.async_set_unique_id(station, raise_on_progress=False)
selected_station = self.stations[user_input["station"]]
await self.async_set_unique_id(selected_station, raise_on_progress=False)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_input["station"],
data={"station": station},
data={"station": selected_station},
)
session = async_get_clientsession(hass=self.hass)

View File

@ -1,5 +1,7 @@
"""Config flow to configure ecobee."""
from typing import Any
from pyecobee import (
ECOBEE_API_KEY,
ECOBEE_CONFIG_FILENAME,
@ -8,7 +10,7 @@ from pyecobee import (
)
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_API_KEY
from homeassistant.exceptions import HomeAssistantError
from homeassistant.util.json import load_json_object
@ -23,9 +25,11 @@ class EcobeeFlowHandler(ConfigFlow, domain=DOMAIN):
def __init__(self) -> None:
"""Initialize the ecobee flow."""
self._ecobee = None
self._ecobee: Ecobee | None = None
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
if self._async_current_entries():
# Config entry already exists, only one allowed.

View File

@ -1,6 +1,7 @@
"""Config flow for SiteSage Emonitor integration."""
import logging
from typing import Any
from aioemonitor import Emonitor
import aiohttp
@ -33,12 +34,14 @@ class EmonitorConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize Emonitor ConfigFlow."""
self.discovered_ip = None
self.discovered_info = None
self.discovered_ip: str | None = None
self.discovered_info: dict[str, str] | None = None
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors = {}
if user_input is not None:

View File

@ -27,18 +27,20 @@ class FireServiceRotaFlowHandler(ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize config flow."""
self.api = None
self._base_url = None
self._username = None
self._password = None
self._existing_entry = None
self._description_placeholders = None
self._existing_entry: dict[str, Any] | None = None
self._description_placeholders: dict[str, str] | None = None
async def async_step_user(self, user_input=None):
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
errors = {}
errors: dict[str, str] = {}
if user_input is None:
return self._show_setup_form(user_input, errors)