Add discovery support to elkm1 (#65205)
parent
cf70ad10e8
commit
f943f30492
|
@ -257,7 +257,14 @@ omit =
|
|||
homeassistant/components/egardia/*
|
||||
homeassistant/components/eight_sleep/*
|
||||
homeassistant/components/eliqonline/sensor.py
|
||||
homeassistant/components/elkm1/*
|
||||
homeassistant/components/elkm1/__init__.py
|
||||
homeassistant/components/elkm1/alarm_control_panel.py
|
||||
homeassistant/components/elkm1/climate.py
|
||||
homeassistant/components/elkm1/discovery.py
|
||||
homeassistant/components/elkm1/light.py
|
||||
homeassistant/components/elkm1/scene.py
|
||||
homeassistant/components/elkm1/sensor.py
|
||||
homeassistant/components/elkm1/switch.py
|
||||
homeassistant/components/elmax/__init__.py
|
||||
homeassistant/components/elmax/common.py
|
||||
homeassistant/components/elmax/const.py
|
||||
|
|
|
@ -6,6 +6,7 @@ import logging
|
|||
import re
|
||||
from types import MappingProxyType
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import async_timeout
|
||||
import elkm1_lib as elkm1
|
||||
|
@ -28,15 +29,15 @@ from homeassistant.core import HomeAssistant, ServiceCall, callback
|
|||
from homeassistant.exceptions import ConfigEntryNotReady, HomeAssistantError
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.entity import DeviceInfo, Entity
|
||||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant.util.network import is_ip_address
|
||||
|
||||
from .const import (
|
||||
ATTR_KEY,
|
||||
ATTR_KEY_NAME,
|
||||
ATTR_KEYPAD_ID,
|
||||
BARE_TEMP_CELSIUS,
|
||||
BARE_TEMP_FAHRENHEIT,
|
||||
CONF_AREA,
|
||||
CONF_AUTO_CONFIGURE,
|
||||
CONF_COUNTER,
|
||||
|
@ -48,9 +49,18 @@ from .const import (
|
|||
CONF_TASK,
|
||||
CONF_THERMOSTAT,
|
||||
CONF_ZONE,
|
||||
DISCOVER_SCAN_TIMEOUT,
|
||||
DISCOVERY_INTERVAL,
|
||||
DOMAIN,
|
||||
ELK_ELEMENTS,
|
||||
EVENT_ELKM1_KEYPAD_KEY_PRESSED,
|
||||
LOGIN_TIMEOUT,
|
||||
)
|
||||
from .discovery import (
|
||||
async_discover_device,
|
||||
async_discover_devices,
|
||||
async_trigger_discovery,
|
||||
async_update_entry_from_discovery,
|
||||
)
|
||||
|
||||
SYNC_TIMEOUT = 120
|
||||
|
@ -127,28 +137,28 @@ DEVICE_SCHEMA_SUBDOMAIN = vol.Schema(
|
|||
}
|
||||
)
|
||||
|
||||
DEVICE_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_HOST): cv.string,
|
||||
vol.Optional(CONF_PREFIX, default=""): vol.All(cv.string, vol.Lower),
|
||||
vol.Optional(CONF_USERNAME, default=""): cv.string,
|
||||
vol.Optional(CONF_PASSWORD, default=""): cv.string,
|
||||
vol.Optional(CONF_AUTO_CONFIGURE, default=False): cv.boolean,
|
||||
# cv.temperature_unit will mutate 'C' -> '°C' and 'F' -> '°F'
|
||||
vol.Optional(
|
||||
CONF_TEMPERATURE_UNIT, default=BARE_TEMP_FAHRENHEIT
|
||||
): cv.temperature_unit,
|
||||
vol.Optional(CONF_AREA, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_COUNTER, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_KEYPAD, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_OUTPUT, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_PLC, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_SETTING, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_TASK, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_THERMOSTAT, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_ZONE, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
},
|
||||
_host_validator,
|
||||
DEVICE_SCHEMA = vol.All(
|
||||
cv.deprecated(CONF_TEMPERATURE_UNIT),
|
||||
vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_HOST): cv.string,
|
||||
vol.Optional(CONF_PREFIX, default=""): vol.All(cv.string, vol.Lower),
|
||||
vol.Optional(CONF_USERNAME, default=""): cv.string,
|
||||
vol.Optional(CONF_PASSWORD, default=""): cv.string,
|
||||
vol.Optional(CONF_AUTO_CONFIGURE, default=False): cv.boolean,
|
||||
vol.Optional(CONF_TEMPERATURE_UNIT, default="F"): cv.temperature_unit,
|
||||
vol.Optional(CONF_AREA, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_COUNTER, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_KEYPAD, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_OUTPUT, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_PLC, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_SETTING, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_TASK, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_THERMOSTAT, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
vol.Optional(CONF_ZONE, default={}): DEVICE_SCHEMA_SUBDOMAIN,
|
||||
},
|
||||
_host_validator,
|
||||
),
|
||||
)
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema(
|
||||
|
@ -162,6 +172,14 @@ async def async_setup(hass: HomeAssistant, hass_config: ConfigType) -> bool:
|
|||
hass.data.setdefault(DOMAIN, {})
|
||||
_create_elk_services(hass)
|
||||
|
||||
async def _async_discovery(*_: Any) -> None:
|
||||
async_trigger_discovery(
|
||||
hass, await async_discover_devices(hass, DISCOVER_SCAN_TIMEOUT)
|
||||
)
|
||||
|
||||
asyncio.create_task(_async_discovery())
|
||||
async_track_time_interval(hass, _async_discovery, DISCOVERY_INTERVAL)
|
||||
|
||||
if DOMAIN not in hass_config:
|
||||
return True
|
||||
|
||||
|
@ -204,13 +222,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
"""Set up Elk-M1 Control from a config entry."""
|
||||
conf: MappingProxyType[str, Any] = entry.data
|
||||
|
||||
host = urlparse(entry.data[CONF_HOST]).hostname
|
||||
|
||||
_LOGGER.debug("Setting up elkm1 %s", conf["host"])
|
||||
|
||||
temperature_unit = TEMP_FAHRENHEIT
|
||||
if conf[CONF_TEMPERATURE_UNIT] in (BARE_TEMP_CELSIUS, TEMP_CELSIUS):
|
||||
temperature_unit = TEMP_CELSIUS
|
||||
if not entry.unique_id or ":" not in entry.unique_id and is_ip_address(host):
|
||||
if device := await async_discover_device(hass, host):
|
||||
async_update_entry_from_discovery(hass, entry, device)
|
||||
|
||||
config: dict[str, Any] = {"temperature_unit": temperature_unit}
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
if not conf[CONF_AUTO_CONFIGURE]:
|
||||
# With elkm1-lib==0.7.16 and later auto configure is available
|
||||
|
@ -253,11 +273,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
keypad.add_callback(_element_changed)
|
||||
|
||||
try:
|
||||
if not await async_wait_for_elk_to_sync(elk, SYNC_TIMEOUT, conf[CONF_HOST]):
|
||||
if not await async_wait_for_elk_to_sync(
|
||||
elk, LOGIN_TIMEOUT, SYNC_TIMEOUT, conf[CONF_HOST]
|
||||
):
|
||||
return False
|
||||
except asyncio.TimeoutError as exc:
|
||||
raise ConfigEntryNotReady from exc
|
||||
raise ConfigEntryNotReady(f"Timed out connecting to {conf[CONF_HOST]}") from exc
|
||||
|
||||
elk_temp_unit = elk.panel.temperature_units # pylint: disable=no-member
|
||||
temperature_unit = TEMP_CELSIUS if elk_temp_unit == "C" else TEMP_FAHRENHEIT
|
||||
config["temperature_unit"] = temperature_unit
|
||||
hass.data[DOMAIN][entry.entry_id] = {
|
||||
"elk": elk,
|
||||
"prefix": conf[CONF_PREFIX],
|
||||
|
@ -298,38 +323,42 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
return unload_ok
|
||||
|
||||
|
||||
async def async_wait_for_elk_to_sync(elk, timeout, conf_host):
|
||||
async def async_wait_for_elk_to_sync(
|
||||
elk: elkm1.Elk, login_timeout: int, sync_timeout: int, conf_host: str
|
||||
) -> bool:
|
||||
"""Wait until the elk has finished sync. Can fail login or timeout."""
|
||||
|
||||
sync_event = asyncio.Event()
|
||||
login_event = asyncio.Event()
|
||||
|
||||
def login_status(succeeded):
|
||||
nonlocal success
|
||||
|
||||
success = succeeded
|
||||
if succeeded:
|
||||
_LOGGER.debug("ElkM1 login succeeded")
|
||||
login_event.set()
|
||||
else:
|
||||
elk.disconnect()
|
||||
_LOGGER.error("ElkM1 login failed; invalid username or password")
|
||||
event.set()
|
||||
login_event.set()
|
||||
sync_event.set()
|
||||
|
||||
def sync_complete():
|
||||
event.set()
|
||||
sync_event.set()
|
||||
|
||||
success = True
|
||||
event = asyncio.Event()
|
||||
elk.add_handler("login", login_status)
|
||||
elk.add_handler("sync_complete", sync_complete)
|
||||
try:
|
||||
async with async_timeout.timeout(timeout):
|
||||
await event.wait()
|
||||
except asyncio.TimeoutError:
|
||||
_LOGGER.error(
|
||||
"Timed out after %d seconds while trying to sync with ElkM1 at %s",
|
||||
timeout,
|
||||
conf_host,
|
||||
)
|
||||
elk.disconnect()
|
||||
raise
|
||||
events = ((login_event, login_timeout), (sync_event, sync_timeout))
|
||||
|
||||
for event, timeout in events:
|
||||
try:
|
||||
async with async_timeout.timeout(timeout):
|
||||
await event.wait()
|
||||
except asyncio.TimeoutError:
|
||||
elk.disconnect()
|
||||
raise
|
||||
|
||||
return success
|
||||
|
||||
|
@ -392,6 +421,7 @@ class ElkEntity(Entity):
|
|||
self._elk = elk
|
||||
self._element = element
|
||||
self._prefix = elk_data["prefix"]
|
||||
self._name_prefix = f"{self._prefix} " if self._prefix else ""
|
||||
self._temperature_unit = elk_data["config"]["temperature_unit"]
|
||||
# unique_id starts with elkm1_ iff there is no prefix
|
||||
# it starts with elkm1m_{prefix} iff there is a prefix
|
||||
|
@ -410,7 +440,7 @@ class ElkEntity(Entity):
|
|||
@property
|
||||
def name(self):
|
||||
"""Name of the element."""
|
||||
return f"{self._prefix}{self._element.name}"
|
||||
return f"{self._name_prefix}{self._element.name}"
|
||||
|
||||
@property
|
||||
def unique_id(self):
|
||||
|
|
|
@ -1,27 +1,42 @@
|
|||
"""Config flow for Elk-M1 Control integration."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import elkm1_lib as elkm1
|
||||
from elkm1_lib.discovery import ElkSystem
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries, exceptions
|
||||
from homeassistant.components import dhcp
|
||||
from homeassistant.const import (
|
||||
CONF_ADDRESS,
|
||||
CONF_HOST,
|
||||
CONF_PASSWORD,
|
||||
CONF_PREFIX,
|
||||
CONF_PROTOCOL,
|
||||
CONF_TEMPERATURE_UNIT,
|
||||
CONF_USERNAME,
|
||||
TEMP_CELSIUS,
|
||||
TEMP_FAHRENHEIT,
|
||||
)
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
from homeassistant.helpers import device_registry as dr
|
||||
from homeassistant.helpers.typing import DiscoveryInfoType
|
||||
from homeassistant.util import slugify
|
||||
|
||||
from . import async_wait_for_elk_to_sync
|
||||
from .const import CONF_AUTO_CONFIGURE, DOMAIN
|
||||
from .const import CONF_AUTO_CONFIGURE, DISCOVER_SCAN_TIMEOUT, DOMAIN, LOGIN_TIMEOUT
|
||||
from .discovery import (
|
||||
_short_mac,
|
||||
async_discover_device,
|
||||
async_discover_devices,
|
||||
async_update_entry_from_discovery,
|
||||
)
|
||||
|
||||
CONF_DEVICE = "device"
|
||||
|
||||
SECURE_PORT = 2601
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -32,25 +47,20 @@ PROTOCOL_MAP = {
|
|||
"serial": "serial://",
|
||||
}
|
||||
|
||||
DATA_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_PROTOCOL, default="secure"): vol.In(
|
||||
["secure", "TLS 1.2", "non-secure", "serial"]
|
||||
),
|
||||
vol.Required(CONF_ADDRESS): str,
|
||||
vol.Optional(CONF_USERNAME, default=""): str,
|
||||
vol.Optional(CONF_PASSWORD, default=""): str,
|
||||
vol.Optional(CONF_PREFIX, default=""): str,
|
||||
vol.Optional(CONF_TEMPERATURE_UNIT, default=TEMP_FAHRENHEIT): vol.In(
|
||||
[TEMP_FAHRENHEIT, TEMP_CELSIUS]
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
VALIDATE_TIMEOUT = 35
|
||||
|
||||
BASE_SCHEMA = {
|
||||
vol.Optional(CONF_USERNAME, default=""): str,
|
||||
vol.Optional(CONF_PASSWORD, default=""): str,
|
||||
}
|
||||
|
||||
async def validate_input(data):
|
||||
SECURE_PROTOCOLS = ["secure", "TLS 1.2"]
|
||||
ALL_PROTOCOLS = [*SECURE_PROTOCOLS, "non-secure", "serial"]
|
||||
DEFAULT_SECURE_PROTOCOL = "secure"
|
||||
DEFAULT_NON_SECURE_PROTOCOL = "non-secure"
|
||||
|
||||
|
||||
async def validate_input(data: dict[str, str], mac: str | None) -> dict[str, str]:
|
||||
"""Validate the user input allows us to connect.
|
||||
|
||||
Data has the keys from DATA_SCHEMA with values provided by the user.
|
||||
|
@ -70,11 +80,16 @@ async def validate_input(data):
|
|||
)
|
||||
elk.connect()
|
||||
|
||||
if not await async_wait_for_elk_to_sync(elk, VALIDATE_TIMEOUT, url):
|
||||
if not await async_wait_for_elk_to_sync(elk, LOGIN_TIMEOUT, VALIDATE_TIMEOUT, url):
|
||||
raise InvalidAuth
|
||||
|
||||
device_name = data[CONF_PREFIX] if data[CONF_PREFIX] else "ElkM1"
|
||||
# Return info that you want to store in the config entry.
|
||||
short_mac = _short_mac(mac) if mac else None
|
||||
if prefix and prefix != short_mac:
|
||||
device_name = prefix
|
||||
elif mac:
|
||||
device_name = f"ElkM1 {short_mac}"
|
||||
else:
|
||||
device_name = "ElkM1"
|
||||
return {"title": device_name, CONF_HOST: url, CONF_PREFIX: slugify(prefix)}
|
||||
|
||||
|
||||
|
@ -87,6 +102,13 @@ def _make_url_from_data(data):
|
|||
return f"{protocol}{address}"
|
||||
|
||||
|
||||
def _placeholders_from_device(device: ElkSystem) -> dict[str, str]:
|
||||
return {
|
||||
"mac_address": _short_mac(device.mac_address),
|
||||
"host": f"{device.ip_address}:{device.port}",
|
||||
}
|
||||
|
||||
|
||||
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for Elk-M1 Control."""
|
||||
|
||||
|
@ -94,53 +116,200 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
|
||||
def __init__(self):
|
||||
"""Initialize the elkm1 config flow."""
|
||||
self.importing = False
|
||||
self._discovered_device: ElkSystem | None = None
|
||||
self._discovered_devices: dict[str, ElkSystem] = {}
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
async def async_step_dhcp(self, discovery_info: dhcp.DhcpServiceInfo) -> FlowResult:
|
||||
"""Handle discovery via dhcp."""
|
||||
self._discovered_device = ElkSystem(
|
||||
discovery_info.macaddress, discovery_info.ip, 0
|
||||
)
|
||||
return await self._async_handle_discovery()
|
||||
|
||||
async def async_step_discovery(
|
||||
self, discovery_info: DiscoveryInfoType
|
||||
) -> FlowResult:
|
||||
"""Handle discovery."""
|
||||
self._discovered_device = ElkSystem(
|
||||
discovery_info["mac_address"],
|
||||
discovery_info["ip_address"],
|
||||
discovery_info["port"],
|
||||
)
|
||||
return await self._async_handle_discovery()
|
||||
|
||||
async def _async_handle_discovery(self) -> FlowResult:
|
||||
"""Handle any discovery."""
|
||||
device = self._discovered_device
|
||||
assert device is not None
|
||||
mac = dr.format_mac(device.mac_address)
|
||||
host = device.ip_address
|
||||
await self.async_set_unique_id(mac)
|
||||
for entry in self._async_current_entries(include_ignore=False):
|
||||
if (
|
||||
entry.unique_id == mac
|
||||
or urlparse(entry.data[CONF_HOST]).hostname == host
|
||||
):
|
||||
if async_update_entry_from_discovery(self.hass, entry, device):
|
||||
self.hass.async_create_task(
|
||||
self.hass.config_entries.async_reload(entry.entry_id)
|
||||
)
|
||||
return self.async_abort(reason="already_configured")
|
||||
self.context[CONF_HOST] = host
|
||||
for progress in self._async_in_progress():
|
||||
if progress.get("context", {}).get(CONF_HOST) == host:
|
||||
return self.async_abort(reason="already_in_progress")
|
||||
if not device.port:
|
||||
if discovered_device := await async_discover_device(self.hass, host):
|
||||
self._discovered_device = discovered_device
|
||||
else:
|
||||
return self.async_abort(reason="cannot_connect")
|
||||
return await self.async_step_discovery_confirm()
|
||||
|
||||
async def async_step_discovery_confirm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Confirm discovery."""
|
||||
assert self._discovered_device is not None
|
||||
self.context["title_placeholders"] = _placeholders_from_device(
|
||||
self._discovered_device
|
||||
)
|
||||
return await self.async_step_discovered_connection()
|
||||
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle the initial step."""
|
||||
errors = {}
|
||||
if user_input is not None:
|
||||
if self._url_already_configured(_make_url_from_data(user_input)):
|
||||
return self.async_abort(reason="address_already_configured")
|
||||
if mac := user_input[CONF_DEVICE]:
|
||||
await self.async_set_unique_id(mac, raise_on_progress=False)
|
||||
self._discovered_device = self._discovered_devices[mac]
|
||||
return await self.async_step_discovered_connection()
|
||||
return await self.async_step_manual_connection()
|
||||
|
||||
try:
|
||||
info = await validate_input(user_input)
|
||||
current_unique_ids = self._async_current_ids()
|
||||
current_hosts = {
|
||||
urlparse(entry.data[CONF_HOST]).hostname
|
||||
for entry in self._async_current_entries(include_ignore=False)
|
||||
}
|
||||
discovered_devices = await async_discover_devices(
|
||||
self.hass, DISCOVER_SCAN_TIMEOUT
|
||||
)
|
||||
self._discovered_devices = {
|
||||
dr.format_mac(device.mac_address): device for device in discovered_devices
|
||||
}
|
||||
devices_name: dict[str | None, str] = {
|
||||
mac: f"{_short_mac(device.mac_address)} ({device.ip_address})"
|
||||
for mac, device in self._discovered_devices.items()
|
||||
if mac not in current_unique_ids and device.ip_address not in current_hosts
|
||||
}
|
||||
if not devices_name:
|
||||
return await self.async_step_manual_connection()
|
||||
devices_name[None] = "Manual Entry"
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema({vol.Required(CONF_DEVICE): vol.In(devices_name)}),
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
errors["base"] = "cannot_connect"
|
||||
except InvalidAuth:
|
||||
errors["base"] = "invalid_auth"
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.exception("Unexpected exception")
|
||||
errors["base"] = "unknown"
|
||||
async def _async_create_or_error(
|
||||
self, user_input: dict[str, Any], importing: bool
|
||||
) -> tuple[dict[str, str] | None, FlowResult | None]:
|
||||
"""Try to connect and create the entry or error."""
|
||||
if self._url_already_configured(_make_url_from_data(user_input)):
|
||||
return None, self.async_abort(reason="address_already_configured")
|
||||
|
||||
if "base" not in errors:
|
||||
await self.async_set_unique_id(user_input[CONF_PREFIX])
|
||||
self._abort_if_unique_id_configured()
|
||||
try:
|
||||
info = await validate_input(user_input, self.unique_id)
|
||||
except asyncio.TimeoutError:
|
||||
return {CONF_HOST: "cannot_connect"}, None
|
||||
except InvalidAuth:
|
||||
return {CONF_PASSWORD: "invalid_auth"}, None
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.exception("Unexpected exception")
|
||||
return {"base": "unknown"}, None
|
||||
|
||||
if self.importing:
|
||||
return self.async_create_entry(title=info["title"], data=user_input)
|
||||
if importing:
|
||||
return None, self.async_create_entry(title=info["title"], data=user_input)
|
||||
|
||||
return self.async_create_entry(
|
||||
title=info["title"],
|
||||
data={
|
||||
CONF_HOST: info[CONF_HOST],
|
||||
CONF_USERNAME: user_input[CONF_USERNAME],
|
||||
CONF_PASSWORD: user_input[CONF_PASSWORD],
|
||||
CONF_AUTO_CONFIGURE: True,
|
||||
CONF_TEMPERATURE_UNIT: user_input[CONF_TEMPERATURE_UNIT],
|
||||
CONF_PREFIX: info[CONF_PREFIX],
|
||||
},
|
||||
)
|
||||
return None, self.async_create_entry(
|
||||
title=info["title"],
|
||||
data={
|
||||
CONF_HOST: info[CONF_HOST],
|
||||
CONF_USERNAME: user_input[CONF_USERNAME],
|
||||
CONF_PASSWORD: user_input[CONF_PASSWORD],
|
||||
CONF_AUTO_CONFIGURE: True,
|
||||
CONF_PREFIX: info[CONF_PREFIX],
|
||||
},
|
||||
)
|
||||
|
||||
async def async_step_discovered_connection(self, user_input=None):
|
||||
"""Handle connecting the device when we have a discovery."""
|
||||
errors = {}
|
||||
device = self._discovered_device
|
||||
assert device is not None
|
||||
if user_input is not None:
|
||||
user_input[CONF_ADDRESS] = f"{device.ip_address}:{device.port}"
|
||||
if self._async_current_entries():
|
||||
user_input[CONF_PREFIX] = _short_mac(device.mac_address)
|
||||
else:
|
||||
user_input[CONF_PREFIX] = ""
|
||||
if device.port != SECURE_PORT:
|
||||
user_input[CONF_PROTOCOL] = DEFAULT_NON_SECURE_PROTOCOL
|
||||
errors, result = await self._async_create_or_error(user_input, False)
|
||||
if not errors:
|
||||
return result
|
||||
|
||||
base_schmea = BASE_SCHEMA.copy()
|
||||
if device.port == SECURE_PORT:
|
||||
base_schmea[
|
||||
vol.Required(CONF_PROTOCOL, default=DEFAULT_SECURE_PROTOCOL)
|
||||
] = vol.In(SECURE_PROTOCOLS)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user", data_schema=DATA_SCHEMA, errors=errors
|
||||
step_id="discovered_connection",
|
||||
data_schema=vol.Schema(base_schmea),
|
||||
errors=errors,
|
||||
description_placeholders=_placeholders_from_device(device),
|
||||
)
|
||||
|
||||
async def async_step_manual_connection(self, user_input=None):
|
||||
"""Handle connecting the device when we need manual entry."""
|
||||
errors = {}
|
||||
if user_input is not None:
|
||||
# We might be able to discover the device via directed UDP
|
||||
# in case its on another subnet
|
||||
if device := await async_discover_device(
|
||||
self.hass, user_input[CONF_ADDRESS]
|
||||
):
|
||||
await self.async_set_unique_id(dr.format_mac(device.mac_address))
|
||||
self._abort_if_unique_id_configured()
|
||||
user_input[CONF_ADDRESS] = f"{device.ip_address}:{device.port}"
|
||||
errors, result = await self._async_create_or_error(user_input, False)
|
||||
if not errors:
|
||||
return result
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="manual_connection",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
**BASE_SCHEMA,
|
||||
vol.Required(CONF_ADDRESS): str,
|
||||
vol.Optional(CONF_PREFIX, default=""): str,
|
||||
vol.Required(
|
||||
CONF_PROTOCOL, default=DEFAULT_SECURE_PROTOCOL
|
||||
): vol.In(ALL_PROTOCOLS),
|
||||
}
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def async_step_import(self, user_input):
|
||||
"""Handle import."""
|
||||
self.importing = True
|
||||
return await self.async_step_user(user_input)
|
||||
if device := await async_discover_device(
|
||||
self.hass, urlparse(user_input[CONF_HOST]).hostname
|
||||
):
|
||||
await self.async_set_unique_id(dr.format_mac(device.mac_address))
|
||||
self._abort_if_unique_id_configured()
|
||||
return (await self._async_create_or_error(user_input, True))[1]
|
||||
|
||||
def _url_already_configured(self, url):
|
||||
"""See if we already have a elkm1 matching user input configured."""
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
"""Support the ElkM1 Gold and ElkM1 EZ8 alarm/integration panels."""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
from elkm1_lib.const import Max
|
||||
import voluptuous as vol
|
||||
|
||||
|
@ -7,6 +9,8 @@ from homeassistant.const import ATTR_CODE, CONF_ZONE
|
|||
|
||||
DOMAIN = "elkm1"
|
||||
|
||||
LOGIN_TIMEOUT = 15
|
||||
|
||||
CONF_AUTO_CONFIGURE = "auto_configure"
|
||||
CONF_AREA = "area"
|
||||
CONF_COUNTER = "counter"
|
||||
|
@ -18,9 +22,8 @@ CONF_SETTING = "setting"
|
|||
CONF_TASK = "task"
|
||||
CONF_THERMOSTAT = "thermostat"
|
||||
|
||||
|
||||
BARE_TEMP_FAHRENHEIT = "F"
|
||||
BARE_TEMP_CELSIUS = "C"
|
||||
DISCOVER_SCAN_TIMEOUT = 10
|
||||
DISCOVERY_INTERVAL = timedelta(minutes=15)
|
||||
|
||||
ELK_ELEMENTS = {
|
||||
CONF_AREA: Max.AREAS.value,
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
"""The elkm1 integration discovery."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import asdict
|
||||
import logging
|
||||
|
||||
from elkm1_lib.discovery import AIOELKDiscovery, ElkSystem
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components import network
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers import device_registry as dr
|
||||
|
||||
from .const import DISCOVER_SCAN_TIMEOUT, DOMAIN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _short_mac(mac_address: str) -> str:
|
||||
return mac_address.replace(":", "")[-6:]
|
||||
|
||||
|
||||
@callback
|
||||
def async_update_entry_from_discovery(
|
||||
hass: HomeAssistant,
|
||||
entry: config_entries.ConfigEntry,
|
||||
device: ElkSystem,
|
||||
) -> bool:
|
||||
"""Update a config entry from a discovery."""
|
||||
if not entry.unique_id or ":" not in entry.unique_id:
|
||||
return hass.config_entries.async_update_entry(
|
||||
entry, unique_id=dr.format_mac(device.mac_address)
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
async def async_discover_devices(
|
||||
hass: HomeAssistant, timeout: int, address: str | None = None
|
||||
) -> list[ElkSystem]:
|
||||
"""Discover elkm1 devices."""
|
||||
if address:
|
||||
targets = [address]
|
||||
else:
|
||||
targets = [
|
||||
str(address)
|
||||
for address in await network.async_get_ipv4_broadcast_addresses(hass)
|
||||
]
|
||||
|
||||
scanner = AIOELKDiscovery()
|
||||
combined_discoveries: dict[str, ElkSystem] = {}
|
||||
for idx, discovered in enumerate(
|
||||
await asyncio.gather(
|
||||
*[
|
||||
scanner.async_scan(timeout=timeout, address=address)
|
||||
for address in targets
|
||||
],
|
||||
return_exceptions=True,
|
||||
)
|
||||
):
|
||||
if isinstance(discovered, Exception):
|
||||
_LOGGER.debug("Scanning %s failed with error: %s", targets[idx], discovered)
|
||||
continue
|
||||
for device in discovered:
|
||||
assert isinstance(device, ElkSystem)
|
||||
combined_discoveries[device.ip_address] = device
|
||||
|
||||
return list(combined_discoveries.values())
|
||||
|
||||
|
||||
async def async_discover_device(hass: HomeAssistant, host: str) -> ElkSystem | None:
|
||||
"""Direct discovery at a single ip instead of broadcast."""
|
||||
# If we are missing the unique_id we should be able to fetch it
|
||||
# from the device by doing a directed discovery at the host only
|
||||
for device in await async_discover_devices(hass, DISCOVER_SCAN_TIMEOUT, host):
|
||||
if device.ip_address == host:
|
||||
return device
|
||||
return None
|
||||
|
||||
|
||||
@callback
|
||||
def async_trigger_discovery(
|
||||
hass: HomeAssistant,
|
||||
discovered_devices: list[ElkSystem],
|
||||
) -> None:
|
||||
"""Trigger config flows for discovered devices."""
|
||||
for device in discovered_devices:
|
||||
hass.async_create_task(
|
||||
hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_DISCOVERY},
|
||||
data=asdict(device),
|
||||
)
|
||||
)
|
|
@ -2,8 +2,10 @@
|
|||
"domain": "elkm1",
|
||||
"name": "Elk-M1 Control",
|
||||
"documentation": "https://www.home-assistant.io/integrations/elkm1",
|
||||
"requirements": ["elkm1-lib==1.0.0"],
|
||||
"requirements": ["elkm1-lib==1.2.0"],
|
||||
"dhcp": [{"macaddress":"00409D*"}],
|
||||
"codeowners": ["@gwww", "@bdraco"],
|
||||
"dependencies": ["network"],
|
||||
"config_flow": true,
|
||||
"iot_class": "local_push",
|
||||
"loggers": ["elkm1_lib"]
|
||||
|
|
|
@ -1,8 +1,16 @@
|
|||
{
|
||||
"config": {
|
||||
"flow_title": "{mac_address} ({host})",
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Connect to Elk-M1 Control",
|
||||
"description": "Choose a discovered system or 'Manual Entry' if no devices have been discovered.",
|
||||
"data": {
|
||||
"device": "Device"
|
||||
}
|
||||
},
|
||||
"manual_connection": {
|
||||
"title": "[%key:component::elkm1::config::step::user::title%]",
|
||||
"description": "The address string must be in the form 'address[:port]' for 'secure' and 'non-secure'. Example: '192.168.1.1'. The port is optional and defaults to 2101 for 'non-secure' and 2601 for 'secure'. For the serial protocol, the address must be in the form 'tty[:baud]'. Example: '/dev/ttyS1'. The baud is optional and defaults to 115200.",
|
||||
"data": {
|
||||
"protocol": "Protocol",
|
||||
|
@ -12,6 +20,16 @@
|
|||
"prefix": "A unique prefix (leave blank if you only have one ElkM1).",
|
||||
"temperature_unit": "The temperature unit ElkM1 uses."
|
||||
}
|
||||
},
|
||||
"discovered_connection": {
|
||||
"title": "[%key:component::elkm1::config::step::user::title%]",
|
||||
"description": "Connect to the discovered system: {mac_address} ({host})",
|
||||
"data": {
|
||||
"protocol": "[%key:component::elkm1::config::step::manual_connection::data::protocol%]",
|
||||
"username": "[%key:common::config_flow::data::username%]",
|
||||
"password": "[%key:common::config_flow::data::password%]",
|
||||
"temperature_unit": "[%key:component::elkm1::config::step::manual_connection::data::temperature_unit%]"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
|
@ -20,8 +38,10 @@
|
|||
"unknown": "[%key:common::config_flow::error::unknown%]"
|
||||
},
|
||||
"abort": {
|
||||
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
"already_configured": "An ElkM1 with this prefix is already configured",
|
||||
"address_already_configured": "An ElkM1 with this address is already configured"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,25 +2,43 @@
|
|||
"config": {
|
||||
"abort": {
|
||||
"address_already_configured": "An ElkM1 with this address is already configured",
|
||||
"already_configured": "An ElkM1 with this prefix is already configured"
|
||||
"already_configured": "An ElkM1 with this prefix is already configured",
|
||||
"already_in_progress": "Configuration flow is already in progress",
|
||||
"cannot_connect": "Failed to connect"
|
||||
},
|
||||
"error": {
|
||||
"cannot_connect": "Failed to connect",
|
||||
"invalid_auth": "Invalid authentication",
|
||||
"unknown": "Unexpected error"
|
||||
},
|
||||
"flow_title": "{mac_address} ({host})",
|
||||
"step": {
|
||||
"user": {
|
||||
"discovered_connection": {
|
||||
"data": {
|
||||
"password": "Password",
|
||||
"protocol": "Protocol",
|
||||
"username": "Username"
|
||||
},
|
||||
"description": "Connect to the discovered system: {mac_address} ({host})",
|
||||
"title": "Connect to Elk-M1 Control"
|
||||
},
|
||||
"manual_connection": {
|
||||
"data": {
|
||||
"address": "The IP address or domain or serial port if connecting via serial.",
|
||||
"password": "Password",
|
||||
"prefix": "A unique prefix (leave blank if you only have one ElkM1).",
|
||||
"protocol": "Protocol",
|
||||
"temperature_unit": "The temperature unit ElkM1 uses.",
|
||||
"username": "Username"
|
||||
},
|
||||
"description": "The address string must be in the form 'address[:port]' for 'secure' and 'non-secure'. Example: '192.168.1.1'. The port is optional and defaults to 2101 for 'non-secure' and 2601 for 'secure'. For the serial protocol, the address must be in the form 'tty[:baud]'. Example: '/dev/ttyS1'. The baud is optional and defaults to 115200.",
|
||||
"title": "Connect to Elk-M1 Control"
|
||||
},
|
||||
"user": {
|
||||
"data": {
|
||||
"device": "Device"
|
||||
},
|
||||
"description": "Choose a discovered system or 'Manual Entry' if no devices have been discovered.",
|
||||
"title": "Connect to Elk-M1 Control"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,10 @@ DHCP = [
|
|||
"domain": "broadlink",
|
||||
"macaddress": "B4430D*"
|
||||
},
|
||||
{
|
||||
"domain": "elkm1",
|
||||
"macaddress": "00409D*"
|
||||
},
|
||||
{
|
||||
"domain": "emonitor",
|
||||
"hostname": "emonitor*",
|
||||
|
|
|
@ -605,7 +605,7 @@ elgato==3.0.0
|
|||
eliqonline==1.2.2
|
||||
|
||||
# homeassistant.components.elkm1
|
||||
elkm1-lib==1.0.0
|
||||
elkm1-lib==1.2.0
|
||||
|
||||
# homeassistant.components.elmax
|
||||
elmax_api==0.0.2
|
||||
|
|
|
@ -388,7 +388,7 @@ dynalite_devices==0.1.46
|
|||
elgato==3.0.0
|
||||
|
||||
# homeassistant.components.elkm1
|
||||
elkm1-lib==1.0.0
|
||||
elkm1-lib==1.2.0
|
||||
|
||||
# homeassistant.components.elmax
|
||||
elmax_api==0.0.2
|
||||
|
|
|
@ -1 +1,61 @@
|
|||
"""Tests for the Elk-M1 Control integration."""
|
||||
|
||||
from contextlib import contextmanager
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from elkm1_lib.discovery import ElkSystem
|
||||
|
||||
MOCK_IP_ADDRESS = "127.0.0.1"
|
||||
MOCK_MAC = "aa:bb:cc:dd:ee:ff"
|
||||
ELK_DISCOVERY = ElkSystem(MOCK_MAC, MOCK_IP_ADDRESS, 2601)
|
||||
ELK_NON_SECURE_DISCOVERY = ElkSystem(MOCK_MAC, MOCK_IP_ADDRESS, 2101)
|
||||
|
||||
|
||||
def mock_elk(invalid_auth=None, sync_complete=None, exception=None):
|
||||
"""Mock m1lib Elk."""
|
||||
|
||||
def handler_callbacks(type_, callback):
|
||||
nonlocal invalid_auth, sync_complete
|
||||
if exception:
|
||||
raise exception
|
||||
if type_ == "login":
|
||||
callback(not invalid_auth)
|
||||
elif type_ == "sync_complete" and sync_complete:
|
||||
callback()
|
||||
|
||||
mocked_elk = MagicMock()
|
||||
mocked_elk.add_handler.side_effect = handler_callbacks
|
||||
return mocked_elk
|
||||
|
||||
|
||||
def _patch_discovery(device=None, no_device=False):
|
||||
async def _discovery(*args, **kwargs):
|
||||
return [] if no_device else [device or ELK_DISCOVERY]
|
||||
|
||||
@contextmanager
|
||||
def _patcher():
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.discovery.AIOELKDiscovery.async_scan",
|
||||
new=_discovery,
|
||||
):
|
||||
yield
|
||||
|
||||
return _patcher()
|
||||
|
||||
|
||||
def _patch_elk(elk=None):
|
||||
def _elk(*args, **kwargs):
|
||||
return elk if elk else mock_elk()
|
||||
|
||||
@contextmanager
|
||||
def _patcher():
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
new=_elk,
|
||||
), patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
new=_elk,
|
||||
):
|
||||
yield
|
||||
|
||||
return _patcher()
|
||||
|
|
|
@ -1,43 +1,48 @@
|
|||
"""Test the Elk-M1 Control config flow."""
|
||||
from dataclasses import asdict
|
||||
from unittest.mock import patch
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
import pytest
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components import dhcp
|
||||
from homeassistant.components.elkm1.const import DOMAIN
|
||||
from homeassistant.const import CONF_HOST, CONF_PASSWORD
|
||||
from homeassistant.data_entry_flow import RESULT_TYPE_ABORT, RESULT_TYPE_FORM
|
||||
|
||||
from . import (
|
||||
ELK_DISCOVERY,
|
||||
ELK_NON_SECURE_DISCOVERY,
|
||||
MOCK_IP_ADDRESS,
|
||||
MOCK_MAC,
|
||||
_patch_discovery,
|
||||
_patch_elk,
|
||||
mock_elk,
|
||||
)
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
DHCP_DISCOVERY = dhcp.DhcpServiceInfo(MOCK_IP_ADDRESS, "", MOCK_MAC)
|
||||
ELK_DISCOVERY_INFO = asdict(ELK_DISCOVERY)
|
||||
MODULE = "homeassistant.components.elkm1"
|
||||
|
||||
|
||||
def mock_elk(invalid_auth=None, sync_complete=None):
|
||||
"""Mock m1lib Elk."""
|
||||
|
||||
def handler_callbacks(type_, callback):
|
||||
nonlocal invalid_auth, sync_complete
|
||||
|
||||
if type_ == "login":
|
||||
if invalid_auth is not None:
|
||||
callback(not invalid_auth)
|
||||
elif type_ == "sync_complete" and sync_complete:
|
||||
callback()
|
||||
|
||||
mocked_elk = MagicMock()
|
||||
mocked_elk.add_handler.side_effect = handler_callbacks
|
||||
return mocked_elk
|
||||
|
||||
|
||||
async def test_form_user_with_secure_elk(hass):
|
||||
async def test_form_user_with_secure_elk_no_discovery(hass):
|
||||
"""Test we can setup a secure elk."""
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
with _patch_discovery(no_device=True):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] == {}
|
||||
assert result["step_id"] == "manual_connection"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
return_value=mocked_elk,
|
||||
), patch(
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
|
@ -50,7 +55,6 @@ async def test_form_user_with_secure_elk(hass):
|
|||
"address": "1.2.3.4",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"temperature_unit": "°F",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
|
@ -63,28 +67,227 @@ async def test_form_user_with_secure_elk(hass):
|
|||
"host": "elks://1.2.3.4",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"temperature_unit": "°F",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_form_user_with_tls_elk(hass):
|
||||
"""Test we can setup a secure elk."""
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
async def test_form_user_with_secure_elk_no_discovery_ip_already_configured(hass):
|
||||
"""Test we abort when we try to configure the same ip."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_HOST: f"elks://{MOCK_IP_ADDRESS}"},
|
||||
unique_id="cc:cc:cc:cc:cc:cc",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with _patch_discovery(no_device=True):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] == {}
|
||||
assert result["step_id"] == "manual_connection"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
return_value=mocked_elk,
|
||||
), patch(
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"protocol": "secure",
|
||||
"address": "127.0.0.1",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result2["type"] == RESULT_TYPE_ABORT
|
||||
assert result2["reason"] == "address_already_configured"
|
||||
|
||||
|
||||
async def test_form_user_with_secure_elk_with_discovery(hass):
|
||||
"""Test we can setup a secure elk."""
|
||||
|
||||
with _patch_discovery():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] is None
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_elk(elk=mocked_elk):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{"device": MOCK_MAC},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result3 = await hass.config_entries.flow.async_configure(
|
||||
result2["flow_id"],
|
||||
{
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result3["type"] == "create_entry"
|
||||
assert result3["title"] == "ElkM1 ddeeff"
|
||||
assert result3["data"] == {
|
||||
"auto_configure": True,
|
||||
"host": "elks://127.0.0.1:2601",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert result3["result"].unique_id == "aa:bb:cc:dd:ee:ff"
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_form_user_with_secure_elk_with_discovery_pick_manual(hass):
|
||||
"""Test we can setup a secure elk with discovery but user picks manual and directed discovery fails."""
|
||||
|
||||
with _patch_discovery():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] is None
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_elk(elk=mocked_elk):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{"device": None},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result3 = await hass.config_entries.flow.async_configure(
|
||||
result2["flow_id"],
|
||||
{
|
||||
"protocol": "secure",
|
||||
"address": "1.2.3.4",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result3["type"] == "create_entry"
|
||||
assert result3["title"] == "ElkM1"
|
||||
assert result3["data"] == {
|
||||
"auto_configure": True,
|
||||
"host": "elks://1.2.3.4",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert result3["result"].unique_id is None
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_form_user_with_secure_elk_with_discovery_pick_manual_direct_discovery(
|
||||
hass,
|
||||
):
|
||||
"""Test we can setup a secure elk with discovery but user picks manual and directed discovery succeeds."""
|
||||
|
||||
with _patch_discovery():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] is None
|
||||
assert result["step_id"] == "user"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_elk(elk=mocked_elk):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{"device": None},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result3 = await hass.config_entries.flow.async_configure(
|
||||
result2["flow_id"],
|
||||
{
|
||||
"protocol": "secure",
|
||||
"address": "127.0.0.1",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result3["type"] == "create_entry"
|
||||
assert result3["title"] == "ElkM1 ddeeff"
|
||||
assert result3["data"] == {
|
||||
"auto_configure": True,
|
||||
"host": "elks://127.0.0.1:2601",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert result3["result"].unique_id == MOCK_MAC
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_form_user_with_tls_elk_no_discovery(hass):
|
||||
"""Test we can setup a secure elk."""
|
||||
|
||||
with _patch_discovery(no_device=True):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] == {}
|
||||
assert result["step_id"] == "manual_connection"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
|
@ -97,7 +300,6 @@ async def test_form_user_with_tls_elk(hass):
|
|||
"address": "1.2.3.4",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"temperature_unit": "°F",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
|
@ -110,28 +312,28 @@ async def test_form_user_with_tls_elk(hass):
|
|||
"host": "elksv1_2://1.2.3.4",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"temperature_unit": "°F",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_form_user_with_non_secure_elk(hass):
|
||||
async def test_form_user_with_non_secure_elk_no_discovery(hass):
|
||||
"""Test we can setup a non-secure elk."""
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
with _patch_discovery(no_device=True):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] == {}
|
||||
assert result["step_id"] == "manual_connection"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=None, sync_complete=True)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
return_value=mocked_elk,
|
||||
), patch(
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
|
@ -142,7 +344,6 @@ async def test_form_user_with_non_secure_elk(hass):
|
|||
{
|
||||
"protocol": "non-secure",
|
||||
"address": "1.2.3.4",
|
||||
"temperature_unit": "°F",
|
||||
"prefix": "guest_house",
|
||||
},
|
||||
)
|
||||
|
@ -156,27 +357,27 @@ async def test_form_user_with_non_secure_elk(hass):
|
|||
"prefix": "guest_house",
|
||||
"username": "",
|
||||
"password": "",
|
||||
"temperature_unit": "°F",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_form_user_with_serial_elk(hass):
|
||||
async def test_form_user_with_serial_elk_no_discovery(hass):
|
||||
"""Test we can setup a serial elk."""
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
with _patch_discovery(no_device=True):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "form"
|
||||
assert result["errors"] == {}
|
||||
assert result["step_id"] == "manual_connection"
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=None, sync_complete=True)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
return_value=mocked_elk,
|
||||
), patch(
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
|
@ -187,7 +388,6 @@ async def test_form_user_with_serial_elk(hass):
|
|||
{
|
||||
"protocol": "serial",
|
||||
"address": "/dev/ttyS0:115200",
|
||||
"temperature_unit": "°C",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
|
@ -201,7 +401,6 @@ async def test_form_user_with_serial_elk(hass):
|
|||
"prefix": "",
|
||||
"username": "",
|
||||
"password": "",
|
||||
"temperature_unit": "°C",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
@ -209,18 +408,19 @@ async def test_form_user_with_serial_elk(hass):
|
|||
|
||||
async def test_form_cannot_connect(hass):
|
||||
"""Test we handle cannot connect error."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
with _patch_discovery(no_device=True):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=None, sync_complete=None)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
return_value=mocked_elk,
|
||||
), patch(
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.config_flow.VALIDATE_TIMEOUT",
|
||||
0,
|
||||
), patch(
|
||||
"homeassistant.components.elkm1.config_flow.LOGIN_TIMEOUT",
|
||||
0,
|
||||
):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
|
@ -229,13 +429,43 @@ async def test_form_cannot_connect(hass):
|
|||
"address": "1.2.3.4",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"temperature_unit": "°F",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
|
||||
assert result2["type"] == "form"
|
||||
assert result2["errors"] == {"base": "cannot_connect"}
|
||||
assert result2["errors"] == {CONF_HOST: "cannot_connect"}
|
||||
|
||||
|
||||
async def test_unknown_exception(hass):
|
||||
"""Test we handle an unknown exception during connecting."""
|
||||
with _patch_discovery(no_device=True):
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=None, sync_complete=None, exception=OSError)
|
||||
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.config_flow.VALIDATE_TIMEOUT",
|
||||
0,
|
||||
), patch(
|
||||
"homeassistant.components.elkm1.config_flow.LOGIN_TIMEOUT",
|
||||
0,
|
||||
):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"protocol": "secure",
|
||||
"address": "1.2.3.4",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
|
||||
assert result2["type"] == "form"
|
||||
assert result2["errors"] == {"base": "unknown"}
|
||||
|
||||
|
||||
async def test_form_invalid_auth(hass):
|
||||
|
@ -257,23 +487,46 @@ async def test_form_invalid_auth(hass):
|
|||
"address": "1.2.3.4",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
"temperature_unit": "°F",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
|
||||
assert result2["type"] == "form"
|
||||
assert result2["errors"] == {"base": "invalid_auth"}
|
||||
assert result2["errors"] == {CONF_PASSWORD: "invalid_auth"}
|
||||
|
||||
|
||||
async def test_form_invalid_auth_no_password(hass):
|
||||
"""Test we handle invalid auth error when no password is provided."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=True, sync_complete=True)
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
return_value=mocked_elk,
|
||||
):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"protocol": "secure",
|
||||
"address": "1.2.3.4",
|
||||
"username": "test-username",
|
||||
"password": "",
|
||||
"prefix": "",
|
||||
},
|
||||
)
|
||||
|
||||
assert result2["type"] == "form"
|
||||
assert result2["errors"] == {CONF_PASSWORD: "invalid_auth"}
|
||||
|
||||
|
||||
async def test_form_import(hass):
|
||||
"""Test we get the form with import source."""
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
with patch(
|
||||
"homeassistant.components.elkm1.config_flow.elkm1.Elk",
|
||||
return_value=mocked_elk,
|
||||
), patch(
|
||||
with _patch_discovery(no_device=True), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
|
@ -332,3 +585,381 @@ async def test_form_import(hass):
|
|||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_form_import_device_discovered(hass):
|
||||
"""Test we can import with discovery."""
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_IMPORT},
|
||||
data={
|
||||
"host": "elks://127.0.0.1",
|
||||
"username": "friend",
|
||||
"password": "love",
|
||||
"temperature_unit": "C",
|
||||
"auto_configure": False,
|
||||
"keypad": {
|
||||
"enabled": True,
|
||||
"exclude": [],
|
||||
"include": [[1, 1], [2, 2], [3, 3]],
|
||||
},
|
||||
"output": {"enabled": False, "exclude": [], "include": []},
|
||||
"counter": {"enabled": False, "exclude": [], "include": []},
|
||||
"plc": {"enabled": False, "exclude": [], "include": []},
|
||||
"prefix": "ohana",
|
||||
"setting": {"enabled": False, "exclude": [], "include": []},
|
||||
"area": {"enabled": False, "exclude": [], "include": []},
|
||||
"task": {"enabled": False, "exclude": [], "include": []},
|
||||
"thermostat": {"enabled": False, "exclude": [], "include": []},
|
||||
"zone": {
|
||||
"enabled": True,
|
||||
"exclude": [[15, 15], [28, 208]],
|
||||
"include": [],
|
||||
},
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == "create_entry"
|
||||
assert result["title"] == "ohana"
|
||||
assert result["result"].unique_id == MOCK_MAC
|
||||
assert result["data"] == {
|
||||
"auto_configure": False,
|
||||
"host": "elks://127.0.0.1",
|
||||
"keypad": {"enabled": True, "exclude": [], "include": [[1, 1], [2, 2], [3, 3]]},
|
||||
"output": {"enabled": False, "exclude": [], "include": []},
|
||||
"password": "love",
|
||||
"plc": {"enabled": False, "exclude": [], "include": []},
|
||||
"prefix": "ohana",
|
||||
"setting": {"enabled": False, "exclude": [], "include": []},
|
||||
"area": {"enabled": False, "exclude": [], "include": []},
|
||||
"counter": {"enabled": False, "exclude": [], "include": []},
|
||||
"task": {"enabled": False, "exclude": [], "include": []},
|
||||
"temperature_unit": "C",
|
||||
"thermostat": {"enabled": False, "exclude": [], "include": []},
|
||||
"username": "friend",
|
||||
"zone": {"enabled": True, "exclude": [[15, 15], [28, 208]], "include": []},
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"source, data",
|
||||
[
|
||||
(config_entries.SOURCE_DHCP, DHCP_DISCOVERY),
|
||||
(config_entries.SOURCE_DISCOVERY, ELK_DISCOVERY_INFO),
|
||||
],
|
||||
)
|
||||
async def test_discovered_by_dhcp_or_discovery_mac_address_mismatch_host_already_configured(
|
||||
hass, source, data
|
||||
):
|
||||
"""Test we abort if the host is already configured but the mac does not match."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_HOST: f"elks://{MOCK_IP_ADDRESS}"},
|
||||
unique_id="cc:cc:cc:cc:cc:cc",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": source}, data=data
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
assert config_entry.unique_id == "cc:cc:cc:cc:cc:cc"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"source, data",
|
||||
[
|
||||
(config_entries.SOURCE_DHCP, DHCP_DISCOVERY),
|
||||
(config_entries.SOURCE_DISCOVERY, ELK_DISCOVERY_INFO),
|
||||
],
|
||||
)
|
||||
async def test_discovered_by_dhcp_or_discovery_adds_missing_unique_id(
|
||||
hass, source, data
|
||||
):
|
||||
"""Test we add a missing unique id to the config entry."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_HOST: f"elks://{MOCK_IP_ADDRESS}"},
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": source}, data=data
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
assert config_entry.unique_id == MOCK_MAC
|
||||
|
||||
|
||||
async def test_discovered_by_discovery_and_dhcp(hass):
|
||||
"""Test we get the form with discovery and abort for dhcp source when we get both."""
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_DISCOVERY},
|
||||
data=ELK_DISCOVERY_INFO,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert result["type"] == RESULT_TYPE_FORM
|
||||
assert result["errors"] == {}
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result2 = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_DHCP},
|
||||
data=DHCP_DISCOVERY,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert result2["type"] == RESULT_TYPE_ABORT
|
||||
assert result2["reason"] == "already_in_progress"
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result3 = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_DHCP},
|
||||
data=dhcp.DhcpServiceInfo(
|
||||
hostname="any",
|
||||
ip=MOCK_IP_ADDRESS,
|
||||
macaddress="00:00:00:00:00:00",
|
||||
),
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
assert result3["type"] == RESULT_TYPE_ABORT
|
||||
assert result3["reason"] == "already_in_progress"
|
||||
|
||||
|
||||
async def test_discovered_by_discovery(hass):
|
||||
"""Test we can setup when discovered from discovery."""
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_DISCOVERY},
|
||||
data=ELK_DISCOVERY_INFO,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "discovered_connection"
|
||||
assert result["errors"] == {}
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result2["type"] == "create_entry"
|
||||
assert result2["title"] == "ElkM1 ddeeff"
|
||||
assert result2["data"] == {
|
||||
"auto_configure": True,
|
||||
"host": "elks://127.0.0.1:2601",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_discovered_by_discovery_url_already_configured(hass):
|
||||
"""Test we abort when we discover a device that is already setup."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_HOST: f"elks://{MOCK_IP_ADDRESS}"},
|
||||
unique_id="cc:cc:cc:cc:cc:cc",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_DISCOVERY},
|
||||
data=ELK_DISCOVERY_INFO,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_ABORT
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
|
||||
async def test_discovered_by_dhcp_udp_responds(hass):
|
||||
"""Test we can setup when discovered from dhcp but with udp response."""
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_DHCP}, data=DHCP_DISCOVERY
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "discovered_connection"
|
||||
assert result["errors"] == {}
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result2["type"] == "create_entry"
|
||||
assert result2["title"] == "ElkM1 ddeeff"
|
||||
assert result2["data"] == {
|
||||
"auto_configure": True,
|
||||
"host": "elks://127.0.0.1:2601",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_discovered_by_dhcp_udp_responds_with_nonsecure_port(hass):
|
||||
"""Test we can setup when discovered from dhcp but with udp response using the non-secure port."""
|
||||
|
||||
with _patch_discovery(device=ELK_NON_SECURE_DISCOVERY), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_DHCP}, data=DHCP_DISCOVERY
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "discovered_connection"
|
||||
assert result["errors"] == {}
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_discovery(device=ELK_NON_SECURE_DISCOVERY), _patch_elk(
|
||||
elk=mocked_elk
|
||||
), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result2["type"] == "create_entry"
|
||||
assert result2["title"] == "ElkM1 ddeeff"
|
||||
assert result2["data"] == {
|
||||
"auto_configure": True,
|
||||
"host": "elk://127.0.0.1:2101",
|
||||
"password": "test-password",
|
||||
"prefix": "",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_discovered_by_dhcp_udp_responds_existing_config_entry(hass):
|
||||
"""Test we can setup when discovered from dhcp but with udp response with an existing config entry."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={CONF_HOST: "elks://6.6.6.6"},
|
||||
unique_id="cc:cc:cc:cc:cc:cc",
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
with _patch_discovery(), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_DHCP}, data=DHCP_DISCOVERY
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_FORM
|
||||
assert result["step_id"] == "discovered_connection"
|
||||
assert result["errors"] == {}
|
||||
|
||||
mocked_elk = mock_elk(invalid_auth=False, sync_complete=True)
|
||||
|
||||
with _patch_discovery(), _patch_elk(elk=mocked_elk), patch(
|
||||
"homeassistant.components.elkm1.async_setup", return_value=True
|
||||
) as mock_setup, patch(
|
||||
"homeassistant.components.elkm1.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock_setup_entry:
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result2["type"] == "create_entry"
|
||||
assert result2["title"] == "ElkM1 ddeeff"
|
||||
assert result2["data"] == {
|
||||
"auto_configure": True,
|
||||
"host": "elks://127.0.0.1:2601",
|
||||
"password": "test-password",
|
||||
"prefix": "ddeeff",
|
||||
"username": "test-username",
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 2
|
||||
|
||||
|
||||
async def test_discovered_by_dhcp_no_udp_response(hass):
|
||||
"""Test we can setup when discovered from dhcp but no udp response."""
|
||||
|
||||
with _patch_discovery(no_device=True), _patch_elk():
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_DHCP}, data=DHCP_DISCOVERY
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == RESULT_TYPE_ABORT
|
||||
assert result["reason"] == "cannot_connect"
|
||||
|
|
Loading…
Reference in New Issue