ElkM1 integration, add strict types to config_flow (#70323)

pull/70334/head
Glenn Waters 2022-04-20 13:46:36 -04:00 committed by GitHub
parent b0ed42a5a5
commit 2a99084911
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 22 deletions

View File

@ -97,6 +97,7 @@ homeassistant.components.elgato.*
homeassistant.components.elkm1.__init__ homeassistant.components.elkm1.__init__
homeassistant.components.elkm1.alarm_control_panel homeassistant.components.elkm1.alarm_control_panel
homeassistant.components.elkm1.climate homeassistant.components.elkm1.climate
homeassistant.components.elkm1.config_flow
homeassistant.components.elkm1.discovery homeassistant.components.elkm1.discovery
homeassistant.components.elkm1.light homeassistant.components.elkm1.light
homeassistant.components.elkm1.scene homeassistant.components.elkm1.scene

View File

@ -6,8 +6,8 @@ import logging
from typing import Any from typing import Any
from urllib.parse import urlparse from urllib.parse import urlparse
import elkm1_lib as elkm1
from elkm1_lib.discovery import ElkSystem from elkm1_lib.discovery import ElkSystem
from elkm1_lib.elk import Elk
import voluptuous as vol import voluptuous as vol
from homeassistant import config_entries, exceptions from homeassistant import config_entries, exceptions
@ -84,7 +84,7 @@ async def validate_input(data: dict[str, str], mac: str | None) -> dict[str, str
if requires_password and (not userid or not password): if requires_password and (not userid or not password):
raise InvalidAuth raise InvalidAuth
elk = elkm1.Elk( elk = Elk(
{"url": url, "userid": userid, "password": password, "element_list": ["panel"]} {"url": url, "userid": userid, "password": password, "element_list": ["panel"]}
) )
elk.connect() elk.connect()
@ -105,14 +105,14 @@ async def validate_input(data: dict[str, str], mac: str | None) -> dict[str, str
return {"title": device_name, CONF_HOST: url, CONF_PREFIX: slugify(prefix)} return {"title": device_name, CONF_HOST: url, CONF_PREFIX: slugify(prefix)}
def _address_from_discovery(device: ElkSystem): def _address_from_discovery(device: ElkSystem) -> str:
"""Append the port only if its non-standard.""" """Append the port only if its non-standard."""
if device.port in STANDARD_PORTS: if device.port in STANDARD_PORTS:
return device.ip_address return device.ip_address
return f"{device.ip_address}:{device.port}" return f"{device.ip_address}:{device.port}"
def _make_url_from_data(data): def _make_url_from_data(data: dict[str, str]) -> str:
if host := data.get(CONF_HOST): if host := data.get(CONF_HOST):
return host return host
@ -133,7 +133,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1 VERSION = 1
def __init__(self): def __init__(self) -> None:
"""Initialize the elkm1 config flow.""" """Initialize the elkm1 config flow."""
self._discovered_device: ElkSystem | None = None self._discovered_device: ElkSystem | None = None
self._discovered_devices: dict[str, ElkSystem] = {} self._discovered_devices: dict[str, ElkSystem] = {}
@ -267,9 +267,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
}, },
) )
async def async_step_discovered_connection(self, user_input=None): async def async_step_discovered_connection(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle connecting the device when we have a discovery.""" """Handle connecting the device when we have a discovery."""
errors = {} errors: dict[str, str] | None = {}
device = self._discovered_device device = self._discovered_device
assert device is not None assert device is not None
if user_input is not None: if user_input is not None:
@ -279,7 +281,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
else: else:
user_input[CONF_PREFIX] = "" user_input[CONF_PREFIX] = ""
errors, result = await self._async_create_or_error(user_input, False) errors, result = await self._async_create_or_error(user_input, False)
if not errors: if result is not None:
return result return result
default_proto = PORT_PROTOCOL_MAP.get(device.port, DEFAULT_SECURE_PROTOCOL) default_proto = PORT_PROTOCOL_MAP.get(device.port, DEFAULT_SECURE_PROTOCOL)
@ -297,9 +299,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
description_placeholders=_placeholders_from_device(device), description_placeholders=_placeholders_from_device(device),
) )
async def async_step_manual_connection(self, user_input=None): async def async_step_manual_connection(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle connecting the device when we need manual entry.""" """Handle connecting the device when we need manual entry."""
errors = {} errors: dict[str, str] | None = {}
if user_input is not None: if user_input is not None:
# We might be able to discover the device via directed UDP # We might be able to discover the device via directed UDP
# in case its on another subnet # in case its on another subnet
@ -314,7 +318,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
# 2601 if secure is turned on even though they may want insecure # 2601 if secure is turned on even though they may want insecure
user_input[CONF_ADDRESS] = device.ip_address user_input[CONF_ADDRESS] = device.ip_address
errors, result = await self._async_create_or_error(user_input, False) errors, result = await self._async_create_or_error(user_input, False)
if not errors: if result is not None:
return result return result
return self.async_show_form( return self.async_show_form(
@ -332,7 +336,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_step_import(self, user_input): async def async_step_import(self, user_input: dict[str, Any]) -> FlowResult:
"""Handle import.""" """Handle import."""
_LOGGER.debug("Elk is importing from yaml") _LOGGER.debug("Elk is importing from yaml")
url = _make_url_from_data(user_input) url = _make_url_from_data(user_input)
@ -344,8 +348,10 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
_LOGGER.debug( _LOGGER.debug(
"Importing is trying to fill unique id from discovery for %s", host "Importing is trying to fill unique id from discovery for %s", host
) )
if is_ip_address(host) and ( if (
device := await async_discover_device(self.hass, host) host
and is_ip_address(host)
and (device := await async_discover_device(self.hass, host))
): ):
await self.async_set_unique_id( await self.async_set_unique_id(
dr.format_mac(device.mac_address), raise_on_progress=False dr.format_mac(device.mac_address), raise_on_progress=False
@ -355,9 +361,10 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors, result = await self._async_create_or_error(user_input, True) errors, result = await self._async_create_or_error(user_input, True)
if errors: if errors:
return self.async_abort(reason=list(errors.values())[0]) return self.async_abort(reason=list(errors.values())[0])
assert result is not None
return result return result
def _url_already_configured(self, url): def _url_already_configured(self, url: str) -> bool:
"""See if we already have a elkm1 matching user input configured.""" """See if we already have a elkm1 matching user input configured."""
existing_hosts = { existing_hosts = {
urlparse(entry.data[CONF_HOST]).hostname urlparse(entry.data[CONF_HOST]).hostname

View File

@ -869,6 +869,17 @@ no_implicit_optional = true
warn_return_any = true warn_return_any = true
warn_unreachable = true warn_unreachable = true
[mypy-homeassistant.components.elkm1.config_flow]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.elkm1.discovery] [mypy-homeassistant.components.elkm1.discovery]
check_untyped_defs = true check_untyped_defs = true
disallow_incomplete_defs = true disallow_incomplete_defs = true

View File

@ -50,11 +50,8 @@ def _patch_elk(elk=None):
@contextmanager @contextmanager
def _patcher(): def _patcher():
with patch( with patch("homeassistant.components.elkm1.config_flow.Elk", new=_elk,), patch(
"homeassistant.components.elkm1.config_flow.elkm1.Elk", "homeassistant.components.elkm1.config_flow.Elk",
new=_elk,
), patch(
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
new=_elk, new=_elk,
): ):
yield yield

View File

@ -650,7 +650,7 @@ async def test_form_invalid_auth(hass):
mocked_elk = mock_elk(invalid_auth=True, sync_complete=True) mocked_elk = mock_elk(invalid_auth=True, sync_complete=True)
with patch( with patch(
"homeassistant.components.elkm1.config_flow.elkm1.Elk", "homeassistant.components.elkm1.config_flow.Elk",
return_value=mocked_elk, return_value=mocked_elk,
): ):
result2 = await hass.config_entries.flow.async_configure( result2 = await hass.config_entries.flow.async_configure(
@ -677,7 +677,7 @@ async def test_form_invalid_auth_no_password(hass):
mocked_elk = mock_elk(invalid_auth=True, sync_complete=True) mocked_elk = mock_elk(invalid_auth=True, sync_complete=True)
with patch( with patch(
"homeassistant.components.elkm1.config_flow.elkm1.Elk", "homeassistant.components.elkm1.config_flow.Elk",
return_value=mocked_elk, return_value=mocked_elk,
): ):
result2 = await hass.config_entries.flow.async_configure( result2 = await hass.config_entries.flow.async_configure(