Add NASweb integration (#98118)

* Add NASweb integration

* Fix DeviceInfo import

* Remove commented out code

* Change class name for uniquness

* Drop CoordinatorEntity inheritance

* Rename class Output to more descriptive: RelaySwitch

* Update required webio-api version

* Implement on-the-fly addition/removal of entities

* Set coordinator name matching device name

* Set entities with too old status as unavailable

* Drop Optional in favor of modern typing

* Fix spelling of a variable

* Rename commons to more fitting name: helper

* Remove redundant code

* Let unload fail when there is no coordinator

* Fix bad docstring

* Rename cord to coordinator for clarity

* Remove default value for pop and let it raise exception

* Drop workaround and use get_url from helper.network

* Use webhook to send data from device

* Deinitialize coordinator when no longer needed

* Use Python formattable string

* Use dataclass to store integration data in hass.data

* Raise ConfigEntryNotReady when appropriate

* Refactor NASwebData class

* Move RelaySwitch to switch.py

* Fix ConfigFlow tests

* Create issues when entry fails to load

* Respond when correctly received status update

* Depend on webhook instead of http

* Create issue when status is not received during entry set up

* Make issue_id unique across integration entries

* Remove unnecessary initializations

* Inherit CoordinatorEntity to avoid code duplication

* Optimize property access via assignment in __init__

* Use preexisting mechanism to fill schema with user input

* Fix translation strings

* Handle unavailable or unreachable internal url

* Implement custom coordinator for push driven data updates

* Move module-specific constants to respective modules

* Fix requirements_all.txt

* Fix CODEOWNERS file

* Raise ConfigEntryError instead of issue creation

* Fix entity registry import

* Use HassKey as key in hass.data

* Use typed ConfigEntry

* Store runtime data in config entry

* Rewrite to be more Pythonic

* Move add/remove of switch entities to switch.py

* Skip unnecessary check

* Remove unnecessary type hints

* Remove unnecessary nonlocal

* Use a more descriptive docstring

* Add docstrings to NASwebCoordinator

* Fix formatting

* Use correct return type

* Fix tests to align with changed code

* Remove commented code

* Use serial number as config entry id

* Catch AbortFlow exception

* Update tests to check ConfigEntry Unique ID

* Remove unnecessary form abort
pull/130127/head
nasWebio 2024-11-08 12:03:32 +01:00 committed by GitHub
parent 5d5908a03f
commit ed1366f463
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1017 additions and 0 deletions

View File

@ -330,6 +330,7 @@ homeassistant.components.mysensors.*
homeassistant.components.myuplink.*
homeassistant.components.nam.*
homeassistant.components.nanoleaf.*
homeassistant.components.nasweb.*
homeassistant.components.neato.*
homeassistant.components.nest.*
homeassistant.components.netatmo.*

View File

@ -970,6 +970,8 @@ build.json @home-assistant/supervisor
/tests/components/nam/ @bieniu
/homeassistant/components/nanoleaf/ @milanmeu @joostlek
/tests/components/nanoleaf/ @milanmeu @joostlek
/homeassistant/components/nasweb/ @nasWebio
/tests/components/nasweb/ @nasWebio
/homeassistant/components/neato/ @Santobert
/tests/components/neato/ @Santobert
/homeassistant/components/nederlandse_spoorwegen/ @YarmoM

View File

@ -0,0 +1,125 @@
"""The NASweb integration."""
from __future__ import annotations
import logging
from webio_api import WebioAPI
from webio_api.api_client import AuthError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.network import NoURLAvailableError
from homeassistant.util.hass_dict import HassKey
from .const import DOMAIN, MANUFACTURER, SUPPORT_EMAIL
from .coordinator import NASwebCoordinator
from .nasweb_data import NASwebData
PLATFORMS: list[Platform] = [Platform.SWITCH]
NASWEB_CONFIG_URL = "https://{host}/page"
_LOGGER = logging.getLogger(__name__)
type NASwebConfigEntry = ConfigEntry[NASwebCoordinator]
DATA_NASWEB: HassKey[NASwebData] = HassKey(DOMAIN)
async def async_setup_entry(hass: HomeAssistant, entry: NASwebConfigEntry) -> bool:
"""Set up NASweb from a config entry."""
if DATA_NASWEB not in hass.data:
data = NASwebData()
data.initialize(hass)
hass.data[DATA_NASWEB] = data
nasweb_data = hass.data[DATA_NASWEB]
webio_api = WebioAPI(
entry.data[CONF_HOST], entry.data[CONF_USERNAME], entry.data[CONF_PASSWORD]
)
try:
if not await webio_api.check_connection():
raise ConfigEntryNotReady(
f"[{entry.data[CONF_HOST]}] Check connection failed"
)
if not await webio_api.refresh_device_info():
_LOGGER.error("[%s] Refresh device info failed", entry.data[CONF_HOST])
raise ConfigEntryError(
translation_key="config_entry_error_internal_error",
translation_placeholders={"support_email": SUPPORT_EMAIL},
)
webio_serial = webio_api.get_serial_number()
if webio_serial is None:
_LOGGER.error("[%s] Serial number not available", entry.data[CONF_HOST])
raise ConfigEntryError(
translation_key="config_entry_error_internal_error",
translation_placeholders={"support_email": SUPPORT_EMAIL},
)
if entry.unique_id != webio_serial:
_LOGGER.error(
"[%s] Serial number doesn't match config entry", entry.data[CONF_HOST]
)
raise ConfigEntryError(translation_key="config_entry_error_serial_mismatch")
coordinator = NASwebCoordinator(
hass, webio_api, name=f"NASweb[{webio_api.get_name()}]"
)
entry.runtime_data = coordinator
nasweb_data.notify_coordinator.add_coordinator(webio_serial, entry.runtime_data)
webhook_url = nasweb_data.get_webhook_url(hass)
if not await webio_api.status_subscription(webhook_url, True):
_LOGGER.error("Failed to subscribe for status updates from webio")
raise ConfigEntryError(
translation_key="config_entry_error_internal_error",
translation_placeholders={"support_email": SUPPORT_EMAIL},
)
if not await nasweb_data.notify_coordinator.check_connection(webio_serial):
_LOGGER.error("Did not receive status from device")
raise ConfigEntryError(
translation_key="config_entry_error_no_status_update",
translation_placeholders={"support_email": SUPPORT_EMAIL},
)
except TimeoutError as error:
raise ConfigEntryNotReady(
f"[{entry.data[CONF_HOST]}] Check connection reached timeout"
) from error
except AuthError as error:
raise ConfigEntryError(
translation_key="config_entry_error_invalid_authentication"
) from error
except NoURLAvailableError as error:
raise ConfigEntryError(
translation_key="config_entry_error_missing_internal_url"
) from error
device_registry = dr.async_get(hass)
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
identifiers={(DOMAIN, webio_serial)},
manufacturer=MANUFACTURER,
name=webio_api.get_name(),
configuration_url=NASWEB_CONFIG_URL.format(host=entry.data[CONF_HOST]),
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: NASwebConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
nasweb_data = hass.data[DATA_NASWEB]
coordinator = entry.runtime_data
serial = entry.unique_id
if serial is not None:
nasweb_data.notify_coordinator.remove_coordinator(serial)
if nasweb_data.can_be_deinitialized():
nasweb_data.deinitialize(hass)
hass.data.pop(DATA_NASWEB)
webhook_url = nasweb_data.get_webhook_url(hass)
await coordinator.webio_api.status_subscription(webhook_url, False)
return unload_ok

View File

@ -0,0 +1,137 @@
"""Config flow for NASweb integration."""
from __future__ import annotations
import logging
from typing import Any
import voluptuous as vol
from webio_api import WebioAPI
from webio_api.api_client import AuthError
from homeassistant import config_entries
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_UNIQUE_ID, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.network import NoURLAvailableError
from .const import DOMAIN
from .coordinator import NASwebCoordinator
from .nasweb_data import NASwebData
NASWEB_SCHEMA_IMG_URL = (
"https://home-assistant.io/images/integrations/nasweb/nasweb_scheme.png"
)
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
}
)
async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
"""Validate user-provided data."""
webio_api = WebioAPI(data[CONF_HOST], data[CONF_USERNAME], data[CONF_PASSWORD])
if not await webio_api.check_connection():
raise CannotConnect
try:
await webio_api.refresh_device_info()
except AuthError as e:
raise InvalidAuth from e
nasweb_data = NASwebData()
nasweb_data.initialize(hass)
try:
webio_serial = webio_api.get_serial_number()
if webio_serial is None:
raise MissingNASwebData("Device serial number is not available")
coordinator = NASwebCoordinator(hass, webio_api)
webhook_url = nasweb_data.get_webhook_url(hass)
nasweb_data.notify_coordinator.add_coordinator(webio_serial, coordinator)
subscription = await webio_api.status_subscription(webhook_url, True)
if not subscription:
nasweb_data.notify_coordinator.remove_coordinator(webio_serial)
raise MissingNASwebData(
"Failed to subscribe for status updates from device"
)
result = await nasweb_data.notify_coordinator.check_connection(webio_serial)
nasweb_data.notify_coordinator.remove_coordinator(webio_serial)
if not result:
if subscription:
await webio_api.status_subscription(webhook_url, False)
raise MissingNASwebStatus("Did not receive status from device")
name = webio_api.get_name()
finally:
nasweb_data.deinitialize(hass)
return {"title": name, CONF_UNIQUE_ID: webio_serial}
class NASwebConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for NASweb."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
try:
info = await validate_input(self.hass, user_input)
await self.async_set_unique_id(info[CONF_UNIQUE_ID])
self._abort_if_unique_id_configured()
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
except NoURLAvailableError:
errors["base"] = "missing_internal_url"
except MissingNASwebData:
errors["base"] = "missing_nasweb_data"
except MissingNASwebStatus:
errors["base"] = "missing_status"
except AbortFlow:
raise
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
return self.async_create_entry(title=info["title"], data=user_input)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
description_placeholders={
"nasweb_schema_img": '<img src="' + NASWEB_SCHEMA_IMG_URL + '"/><br>',
},
)
class CannotConnect(HomeAssistantError):
"""Error to indicate we cannot connect."""
class InvalidAuth(HomeAssistantError):
"""Error to indicate there is invalid auth."""
class MissingNASwebData(HomeAssistantError):
"""Error to indicate missing information from NASweb."""
class MissingNASwebStatus(HomeAssistantError):
"""Error to indicate there was no status received from NASweb."""

View File

@ -0,0 +1,7 @@
"""Constants for the NASweb integration."""
DOMAIN = "nasweb"
MANUFACTURER = "chomtech.pl"
STATUS_UPDATE_MAX_TIME_INTERVAL = 60
SUPPORT_EMAIL = "support@chomtech.eu"
WEBHOOK_URL = "{internal_url}/api/webhook/{webhook_id}"

View File

@ -0,0 +1,191 @@
"""Message routing coordinators for handling NASweb push notifications."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from datetime import datetime, timedelta
import logging
import time
from typing import Any
from aiohttp.web import Request, Response
from webio_api import WebioAPI
from webio_api.const import KEY_DEVICE_SERIAL, KEY_OUTPUTS, KEY_TYPE, TYPE_STATUS_UPDATE
from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
from homeassistant.helpers import event
from homeassistant.helpers.update_coordinator import BaseDataUpdateCoordinatorProtocol
from .const import STATUS_UPDATE_MAX_TIME_INTERVAL
_LOGGER = logging.getLogger(__name__)
class NotificationCoordinator:
"""Coordinator redirecting push notifications for this integration to appropriate NASwebCoordinator."""
def __init__(self) -> None:
"""Initialize coordinator."""
self._coordinators: dict[str, NASwebCoordinator] = {}
def add_coordinator(self, serial: str, coordinator: NASwebCoordinator) -> None:
"""Add NASwebCoordinator to possible notification targets."""
self._coordinators[serial] = coordinator
_LOGGER.debug("Added NASwebCoordinator for NASweb[%s]", serial)
def remove_coordinator(self, serial: str) -> None:
"""Remove NASwebCoordinator from possible notification targets."""
self._coordinators.pop(serial)
_LOGGER.debug("Removed NASwebCoordinator for NASweb[%s]", serial)
def has_coordinators(self) -> bool:
"""Check if there is any registered coordinator for push notifications."""
return len(self._coordinators) > 0
async def check_connection(self, serial: str) -> bool:
"""Wait for first status update to confirm connection with NASweb."""
nasweb_coordinator = self._coordinators.get(serial)
if nasweb_coordinator is None:
_LOGGER.error("Cannot check connection. No device match serial number")
return False
for counter in range(10):
_LOGGER.debug("Checking connection with: %s (%s)", serial, counter)
if nasweb_coordinator.is_connection_confirmed():
return True
await asyncio.sleep(1)
return False
async def handle_webhook_request(
self, hass: HomeAssistant, webhook_id: str, request: Request
) -> Response | None:
"""Handle webhook request from Push API."""
if not self.has_coordinators():
return None
notification = await request.json()
serial = notification.get(KEY_DEVICE_SERIAL, None)
_LOGGER.debug("Received push: %s", notification)
if serial is None:
_LOGGER.warning("Received notification without nasweb identifier")
return None
nasweb_coordinator = self._coordinators.get(serial)
if nasweb_coordinator is None:
_LOGGER.warning("Received notification for not registered nasweb")
return None
await nasweb_coordinator.handle_push_notification(notification)
return Response(body='{"response": "ok"}', content_type="application/json")
class NASwebCoordinator(BaseDataUpdateCoordinatorProtocol):
"""Coordinator managing status of single NASweb device.
Since status updates are managed through push notifications, this class schedules
periodic checks to ensure that devices are marked unavailable if updates
haven't been received for a prolonged period.
"""
def __init__(
self, hass: HomeAssistant, webio_api: WebioAPI, name: str = "NASweb[default]"
) -> None:
"""Initialize NASweb coordinator."""
self._hass = hass
self.name = name
self.webio_api = webio_api
self._last_update: float | None = None
job_name = f"NASwebCoordinator[{name}]"
self._job = HassJob(self._handle_max_update_interval, job_name)
self._unsub_last_update_check: CALLBACK_TYPE | None = None
self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
data: dict[str, Any] = {}
data[KEY_OUTPUTS] = self.webio_api.outputs
self.async_set_updated_data(data)
def is_connection_confirmed(self) -> bool:
"""Check whether coordinator received status update from NASweb."""
return self._last_update is not None
@callback
def async_add_listener(
self, update_callback: CALLBACK_TYPE, context: Any = None
) -> Callable[[], None]:
"""Listen for data updates."""
schedule_update_check = not self._listeners
@callback
def remove_listener() -> None:
"""Remove update listener."""
self._listeners.pop(remove_listener)
if not self._listeners:
self._async_unsub_last_update_check()
self._listeners[remove_listener] = (update_callback, context)
# This is the first listener, set up interval.
if schedule_update_check:
self._schedule_last_update_check()
return remove_listener
@callback
def async_set_updated_data(self, data: dict[str, Any]) -> None:
"""Update data and notify listeners."""
self.data = data
self.last_update = self._hass.loop.time()
_LOGGER.debug("Updated %s data", self.name)
if self._listeners:
self._schedule_last_update_check()
self.async_update_listeners()
@callback
def async_update_listeners(self) -> None:
"""Update all registered listeners."""
for update_callback, _ in list(self._listeners.values()):
update_callback()
async def _handle_max_update_interval(self, now: datetime) -> None:
"""Handle max update interval occurrence.
This method is called when `STATUS_UPDATE_MAX_TIME_INTERVAL` has passed without
receiving a status update. It only needs to trigger state update of entities
which then change their state accordingly.
"""
self._unsub_last_update_check = None
if self._listeners:
self.async_update_listeners()
def _schedule_last_update_check(self) -> None:
"""Schedule a task to trigger entities state update after `STATUS_UPDATE_MAX_TIME_INTERVAL`.
This method schedules a task (`_handle_max_update_interval`) to be executed after
`STATUS_UPDATE_MAX_TIME_INTERVAL` seconds without status update, which enables entities
to change their state to unavailable. After each status update this task is rescheduled.
"""
self._async_unsub_last_update_check()
now = self._hass.loop.time()
next_check = (
now + timedelta(seconds=STATUS_UPDATE_MAX_TIME_INTERVAL).total_seconds()
)
self._unsub_last_update_check = event.async_call_at(
self._hass,
self._job,
next_check,
)
def _async_unsub_last_update_check(self) -> None:
"""Cancel any scheduled update check call."""
if self._unsub_last_update_check:
self._unsub_last_update_check()
self._unsub_last_update_check = None
async def handle_push_notification(self, notification: dict) -> None:
"""Handle incoming push notification from NASweb."""
msg_type = notification.get(KEY_TYPE)
_LOGGER.debug("Received push notification: %s", msg_type)
if msg_type == TYPE_STATUS_UPDATE:
await self.process_status_update(notification)
self._last_update = time.time()
async def process_status_update(self, new_status: dict) -> None:
"""Process status update from NASweb."""
self.webio_api.update_device_status(new_status)
new_data = {KEY_OUTPUTS: self.webio_api.outputs}
self.async_set_updated_data(new_data)

View File

@ -0,0 +1,14 @@
{
"domain": "nasweb",
"name": "NASweb",
"codeowners": ["@nasWebio"],
"config_flow": true,
"dependencies": ["webhook"],
"documentation": "https://www.home-assistant.io/integrations/nasweb",
"homekit": {},
"integration_type": "hub",
"iot_class": "local_push",
"requirements": ["webio-api==0.1.8"],
"ssdp": [],
"zeroconf": []
}

View File

@ -0,0 +1,64 @@
"""Dataclass storing integration data in hass.data[DOMAIN]."""
from dataclasses import dataclass, field
import logging
from aiohttp.hdrs import METH_POST
from homeassistant.components.webhook import (
async_generate_id,
async_register as webhook_register,
async_unregister as webhook_unregister,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.network import get_url
from .const import DOMAIN, WEBHOOK_URL
from .coordinator import NotificationCoordinator
_LOGGER = logging.getLogger(__name__)
@dataclass
class NASwebData:
"""Class storing integration data."""
notify_coordinator: NotificationCoordinator = field(
default_factory=NotificationCoordinator
)
webhook_id = ""
def is_initialized(self) -> bool:
"""Return True if instance was initialized and is ready for use."""
return bool(self.webhook_id)
def can_be_deinitialized(self) -> bool:
"""Return whether this instance can be deinitialized."""
return not self.notify_coordinator.has_coordinators()
def initialize(self, hass: HomeAssistant) -> None:
"""Initialize NASwebData instance."""
if self.is_initialized():
return
new_webhook_id = async_generate_id()
webhook_register(
hass,
DOMAIN,
"NASweb",
new_webhook_id,
self.notify_coordinator.handle_webhook_request,
allowed_methods=[METH_POST],
)
self.webhook_id = new_webhook_id
_LOGGER.debug("Registered webhook: %s", self.webhook_id)
def deinitialize(self, hass: HomeAssistant) -> None:
"""Deinitialize NASwebData instance."""
if not self.is_initialized():
return
webhook_unregister(hass, self.webhook_id)
def get_webhook_url(self, hass: HomeAssistant) -> str:
"""Return webhook url for Push API."""
hass_url = get_url(hass, allow_external=False)
return WEBHOOK_URL.format(internal_url=hass_url, webhook_id=self.webhook_id)

View File

@ -0,0 +1,50 @@
{
"config": {
"step": {
"user": {
"title": "Add NASweb device",
"description": "{nasweb_schema_img}NASweb combines the functions of a control panel and the ability to manage building automation. The device monitors the flow of information from sensors and programmable switches and stores settings, definitions and configured actions.",
"data": {
"host": "[%key:common::config_flow::data::host%]",
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
}
}
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"missing_internal_url": "Make sure Home Assistant has valid internal url",
"missing_nasweb_data": "Something isn't right with device internal configuration. Try restarting the device and HomeAssistant.",
"missing_status": "Did not received any status updates within the expected time window. Make sure the Home Assistant Internal URL is reachable from the NASweb device.",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
},
"exceptions": {
"config_entry_error_invalid_authentication": {
"message": "Invalid username/password. Most likely user changed password or was removed. Delete this entry and create new one with correct username/password."
},
"config_entry_error_internal_error": {
"message": "Something isn't right with device internal configuration. Try restarting the device and HomeAssistant. If the issue persists contact support at {support_email}"
},
"config_entry_error_no_status_update": {
"message": "Did not received any status updates within the expected time window. Make sure the Home Assistant Internal URL is reachable from the NASweb device. If the issue persists contact support at {support_email}"
},
"config_entry_error_missing_internal_url": {
"message": "[%key:component::nasweb::config::error::missing_internal_url%]"
},
"serial_mismatch": {
"message": "Connected to different NASweb device (serial number mismatch)."
}
},
"entity": {
"switch": {
"switch_output": {
"name": "Relay Switch {index}"
}
}
}
}

View File

@ -0,0 +1,133 @@
"""Platform for NASweb output."""
from __future__ import annotations
import logging
import time
from typing import Any
from webio_api import Output as NASwebOutput
from homeassistant.components.switch import DOMAIN as DOMAIN_SWITCH, SwitchEntity
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
import homeassistant.helpers.entity_registry as er
from homeassistant.helpers.typing import DiscoveryInfoType
from homeassistant.helpers.update_coordinator import (
BaseCoordinatorEntity,
BaseDataUpdateCoordinatorProtocol,
)
from . import NASwebConfigEntry
from .const import DOMAIN, STATUS_UPDATE_MAX_TIME_INTERVAL
from .coordinator import NASwebCoordinator
OUTPUT_TRANSLATION_KEY = "switch_output"
_LOGGER = logging.getLogger(__name__)
def _get_output(coordinator: NASwebCoordinator, index: int) -> NASwebOutput | None:
for out in coordinator.webio_api.outputs:
if out.index == index:
return out
return None
async def async_setup_entry(
hass: HomeAssistant,
config: NASwebConfigEntry,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up switch platform."""
coordinator = config.runtime_data
current_outputs: set[int] = set()
@callback
def _check_entities() -> None:
received_outputs = {out.index for out in coordinator.webio_api.outputs}
added = {i for i in received_outputs if i not in current_outputs}
removed = {i for i in current_outputs if i not in received_outputs}
entities_to_add: list[RelaySwitch] = []
for index in added:
webio_output = _get_output(coordinator, index)
if not isinstance(webio_output, NASwebOutput):
_LOGGER.error("Cannot create RelaySwitch entity without NASwebOutput")
continue
new_output = RelaySwitch(coordinator, webio_output)
entities_to_add.append(new_output)
current_outputs.add(index)
async_add_entities(entities_to_add)
entity_registry = er.async_get(hass)
for index in removed:
unique_id = f"{DOMAIN}.{config.unique_id}.relay_switch.{index}"
if entity_id := entity_registry.async_get_entity_id(
DOMAIN_SWITCH, DOMAIN, unique_id
):
entity_registry.async_remove(entity_id)
current_outputs.remove(index)
else:
_LOGGER.warning("Failed to remove old output: no entity_id")
coordinator.async_add_listener(_check_entities)
_check_entities()
class RelaySwitch(SwitchEntity, BaseCoordinatorEntity):
"""Entity representing NASweb Output."""
def __init__(
self,
coordinator: BaseDataUpdateCoordinatorProtocol,
nasweb_output: NASwebOutput,
) -> None:
"""Initialize RelaySwitch."""
super().__init__(coordinator)
self._output = nasweb_output
self._attr_icon = "mdi:export"
self._attr_has_entity_name = True
self._attr_translation_key = OUTPUT_TRANSLATION_KEY
self._attr_translation_placeholders = {"index": f"{nasweb_output.index:2d}"}
self._attr_unique_id = (
f"{DOMAIN}.{self._output.webio_serial}.relay_switch.{self._output.index}"
)
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._output.webio_serial)},
)
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
self._handle_coordinator_update()
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._attr_is_on = self._output.state
if (
self.coordinator.last_update is None
or time.time() - self._output.last_update >= STATUS_UPDATE_MAX_TIME_INTERVAL
):
self._attr_available = False
else:
self._attr_available = (
self._output.available if self._output.available is not None else False
)
self.async_write_ha_state()
async def async_update(self) -> None:
"""Update the entity.
Only used by the generic entity update service.
Scheduling updates is not necessary, the coordinator takes care of updates via push notifications.
"""
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn On RelaySwitch."""
await self._output.turn_on()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn Off RelaySwitch."""
await self._output.turn_off()

View File

@ -391,6 +391,7 @@ FLOWS = {
"myuplink",
"nam",
"nanoleaf",
"nasweb",
"neato",
"nest",
"netatmo",

View File

@ -4016,6 +4016,12 @@
"config_flow": true,
"iot_class": "local_push"
},
"nasweb": {
"name": "NASweb",
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_push"
},
"neato": {
"name": "Neato Botvac",
"integration_type": "hub",

View File

@ -3056,6 +3056,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.nasweb.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.neato.*]
check_untyped_defs = true
disallow_incomplete_defs = true

View File

@ -2977,6 +2977,9 @@ weatherflow4py==1.0.6
# homeassistant.components.cisco_webex_teams
webexpythonsdk==2.0.1
# homeassistant.components.nasweb
webio-api==0.1.8
# homeassistant.components.webmin
webmin-xmlrpc==0.0.2

View File

@ -2372,6 +2372,9 @@ watchdog==2.3.1
# homeassistant.components.weatherflow_cloud
weatherflow4py==1.0.6
# homeassistant.components.nasweb
webio-api==0.1.8
# homeassistant.components.webmin
webmin-xmlrpc==0.0.2

View File

@ -0,0 +1 @@
"""Tests for the NASweb integration."""

View File

@ -0,0 +1,61 @@
"""Common fixtures for the NASweb tests."""
from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Override async_setup_entry."""
with patch(
"homeassistant.components.nasweb.async_setup_entry", return_value=True
) as mock_setup_entry:
yield mock_setup_entry
BASE_CONFIG_FLOW = "homeassistant.components.nasweb.config_flow."
BASE_NASWEB_DATA = "homeassistant.components.nasweb.nasweb_data."
BASE_COORDINATOR = "homeassistant.components.nasweb.coordinator."
TEST_SERIAL_NUMBER = "0011223344556677"
@pytest.fixture
def validate_input_all_ok() -> Generator[dict[str, AsyncMock | MagicMock]]:
"""Yield dictionary of mocked functions required for successful test_form execution."""
with (
patch(
BASE_CONFIG_FLOW + "WebioAPI.check_connection",
return_value=True,
) as check_connection,
patch(
BASE_CONFIG_FLOW + "WebioAPI.refresh_device_info",
return_value=True,
) as refresh_device_info,
patch(
BASE_NASWEB_DATA + "NASwebData.get_webhook_url",
return_value="http://127.0.0.1:8123/api/webhook/de705e77291402afa0dd961426e9f19bb53631a9f2a106c52cfd2d2266913c04",
) as get_webhook_url,
patch(
BASE_CONFIG_FLOW + "WebioAPI.get_serial_number",
return_value=TEST_SERIAL_NUMBER,
) as get_serial,
patch(
BASE_CONFIG_FLOW + "WebioAPI.status_subscription",
return_value=True,
) as status_subscription,
patch(
BASE_NASWEB_DATA + "NotificationCoordinator.check_connection",
return_value=True,
) as check_status_confirmation,
):
yield {
BASE_CONFIG_FLOW + "WebioAPI.check_connection": check_connection,
BASE_CONFIG_FLOW + "WebioAPI.refresh_device_info": refresh_device_info,
BASE_NASWEB_DATA + "NASwebData.get_webhook_url": get_webhook_url,
BASE_CONFIG_FLOW + "WebioAPI.get_serial_number": get_serial,
BASE_CONFIG_FLOW + "WebioAPI.status_subscription": status_subscription,
BASE_NASWEB_DATA
+ "NotificationCoordinator.check_connection": check_status_confirmation,
}

View File

@ -0,0 +1,208 @@
"""Test the NASweb config flow."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from webio_api.api_client import AuthError
from homeassistant import config_entries
from homeassistant.components.nasweb.const import DOMAIN
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.network import NoURLAvailableError
from .conftest import (
BASE_CONFIG_FLOW,
BASE_COORDINATOR,
BASE_NASWEB_DATA,
TEST_SERIAL_NUMBER,
)
pytestmark = pytest.mark.usefixtures("mock_setup_entry")
TEST_USER_INPUT = {
CONF_HOST: "1.1.1.1",
CONF_USERNAME: "test-username",
CONF_PASSWORD: "test-password",
}
async def _add_test_config_entry(hass: HomeAssistant) -> ConfigFlowResult:
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result.get("type") == FlowResultType.FORM
assert not result.get("errors")
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
await hass.async_block_till_done()
return result2
async def test_form(
hass: HomeAssistant,
mock_setup_entry: AsyncMock,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test the form."""
result = await _add_test_config_entry(hass)
assert result.get("type") == FlowResultType.CREATE_ENTRY
assert result.get("title") == "1.1.1.1"
assert result.get("data") == TEST_USER_INPUT
config_entry = result.get("result")
assert config_entry is not None
assert config_entry.unique_id == TEST_SERIAL_NUMBER
assert len(mock_setup_entry.mock_calls) == 1
async def test_form_cannot_connect(
hass: HomeAssistant,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test cannot connect error."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(BASE_CONFIG_FLOW + "WebioAPI.check_connection", return_value=False):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
assert result2.get("type") == FlowResultType.FORM
assert result2.get("errors") == {"base": "cannot_connect"}
async def test_form_invalid_auth(
hass: HomeAssistant,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test invalid auth."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
BASE_CONFIG_FLOW + "WebioAPI.refresh_device_info",
side_effect=AuthError,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
assert result2.get("type") == FlowResultType.FORM
assert result2.get("errors") == {"base": "invalid_auth"}
async def test_form_missing_internal_url(
hass: HomeAssistant,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test missing internal url."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
BASE_NASWEB_DATA + "NASwebData.get_webhook_url", side_effect=NoURLAvailableError
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
assert result2.get("type") == FlowResultType.FORM
assert result2.get("errors") == {"base": "missing_internal_url"}
async def test_form_missing_nasweb_data(
hass: HomeAssistant,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test invalid auth."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
BASE_CONFIG_FLOW + "WebioAPI.get_serial_number",
return_value=None,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
assert result2.get("type") == FlowResultType.FORM
assert result2.get("errors") == {"base": "missing_nasweb_data"}
with patch(BASE_CONFIG_FLOW + "WebioAPI.status_subscription", return_value=False):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
assert result2.get("type") == FlowResultType.FORM
assert result2.get("errors") == {"base": "missing_nasweb_data"}
async def test_missing_status(
hass: HomeAssistant,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test missing status update."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
BASE_COORDINATOR + "NotificationCoordinator.check_connection",
return_value=False,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
assert result2.get("type") == FlowResultType.FORM
assert result2.get("errors") == {"base": "missing_status"}
async def test_form_exception(
hass: HomeAssistant,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test other exceptions."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
with patch(
"homeassistant.components.nasweb.config_flow.validate_input",
side_effect=Exception,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], TEST_USER_INPUT
)
assert result2.get("type") == FlowResultType.FORM
assert result2.get("errors") == {"base": "unknown"}
async def test_form_already_configured(
hass: HomeAssistant,
validate_input_all_ok: dict[str, AsyncMock | MagicMock],
) -> None:
"""Test already configured device."""
result = await _add_test_config_entry(hass)
config_entry = result.get("result")
assert config_entry is not None
assert config_entry.unique_id == TEST_SERIAL_NUMBER
result2_1 = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result2_2 = await hass.config_entries.flow.async_configure(
result2_1["flow_id"], TEST_USER_INPUT
)
await hass.async_block_till_done()
assert result2_2.get("type") == FlowResultType.ABORT
assert result2_2.get("reason") == "already_configured"