Add HTTP protocol support to AsusWRT (#95720)
parent
654c4b6e35
commit
b4797e283f
|
@ -9,6 +9,8 @@ import logging
|
|||
from typing import Any, TypeVar, cast
|
||||
|
||||
from aioasuswrt.asuswrt import AsusWrt as AsusWrtLegacy
|
||||
from aiohttp import ClientSession
|
||||
from pyasuswrt import AsusWrtError, AsusWrtHttp
|
||||
|
||||
from homeassistant.const import (
|
||||
CONF_HOST,
|
||||
|
@ -19,6 +21,7 @@ from homeassistant.const import (
|
|||
CONF_USERNAME,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.update_coordinator import UpdateFailed
|
||||
|
||||
|
@ -31,6 +34,8 @@ from .const import (
|
|||
DEFAULT_INTERFACE,
|
||||
KEY_METHOD,
|
||||
KEY_SENSORS,
|
||||
PROTOCOL_HTTP,
|
||||
PROTOCOL_HTTPS,
|
||||
PROTOCOL_TELNET,
|
||||
SENSORS_BYTES,
|
||||
SENSORS_LOAD_AVG,
|
||||
|
@ -74,6 +79,8 @@ def handle_errors_and_zip(
|
|||
raise UpdateFailed("Received invalid data type")
|
||||
return data
|
||||
|
||||
if isinstance(data, dict):
|
||||
return dict(zip(keys, list(data.values())))
|
||||
if not isinstance(data, list):
|
||||
raise UpdateFailed("Received invalid data type")
|
||||
return dict(zip(keys, data))
|
||||
|
@ -91,6 +98,9 @@ class AsusWrtBridge(ABC):
|
|||
hass: HomeAssistant, conf: dict[str, Any], options: dict[str, Any] | None = None
|
||||
) -> AsusWrtBridge:
|
||||
"""Get Bridge instance."""
|
||||
if conf[CONF_PROTOCOL] in (PROTOCOL_HTTPS, PROTOCOL_HTTP):
|
||||
session = async_get_clientsession(hass)
|
||||
return AsusWrtHttpBridge(conf, session)
|
||||
return AsusWrtLegacyBridge(conf, options)
|
||||
|
||||
def __init__(self, host: str) -> None:
|
||||
|
@ -286,3 +296,116 @@ class AsusWrtLegacyBridge(AsusWrtBridge):
|
|||
async def _get_temperatures(self) -> Any:
|
||||
"""Fetch temperatures information from the router."""
|
||||
return await self._api.async_get_temperature()
|
||||
|
||||
|
||||
class AsusWrtHttpBridge(AsusWrtBridge):
|
||||
"""The Bridge that use HTTP library."""
|
||||
|
||||
def __init__(self, conf: dict[str, Any], session: ClientSession) -> None:
|
||||
"""Initialize Bridge that use HTTP library."""
|
||||
super().__init__(conf[CONF_HOST])
|
||||
self._api: AsusWrtHttp = self._get_api(conf, session)
|
||||
|
||||
@staticmethod
|
||||
def _get_api(conf: dict[str, Any], session: ClientSession) -> AsusWrtHttp:
|
||||
"""Get the AsusWrtHttp API."""
|
||||
return AsusWrtHttp(
|
||||
conf[CONF_HOST],
|
||||
conf[CONF_USERNAME],
|
||||
conf.get(CONF_PASSWORD, ""),
|
||||
use_https=conf[CONF_PROTOCOL] == PROTOCOL_HTTPS,
|
||||
port=conf.get(CONF_PORT),
|
||||
session=session,
|
||||
)
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""Get connected status."""
|
||||
return cast(bool, self._api.is_connected)
|
||||
|
||||
async def async_connect(self) -> None:
|
||||
"""Connect to the device."""
|
||||
await self._api.async_connect()
|
||||
|
||||
# get main router properties
|
||||
if mac := self._api.mac:
|
||||
self._label_mac = format_mac(mac)
|
||||
self._firmware = self._api.firmware
|
||||
self._model = self._api.model
|
||||
|
||||
async def async_disconnect(self) -> None:
|
||||
"""Disconnect to the device."""
|
||||
await self._api.async_disconnect()
|
||||
|
||||
async def async_get_connected_devices(self) -> dict[str, WrtDevice]:
|
||||
"""Get list of connected devices."""
|
||||
try:
|
||||
api_devices = await self._api.async_get_connected_devices()
|
||||
except AsusWrtError as exc:
|
||||
raise UpdateFailed(exc) from exc
|
||||
return {
|
||||
format_mac(mac): WrtDevice(dev.ip, dev.name, dev.node)
|
||||
for mac, dev in api_devices.items()
|
||||
}
|
||||
|
||||
async def async_get_available_sensors(self) -> dict[str, dict[str, Any]]:
|
||||
"""Return a dictionary of available sensors for this bridge."""
|
||||
sensors_temperatures = await self._get_available_temperature_sensors()
|
||||
sensors_types = {
|
||||
SENSORS_TYPE_BYTES: {
|
||||
KEY_SENSORS: SENSORS_BYTES,
|
||||
KEY_METHOD: self._get_bytes,
|
||||
},
|
||||
SENSORS_TYPE_LOAD_AVG: {
|
||||
KEY_SENSORS: SENSORS_LOAD_AVG,
|
||||
KEY_METHOD: self._get_load_avg,
|
||||
},
|
||||
SENSORS_TYPE_RATES: {
|
||||
KEY_SENSORS: SENSORS_RATES,
|
||||
KEY_METHOD: self._get_rates,
|
||||
},
|
||||
SENSORS_TYPE_TEMPERATURES: {
|
||||
KEY_SENSORS: sensors_temperatures,
|
||||
KEY_METHOD: self._get_temperatures,
|
||||
},
|
||||
}
|
||||
return sensors_types
|
||||
|
||||
async def _get_available_temperature_sensors(self) -> list[str]:
|
||||
"""Check which temperature information is available on the router."""
|
||||
try:
|
||||
available_temps = await self._api.async_get_temperatures()
|
||||
available_sensors = [
|
||||
t for t in SENSORS_TEMPERATURES if t in available_temps
|
||||
]
|
||||
except AsusWrtError as exc:
|
||||
_LOGGER.warning(
|
||||
(
|
||||
"Failed checking temperature sensor availability for ASUS router"
|
||||
" %s. Exception: %s"
|
||||
),
|
||||
self.host,
|
||||
exc,
|
||||
)
|
||||
return []
|
||||
return available_sensors
|
||||
|
||||
@handle_errors_and_zip(AsusWrtError, SENSORS_BYTES)
|
||||
async def _get_bytes(self) -> Any:
|
||||
"""Fetch byte information from the router."""
|
||||
return await self._api.async_get_traffic_bytes()
|
||||
|
||||
@handle_errors_and_zip(AsusWrtError, SENSORS_RATES)
|
||||
async def _get_rates(self) -> Any:
|
||||
"""Fetch rates information from the router."""
|
||||
return await self._api.async_get_traffic_rates()
|
||||
|
||||
@handle_errors_and_zip(AsusWrtError, SENSORS_LOAD_AVG)
|
||||
async def _get_load_avg(self) -> Any:
|
||||
"""Fetch cpu load avg information from the router."""
|
||||
return await self._api.async_get_loadavg()
|
||||
|
||||
@handle_errors_and_zip(AsusWrtError, None)
|
||||
async def _get_temperatures(self) -> Any:
|
||||
"""Fetch temperatures information from the router."""
|
||||
return await self._api.async_get_temperatures()
|
||||
|
|
|
@ -7,6 +7,7 @@ import os
|
|||
import socket
|
||||
from typing import Any, cast
|
||||
|
||||
from pyasuswrt import AsusWrtError
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.device_tracker import (
|
||||
|
@ -15,6 +16,7 @@ from homeassistant.components.device_tracker import (
|
|||
)
|
||||
from homeassistant.config_entries import ConfigEntry, ConfigFlow
|
||||
from homeassistant.const import (
|
||||
CONF_BASE,
|
||||
CONF_HOST,
|
||||
CONF_MODE,
|
||||
CONF_PASSWORD,
|
||||
|
@ -30,6 +32,7 @@ from homeassistant.helpers.schema_config_entry_flow import (
|
|||
SchemaFlowFormStep,
|
||||
SchemaOptionsFlowHandler,
|
||||
)
|
||||
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
|
||||
|
||||
from .bridge import AsusWrtBridge
|
||||
from .const import (
|
||||
|
@ -44,11 +47,21 @@ from .const import (
|
|||
DOMAIN,
|
||||
MODE_AP,
|
||||
MODE_ROUTER,
|
||||
PROTOCOL_HTTP,
|
||||
PROTOCOL_HTTPS,
|
||||
PROTOCOL_SSH,
|
||||
PROTOCOL_TELNET,
|
||||
)
|
||||
|
||||
LABEL_MAC = "LABEL_MAC"
|
||||
ALLOWED_PROTOCOL = [
|
||||
PROTOCOL_HTTPS,
|
||||
PROTOCOL_SSH,
|
||||
PROTOCOL_HTTP,
|
||||
PROTOCOL_TELNET,
|
||||
]
|
||||
|
||||
PASS_KEY = "pass_key"
|
||||
PASS_KEY_MSG = "Only provide password or SSH key file"
|
||||
|
||||
RESULT_CONN_ERROR = "cannot_connect"
|
||||
RESULT_SUCCESS = "success"
|
||||
|
@ -56,14 +69,20 @@ RESULT_UNKNOWN = "unknown"
|
|||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
LEGACY_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_MODE, default=MODE_ROUTER): vol.In(
|
||||
{MODE_ROUTER: "Router", MODE_AP: "Access Point"}
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
OPTIONS_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Optional(
|
||||
CONF_CONSIDER_HOME, default=DEFAULT_CONSIDER_HOME.total_seconds()
|
||||
): vol.All(vol.Coerce(int), vol.Clamp(min=0, max=900)),
|
||||
vol.Optional(CONF_TRACK_UNKNOWN, default=DEFAULT_TRACK_UNKNOWN): bool,
|
||||
vol.Required(CONF_INTERFACE, default=DEFAULT_INTERFACE): str,
|
||||
vol.Required(CONF_DNSMASQ, default=DEFAULT_DNSMASQ): str,
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -72,12 +91,22 @@ async def get_options_schema(handler: SchemaCommonFlowHandler) -> vol.Schema:
|
|||
"""Get options schema."""
|
||||
options_flow: SchemaOptionsFlowHandler
|
||||
options_flow = cast(SchemaOptionsFlowHandler, handler.parent_handler)
|
||||
if options_flow.config_entry.data[CONF_MODE] == MODE_AP:
|
||||
return OPTIONS_SCHEMA.extend(
|
||||
used_protocol = options_flow.config_entry.data[CONF_PROTOCOL]
|
||||
if used_protocol in [PROTOCOL_SSH, PROTOCOL_TELNET]:
|
||||
data_schema = OPTIONS_SCHEMA.extend(
|
||||
{
|
||||
vol.Optional(CONF_REQUIRE_IP, default=True): bool,
|
||||
vol.Required(CONF_INTERFACE, default=DEFAULT_INTERFACE): str,
|
||||
vol.Required(CONF_DNSMASQ, default=DEFAULT_DNSMASQ): str,
|
||||
}
|
||||
)
|
||||
if options_flow.config_entry.data[CONF_MODE] == MODE_AP:
|
||||
return data_schema.extend(
|
||||
{
|
||||
vol.Optional(CONF_REQUIRE_IP, default=True): bool,
|
||||
}
|
||||
)
|
||||
return data_schema
|
||||
|
||||
return OPTIONS_SCHEMA
|
||||
|
||||
|
||||
|
@ -101,45 +130,47 @@ def _get_ip(host: str) -> str | None:
|
|||
|
||||
|
||||
class AsusWrtFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow."""
|
||||
"""Handle a config flow for AsusWRT."""
|
||||
|
||||
VERSION = 1
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the AsusWrt config flow."""
|
||||
self._config_data: dict[str, Any] = {}
|
||||
|
||||
@callback
|
||||
def _show_setup_form(
|
||||
self,
|
||||
user_input: dict[str, Any] | None = None,
|
||||
errors: dict[str, str] | None = None,
|
||||
) -> FlowResult:
|
||||
def _show_setup_form(self, error: str | None = None) -> FlowResult:
|
||||
"""Show the setup form to the user."""
|
||||
|
||||
if user_input is None:
|
||||
user_input = {}
|
||||
user_input = self._config_data
|
||||
|
||||
adv_schema = {}
|
||||
conf_password = vol.Required(CONF_PASSWORD)
|
||||
if self.show_advanced_options:
|
||||
conf_password = vol.Optional(CONF_PASSWORD)
|
||||
adv_schema[vol.Optional(CONF_PORT)] = cv.port
|
||||
adv_schema[vol.Optional(CONF_SSH_KEY)] = str
|
||||
add_schema = {
|
||||
vol.Exclusive(CONF_PASSWORD, PASS_KEY, PASS_KEY_MSG): str,
|
||||
vol.Optional(CONF_PORT): cv.port,
|
||||
vol.Exclusive(CONF_SSH_KEY, PASS_KEY, PASS_KEY_MSG): str,
|
||||
}
|
||||
else:
|
||||
add_schema = {vol.Required(CONF_PASSWORD): str}
|
||||
|
||||
schema = {
|
||||
vol.Required(CONF_HOST, default=user_input.get(CONF_HOST, "")): str,
|
||||
vol.Required(CONF_USERNAME, default=user_input.get(CONF_USERNAME, "")): str,
|
||||
conf_password: str,
|
||||
vol.Required(CONF_PROTOCOL, default=PROTOCOL_SSH): vol.In(
|
||||
{PROTOCOL_SSH: "SSH", PROTOCOL_TELNET: "Telnet"}
|
||||
),
|
||||
**adv_schema,
|
||||
vol.Required(CONF_MODE, default=MODE_ROUTER): vol.In(
|
||||
{MODE_ROUTER: "Router", MODE_AP: "Access Point"}
|
||||
**add_schema,
|
||||
vol.Required(
|
||||
CONF_PROTOCOL,
|
||||
default=user_input.get(CONF_PROTOCOL, PROTOCOL_HTTPS),
|
||||
): SelectSelector(
|
||||
SelectSelectorConfig(
|
||||
options=ALLOWED_PROTOCOL, translation_key="protocols"
|
||||
)
|
||||
),
|
||||
}
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema(schema),
|
||||
errors=errors or {},
|
||||
errors={CONF_BASE: error} if error else None,
|
||||
)
|
||||
|
||||
async def _async_check_connection(
|
||||
|
@ -147,25 +178,49 @@ class AsusWrtFlowHandler(ConfigFlow, domain=DOMAIN):
|
|||
) -> tuple[str, str | None]:
|
||||
"""Attempt to connect the AsusWrt router."""
|
||||
|
||||
api: AsusWrtBridge
|
||||
host: str = user_input[CONF_HOST]
|
||||
api = AsusWrtBridge.get_bridge(self.hass, user_input)
|
||||
protocol = user_input[CONF_PROTOCOL]
|
||||
error: str | None = None
|
||||
|
||||
conf = {**user_input, CONF_MODE: MODE_ROUTER}
|
||||
api = AsusWrtBridge.get_bridge(self.hass, conf)
|
||||
try:
|
||||
await api.async_connect()
|
||||
|
||||
except OSError:
|
||||
_LOGGER.error("Error connecting to the AsusWrt router at %s", host)
|
||||
return RESULT_CONN_ERROR, None
|
||||
except (AsusWrtError, OSError):
|
||||
_LOGGER.error(
|
||||
"Error connecting to the AsusWrt router at %s using protocol %s",
|
||||
host,
|
||||
protocol,
|
||||
)
|
||||
error = RESULT_CONN_ERROR
|
||||
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.exception(
|
||||
"Unknown error connecting with AsusWrt router at %s", host
|
||||
"Unknown error connecting with AsusWrt router at %s using protocol %s",
|
||||
host,
|
||||
protocol,
|
||||
)
|
||||
return RESULT_UNKNOWN, None
|
||||
error = RESULT_UNKNOWN
|
||||
|
||||
if not api.is_connected:
|
||||
_LOGGER.error("Error connecting to the AsusWrt router at %s", host)
|
||||
return RESULT_CONN_ERROR, None
|
||||
if error is None:
|
||||
if not api.is_connected:
|
||||
_LOGGER.error(
|
||||
"Error connecting to the AsusWrt router at %s using protocol %s",
|
||||
host,
|
||||
protocol,
|
||||
)
|
||||
error = RESULT_CONN_ERROR
|
||||
|
||||
if error is not None:
|
||||
return error, None
|
||||
|
||||
_LOGGER.info(
|
||||
"Successfully connected to the AsusWrt router at %s using protocol %s",
|
||||
host,
|
||||
protocol,
|
||||
)
|
||||
unique_id = api.label_mac
|
||||
await api.async_disconnect()
|
||||
|
||||
|
@ -182,51 +237,59 @@ class AsusWrtFlowHandler(ConfigFlow, domain=DOMAIN):
|
|||
return self.async_abort(reason="no_unique_id")
|
||||
|
||||
if user_input is None:
|
||||
return self._show_setup_form(user_input)
|
||||
|
||||
errors: dict[str, str] = {}
|
||||
host: str = user_input[CONF_HOST]
|
||||
return self._show_setup_form()
|
||||
|
||||
self._config_data = user_input
|
||||
pwd: str | None = user_input.get(CONF_PASSWORD)
|
||||
ssh: str | None = user_input.get(CONF_SSH_KEY)
|
||||
protocol: str = user_input[CONF_PROTOCOL]
|
||||
|
||||
if not pwd and protocol != PROTOCOL_SSH:
|
||||
return self._show_setup_form(error="pwd_required")
|
||||
if not (pwd or ssh):
|
||||
errors["base"] = "pwd_or_ssh"
|
||||
elif ssh:
|
||||
if pwd:
|
||||
errors["base"] = "pwd_and_ssh"
|
||||
return self._show_setup_form(error="pwd_or_ssh")
|
||||
if ssh and not await self.hass.async_add_executor_job(_is_file, ssh):
|
||||
return self._show_setup_form(error="ssh_not_file")
|
||||
|
||||
host: str = user_input[CONF_HOST]
|
||||
if not await self.hass.async_add_executor_job(_get_ip, host):
|
||||
return self._show_setup_form(error="invalid_host")
|
||||
|
||||
result, unique_id = await self._async_check_connection(user_input)
|
||||
if result == RESULT_SUCCESS:
|
||||
if unique_id:
|
||||
await self.async_set_unique_id(unique_id)
|
||||
# we allow to configure a single instance without unique id
|
||||
elif self._async_current_entries():
|
||||
return self.async_abort(reason="invalid_unique_id")
|
||||
else:
|
||||
isfile = await self.hass.async_add_executor_job(_is_file, ssh)
|
||||
if not isfile:
|
||||
errors["base"] = "ssh_not_file"
|
||||
|
||||
if not errors:
|
||||
ip_address = await self.hass.async_add_executor_job(_get_ip, host)
|
||||
if not ip_address:
|
||||
errors["base"] = "invalid_host"
|
||||
|
||||
if not errors:
|
||||
result, unique_id = await self._async_check_connection(user_input)
|
||||
if result == RESULT_SUCCESS:
|
||||
if unique_id:
|
||||
await self.async_set_unique_id(unique_id)
|
||||
# we allow configure a single instance without unique id
|
||||
elif self._async_current_entries():
|
||||
return self.async_abort(reason="invalid_unique_id")
|
||||
else:
|
||||
_LOGGER.warning(
|
||||
"This device does not provide a valid Unique ID."
|
||||
" Configuration of multiple instance will not be possible"
|
||||
)
|
||||
|
||||
return self.async_create_entry(
|
||||
title=host,
|
||||
data=user_input,
|
||||
_LOGGER.warning(
|
||||
"This device does not provide a valid Unique ID."
|
||||
" Configuration of multiple instance will not be possible"
|
||||
)
|
||||
|
||||
errors["base"] = result
|
||||
if protocol in [PROTOCOL_SSH, PROTOCOL_TELNET]:
|
||||
return await self.async_step_legacy()
|
||||
return await self._async_save_entry()
|
||||
|
||||
return self._show_setup_form(user_input, errors)
|
||||
return self._show_setup_form(error=result)
|
||||
|
||||
async def async_step_legacy(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle a flow for legacy settings."""
|
||||
if user_input is None:
|
||||
return self.async_show_form(step_id="legacy", data_schema=LEGACY_SCHEMA)
|
||||
|
||||
self._config_data.update(user_input)
|
||||
return await self._async_save_entry()
|
||||
|
||||
async def _async_save_entry(self) -> FlowResult:
|
||||
"""Save entry data if unique id is valid."""
|
||||
return self.async_create_entry(
|
||||
title=self._config_data[CONF_HOST],
|
||||
data=self._config_data,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@callback
|
||||
|
|
|
@ -20,6 +20,8 @@ KEY_SENSORS = "sensors"
|
|||
MODE_AP = "ap"
|
||||
MODE_ROUTER = "router"
|
||||
|
||||
PROTOCOL_HTTP = "http"
|
||||
PROTOCOL_HTTPS = "https"
|
||||
PROTOCOL_SSH = "ssh"
|
||||
PROTOCOL_TELNET = "telnet"
|
||||
|
||||
|
|
|
@ -7,5 +7,5 @@
|
|||
"integration_type": "hub",
|
||||
"iot_class": "local_polling",
|
||||
"loggers": ["aioasuswrt", "asyncssh"],
|
||||
"requirements": ["aioasuswrt==1.4.0"]
|
||||
"requirements": ["aioasuswrt==1.4.0", "pyasuswrt==0.1.20"]
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ from datetime import datetime, timedelta
|
|||
import logging
|
||||
from typing import Any
|
||||
|
||||
from pyasuswrt import AsusWrtError
|
||||
|
||||
from homeassistant.components.device_tracker import (
|
||||
CONF_CONSIDER_HOME,
|
||||
DEFAULT_CONSIDER_HOME,
|
||||
|
@ -219,7 +221,7 @@ class AsusWrtRouter:
|
|||
"""Set up a AsusWrt router."""
|
||||
try:
|
||||
await self._api.async_connect()
|
||||
except OSError as exc:
|
||||
except (AsusWrtError, OSError) as exc:
|
||||
raise ConfigEntryNotReady from exc
|
||||
if not self._api.is_connected:
|
||||
raise ConfigEntryNotReady
|
||||
|
|
|
@ -6,21 +6,26 @@
|
|||
"description": "Set required parameter to connect to your router",
|
||||
"data": {
|
||||
"host": "[%key:common::config_flow::data::host%]",
|
||||
"name": "[%key:common::config_flow::data::name%]",
|
||||
"username": "[%key:common::config_flow::data::username%]",
|
||||
"password": "[%key:common::config_flow::data::password%]",
|
||||
"ssh_key": "Path to your SSH key file (instead of password)",
|
||||
"protocol": "Communication protocol to use",
|
||||
"port": "Port (leave empty for protocol default)",
|
||||
"mode": "[%key:common::config_flow::data::mode%]"
|
||||
"port": "Port (leave empty for protocol default)"
|
||||
}
|
||||
},
|
||||
"legacy": {
|
||||
"title": "AsusWRT",
|
||||
"description": "Set required parameters to connect to your router",
|
||||
"data": {
|
||||
"mode": "Router operating mode"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
"invalid_host": "[%key:common::config_flow::error::invalid_host%]",
|
||||
"pwd_and_ssh": "Only provide password or SSH key file",
|
||||
"pwd_or_ssh": "Please provide password or SSH key file",
|
||||
"pwd_required": "Password is required for selected protocol",
|
||||
"ssh_not_file": "SSH key file not found",
|
||||
"unknown": "[%key:common::config_flow::error::unknown%]"
|
||||
},
|
||||
|
@ -79,5 +84,15 @@
|
|||
"name": "CPU Temperature"
|
||||
}
|
||||
}
|
||||
},
|
||||
"selector": {
|
||||
"protocols": {
|
||||
"options": {
|
||||
"https": "HTTPS",
|
||||
"http": "HTTP",
|
||||
"ssh": "SSH",
|
||||
"telnet": "Telnet"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1610,6 +1610,9 @@ pyairnow==1.2.1
|
|||
# homeassistant.components.airvisual_pro
|
||||
pyairvisual==2023.08.1
|
||||
|
||||
# homeassistant.components.asuswrt
|
||||
pyasuswrt==0.1.20
|
||||
|
||||
# homeassistant.components.atag
|
||||
pyatag==0.3.5.3
|
||||
|
||||
|
|
|
@ -1223,6 +1223,9 @@ pyairnow==1.2.1
|
|||
# homeassistant.components.airvisual_pro
|
||||
pyairvisual==2023.08.1
|
||||
|
||||
# homeassistant.components.asuswrt
|
||||
pyasuswrt==0.1.20
|
||||
|
||||
# homeassistant.components.atag
|
||||
pyatag==0.3.5.3
|
||||
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
"""Test code shared between test files."""
|
||||
|
||||
from aioasuswrt.asuswrt import Device as LegacyDevice
|
||||
from pyasuswrt.asuswrt import Device as HttpDevice
|
||||
|
||||
from homeassistant.components.asuswrt.const import (
|
||||
CONF_SSH_KEY,
|
||||
MODE_ROUTER,
|
||||
PROTOCOL_HTTP,
|
||||
PROTOCOL_HTTPS,
|
||||
PROTOCOL_SSH,
|
||||
PROTOCOL_TELNET,
|
||||
)
|
||||
|
@ -40,6 +43,14 @@ CONFIG_DATA_SSH = {
|
|||
CONF_MODE: MODE_ROUTER,
|
||||
}
|
||||
|
||||
CONFIG_DATA_HTTP = {
|
||||
CONF_HOST: HOST,
|
||||
CONF_PORT: 80,
|
||||
CONF_PROTOCOL: PROTOCOL_HTTPS,
|
||||
CONF_USERNAME: "user",
|
||||
CONF_PASSWORD: "pwd",
|
||||
}
|
||||
|
||||
MOCK_MACS = [
|
||||
"A1:B1:C1:D1:E1:F1",
|
||||
"A2:B2:C2:D2:E2:F2",
|
||||
|
@ -48,6 +59,8 @@ MOCK_MACS = [
|
|||
]
|
||||
|
||||
|
||||
def new_device(mac, ip, name):
|
||||
def new_device(protocol, mac, ip, name):
|
||||
"""Return a new device for specific protocol."""
|
||||
if protocol in [PROTOCOL_HTTP, PROTOCOL_HTTPS]:
|
||||
return HttpDevice(mac, ip, name, ROUTER_MAC_ADDR, None)
|
||||
return LegacyDevice(mac, ip, name)
|
||||
|
|
|
@ -4,16 +4,24 @@ from unittest.mock import Mock, patch
|
|||
|
||||
from aioasuswrt.asuswrt import AsusWrt as AsusWrtLegacy
|
||||
from aioasuswrt.connection import TelnetConnection
|
||||
from pyasuswrt.asuswrt import AsusWrtError, AsusWrtHttp
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.asuswrt.const import PROTOCOL_HTTP, PROTOCOL_SSH
|
||||
|
||||
from .common import ASUSWRT_BASE, MOCK_MACS, ROUTER_MAC_ADDR, new_device
|
||||
|
||||
ASUSWRT_HTTP_LIB = f"{ASUSWRT_BASE}.bridge.AsusWrtHttp"
|
||||
ASUSWRT_LEGACY_LIB = f"{ASUSWRT_BASE}.bridge.AsusWrtLegacy"
|
||||
|
||||
MOCK_BYTES_TOTAL = [60000000000, 50000000000]
|
||||
MOCK_BYTES_TOTAL_HTTP = dict(enumerate(MOCK_BYTES_TOTAL))
|
||||
MOCK_CURRENT_TRANSFER_RATES = [20000000, 10000000]
|
||||
MOCK_LOAD_AVG = [1.1, 1.2, 1.3]
|
||||
MOCK_TEMPERATURES = {"2.4GHz": 40.2, "5.0GHz": 0, "CPU": 71.2}
|
||||
MOCK_CURRENT_TRANSFER_RATES_HTTP = dict(enumerate(MOCK_CURRENT_TRANSFER_RATES))
|
||||
MOCK_LOAD_AVG_HTTP = {"load_avg_1": 1.1, "load_avg_5": 1.2, "load_avg_15": 1.3}
|
||||
MOCK_LOAD_AVG = list(MOCK_LOAD_AVG_HTTP.values())
|
||||
MOCK_TEMPERATURES_HTTP = {"2.4GHz": 40.2, "CPU": 71.2}
|
||||
MOCK_TEMPERATURES = {**MOCK_TEMPERATURES_HTTP, "5.0GHz": 0}
|
||||
|
||||
|
||||
@pytest.fixture(name="patch_setup_entry")
|
||||
|
@ -29,8 +37,17 @@ def mock_controller_patch_setup_entry():
|
|||
def mock_devices_legacy_fixture():
|
||||
"""Mock a list of devices."""
|
||||
return {
|
||||
MOCK_MACS[0]: new_device(MOCK_MACS[0], "192.168.1.2", "Test"),
|
||||
MOCK_MACS[1]: new_device(MOCK_MACS[1], "192.168.1.3", "TestTwo"),
|
||||
MOCK_MACS[0]: new_device(PROTOCOL_SSH, MOCK_MACS[0], "192.168.1.2", "Test"),
|
||||
MOCK_MACS[1]: new_device(PROTOCOL_SSH, MOCK_MACS[1], "192.168.1.3", "TestTwo"),
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(name="mock_devices_http")
|
||||
def mock_devices_http_fixture():
|
||||
"""Mock a list of devices."""
|
||||
return {
|
||||
MOCK_MACS[0]: new_device(PROTOCOL_HTTP, MOCK_MACS[0], "192.168.1.2", "Test"),
|
||||
MOCK_MACS[1]: new_device(PROTOCOL_HTTP, MOCK_MACS[1], "192.168.1.3", "TestTwo"),
|
||||
}
|
||||
|
||||
|
||||
|
@ -81,3 +98,48 @@ def mock_controller_connect_legacy_sens_fail(connect_legacy):
|
|||
True,
|
||||
True,
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(name="connect_http")
|
||||
def mock_controller_connect_http(mock_devices_http):
|
||||
"""Mock a successful connection with http library."""
|
||||
with patch(ASUSWRT_HTTP_LIB, spec_set=AsusWrtHttp) as service_mock:
|
||||
service_mock.return_value.is_connected = True
|
||||
service_mock.return_value.mac = ROUTER_MAC_ADDR
|
||||
service_mock.return_value.model = "FAKE_MODEL"
|
||||
service_mock.return_value.firmware = "FAKE_FIRMWARE"
|
||||
service_mock.return_value.async_get_connected_devices.return_value = (
|
||||
mock_devices_http
|
||||
)
|
||||
service_mock.return_value.async_get_traffic_bytes.return_value = (
|
||||
MOCK_BYTES_TOTAL_HTTP
|
||||
)
|
||||
service_mock.return_value.async_get_traffic_rates.return_value = (
|
||||
MOCK_CURRENT_TRANSFER_RATES_HTTP
|
||||
)
|
||||
service_mock.return_value.async_get_loadavg.return_value = MOCK_LOAD_AVG_HTTP
|
||||
service_mock.return_value.async_get_temperatures.return_value = (
|
||||
MOCK_TEMPERATURES_HTTP
|
||||
)
|
||||
yield service_mock
|
||||
|
||||
|
||||
@pytest.fixture(name="connect_http_sens_fail")
|
||||
def mock_controller_connect_http_sens_fail(connect_http):
|
||||
"""Mock a successful connection using http library with sensors fail."""
|
||||
connect_http.return_value.mac = None
|
||||
connect_http.return_value.async_get_connected_devices.side_effect = AsusWrtError
|
||||
connect_http.return_value.async_get_traffic_bytes.side_effect = AsusWrtError
|
||||
connect_http.return_value.async_get_traffic_rates.side_effect = AsusWrtError
|
||||
connect_http.return_value.async_get_loadavg.side_effect = AsusWrtError
|
||||
connect_http.return_value.async_get_temperatures.side_effect = AsusWrtError
|
||||
|
||||
|
||||
@pytest.fixture(name="connect_http_sens_detect")
|
||||
def mock_controller_connect_http_sens_detect():
|
||||
"""Mock a successful sensor detection using http library."""
|
||||
with patch(
|
||||
f"{ASUSWRT_BASE}.bridge.AsusWrtHttpBridge._get_available_temperature_sensors",
|
||||
return_value=[*MOCK_TEMPERATURES],
|
||||
) as mock_sens_detect:
|
||||
yield mock_sens_detect
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
from socket import gaierror
|
||||
from unittest.mock import patch
|
||||
|
||||
from pyasuswrt import AsusWrtError
|
||||
import pytest
|
||||
|
||||
from homeassistant import data_entry_flow
|
||||
|
@ -13,18 +14,54 @@ from homeassistant.components.asuswrt.const import (
|
|||
CONF_TRACK_UNKNOWN,
|
||||
DOMAIN,
|
||||
MODE_AP,
|
||||
MODE_ROUTER,
|
||||
PROTOCOL_HTTPS,
|
||||
PROTOCOL_SSH,
|
||||
PROTOCOL_TELNET,
|
||||
)
|
||||
from homeassistant.components.device_tracker import CONF_CONSIDER_HOME
|
||||
from homeassistant.config_entries import SOURCE_USER
|
||||
from homeassistant.const import CONF_HOST, CONF_MODE, CONF_PASSWORD
|
||||
from homeassistant.const import (
|
||||
CONF_BASE,
|
||||
CONF_HOST,
|
||||
CONF_MODE,
|
||||
CONF_PASSWORD,
|
||||
CONF_PORT,
|
||||
CONF_PROTOCOL,
|
||||
CONF_USERNAME,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from .common import ASUSWRT_BASE, CONFIG_DATA_TELNET, HOST, ROUTER_MAC_ADDR
|
||||
from .common import ASUSWRT_BASE, HOST, ROUTER_MAC_ADDR
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
SSH_KEY = "1234"
|
||||
|
||||
CONFIG_DATA = {
|
||||
CONF_HOST: HOST,
|
||||
CONF_USERNAME: "user",
|
||||
CONF_PASSWORD: "pwd",
|
||||
}
|
||||
|
||||
CONFIG_DATA_HTTP = {
|
||||
**CONFIG_DATA,
|
||||
CONF_PROTOCOL: PROTOCOL_HTTPS,
|
||||
CONF_PORT: 8443,
|
||||
}
|
||||
|
||||
CONFIG_DATA_SSH = {
|
||||
**CONFIG_DATA,
|
||||
CONF_PROTOCOL: PROTOCOL_SSH,
|
||||
CONF_PORT: 22,
|
||||
}
|
||||
|
||||
CONFIG_DATA_TELNET = {
|
||||
**CONFIG_DATA,
|
||||
CONF_PROTOCOL: PROTOCOL_TELNET,
|
||||
CONF_PORT: 23,
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(name="patch_get_host", autouse=True)
|
||||
def mock_controller_patch_get_host():
|
||||
|
@ -45,7 +82,7 @@ def mock_controller_patch_is_file():
|
|||
|
||||
|
||||
@pytest.mark.parametrize("unique_id", [{}, {"label_mac": ROUTER_MAC_ADDR}])
|
||||
async def test_user(
|
||||
async def test_user_legacy(
|
||||
hass: HomeAssistant, connect_legacy, patch_setup_entry, unique_id
|
||||
) -> None:
|
||||
"""Test user config."""
|
||||
|
@ -58,30 +95,57 @@ async def test_user(
|
|||
connect_legacy.return_value.async_get_nvram.return_value = unique_id
|
||||
|
||||
# test with all provided
|
||||
legacy_result = await hass.config_entries.flow.async_configure(
|
||||
flow_result["flow_id"], user_input=CONFIG_DATA_TELNET
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert legacy_result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert legacy_result["step_id"] == "legacy"
|
||||
|
||||
# complete configuration
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
flow_result["flow_id"],
|
||||
user_input=CONFIG_DATA_TELNET,
|
||||
legacy_result["flow_id"], user_input={CONF_MODE: MODE_AP}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == HOST
|
||||
assert result["data"] == CONFIG_DATA_TELNET
|
||||
assert result["data"] == {**CONFIG_DATA_TELNET, CONF_MODE: MODE_AP}
|
||||
|
||||
assert len(patch_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("config", "error"),
|
||||
[
|
||||
({}, "pwd_or_ssh"),
|
||||
({CONF_PASSWORD: "pwd", CONF_SSH_KEY: SSH_KEY}, "pwd_and_ssh"),
|
||||
],
|
||||
)
|
||||
async def test_error_wrong_password_ssh(hass: HomeAssistant, config, error) -> None:
|
||||
"""Test we abort for wrong password and ssh file combination."""
|
||||
config_data = {k: v for k, v in CONFIG_DATA_TELNET.items() if k != CONF_PASSWORD}
|
||||
config_data.update(config)
|
||||
@pytest.mark.parametrize("unique_id", [None, ROUTER_MAC_ADDR])
|
||||
async def test_user_http(
|
||||
hass: HomeAssistant, connect_http, patch_setup_entry, unique_id
|
||||
) -> None:
|
||||
"""Test user config http."""
|
||||
flow_result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": SOURCE_USER, "show_advanced_options": True}
|
||||
)
|
||||
assert flow_result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert flow_result["step_id"] == "user"
|
||||
|
||||
connect_http.return_value.mac = unique_id
|
||||
|
||||
# test with all provided
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
flow_result["flow_id"], user_input=CONFIG_DATA_HTTP
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == HOST
|
||||
assert result["data"] == CONFIG_DATA_HTTP
|
||||
|
||||
assert len(patch_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config", [CONFIG_DATA_TELNET, CONFIG_DATA_HTTP])
|
||||
async def test_error_pwd_required(hass: HomeAssistant, config) -> None:
|
||||
"""Test we abort for missing password."""
|
||||
config_data = {k: v for k, v in config.items() if k != CONF_PASSWORD}
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER, "show_advanced_options": True},
|
||||
|
@ -89,12 +153,25 @@ async def test_error_wrong_password_ssh(hass: HomeAssistant, config, error) -> N
|
|||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["errors"] == {"base": error}
|
||||
assert result["errors"] == {CONF_BASE: "pwd_required"}
|
||||
|
||||
|
||||
async def test_error_no_password_ssh(hass: HomeAssistant) -> None:
|
||||
"""Test we abort for wrong password and ssh file combination."""
|
||||
config_data = {k: v for k, v in CONFIG_DATA_SSH.items() if k != CONF_PASSWORD}
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER, "show_advanced_options": True},
|
||||
data=config_data,
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["errors"] == {CONF_BASE: "pwd_or_ssh"}
|
||||
|
||||
|
||||
async def test_error_invalid_ssh(hass: HomeAssistant, patch_is_file) -> None:
|
||||
"""Test we abort if invalid ssh file is provided."""
|
||||
config_data = {k: v for k, v in CONFIG_DATA_TELNET.items() if k != CONF_PASSWORD}
|
||||
config_data = {k: v for k, v in CONFIG_DATA_SSH.items() if k != CONF_PASSWORD}
|
||||
config_data[CONF_SSH_KEY] = SSH_KEY
|
||||
|
||||
patch_is_file.return_value = False
|
||||
|
@ -105,7 +182,7 @@ async def test_error_invalid_ssh(hass: HomeAssistant, patch_is_file) -> None:
|
|||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["errors"] == {"base": "ssh_not_file"}
|
||||
assert result["errors"] == {CONF_BASE: "ssh_not_file"}
|
||||
|
||||
|
||||
async def test_error_invalid_host(hass: HomeAssistant, patch_get_host) -> None:
|
||||
|
@ -118,7 +195,7 @@ async def test_error_invalid_host(hass: HomeAssistant, patch_get_host) -> None:
|
|||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["errors"] == {"base": "invalid_host"}
|
||||
assert result["errors"] == {CONF_BASE: "invalid_host"}
|
||||
|
||||
|
||||
async def test_abort_if_not_unique_id_setup(hass: HomeAssistant) -> None:
|
||||
|
@ -138,27 +215,26 @@ async def test_abort_if_not_unique_id_setup(hass: HomeAssistant) -> None:
|
|||
|
||||
|
||||
async def test_update_uniqueid_exist(
|
||||
hass: HomeAssistant, connect_legacy, patch_setup_entry
|
||||
hass: HomeAssistant, connect_http, patch_setup_entry
|
||||
) -> None:
|
||||
"""Test we update entry if uniqueid is already configured."""
|
||||
existing_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={**CONFIG_DATA_TELNET, CONF_HOST: "10.10.10.10"},
|
||||
data={**CONFIG_DATA_HTTP, CONF_HOST: "10.10.10.10"},
|
||||
unique_id=ROUTER_MAC_ADDR,
|
||||
)
|
||||
existing_entry.add_to_hass(hass)
|
||||
|
||||
# test with all provided
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER, "show_advanced_options": True},
|
||||
data=CONFIG_DATA_TELNET,
|
||||
data=CONFIG_DATA_HTTP,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert result["title"] == HOST
|
||||
assert result["data"] == CONFIG_DATA_TELNET
|
||||
assert result["data"] == CONFIG_DATA_HTTP
|
||||
prev_entry = hass.config_entries.async_get_entry(existing_entry.entry_id)
|
||||
assert not prev_entry
|
||||
|
||||
|
@ -190,10 +266,10 @@ async def test_abort_invalid_unique_id(hass: HomeAssistant, connect_legacy) -> N
|
|||
(None, "cannot_connect"),
|
||||
],
|
||||
)
|
||||
async def test_on_connect_failed(
|
||||
async def test_on_connect_legacy_failed(
|
||||
hass: HomeAssistant, connect_legacy, side_effect, error
|
||||
) -> None:
|
||||
"""Test when we have errors connecting the router."""
|
||||
"""Test when we have errors connecting the router with legacy library."""
|
||||
flow_result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER, "show_advanced_options": True},
|
||||
|
@ -202,11 +278,43 @@ async def test_on_connect_failed(
|
|||
connect_legacy.return_value.is_connected = False
|
||||
connect_legacy.return_value.connection.async_connect.side_effect = side_effect
|
||||
|
||||
# go to legacy form
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
flow_result["flow_id"], user_input=CONFIG_DATA_TELNET
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["errors"] == {"base": error}
|
||||
assert result["errors"] == {CONF_BASE: error}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("side_effect", "error"),
|
||||
[
|
||||
(AsusWrtError, "cannot_connect"),
|
||||
(TypeError, "unknown"),
|
||||
(None, "cannot_connect"),
|
||||
],
|
||||
)
|
||||
async def test_on_connect_http_failed(
|
||||
hass: HomeAssistant, connect_http, side_effect, error
|
||||
) -> None:
|
||||
"""Test when we have errors connecting the router with http library."""
|
||||
flow_result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": SOURCE_USER, "show_advanced_options": True},
|
||||
)
|
||||
|
||||
connect_http.return_value.is_connected = False
|
||||
connect_http.return_value.async_connect.side_effect = side_effect
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
flow_result["flow_id"], user_input=CONFIG_DATA_HTTP
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["errors"] == {CONF_BASE: error}
|
||||
|
||||
|
||||
async def test_options_flow_ap(hass: HomeAssistant, patch_setup_entry) -> None:
|
||||
|
@ -251,7 +359,7 @@ async def test_options_flow_router(hass: HomeAssistant, patch_setup_entry) -> No
|
|||
"""Test config flow options for router mode."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data=CONFIG_DATA_TELNET,
|
||||
data={**CONFIG_DATA_TELNET, CONF_MODE: MODE_ROUTER},
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
|
@ -280,3 +388,36 @@ async def test_options_flow_router(hass: HomeAssistant, patch_setup_entry) -> No
|
|||
CONF_INTERFACE: "aaa",
|
||||
CONF_DNSMASQ: "bbb",
|
||||
}
|
||||
|
||||
|
||||
async def test_options_flow_http(hass: HomeAssistant, patch_setup_entry) -> None:
|
||||
"""Test config flow options for http mode."""
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={**CONFIG_DATA_HTTP, CONF_MODE: MODE_ROUTER},
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
result = await hass.config_entries.options.async_init(config_entry.entry_id)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["step_id"] == "init"
|
||||
assert CONF_INTERFACE not in result["data_schema"].schema
|
||||
assert CONF_DNSMASQ not in result["data_schema"].schema
|
||||
assert CONF_REQUIRE_IP not in result["data_schema"].schema
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"],
|
||||
user_input={
|
||||
CONF_CONSIDER_HOME: 20,
|
||||
CONF_TRACK_UNKNOWN: True,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert config_entry.options == {
|
||||
CONF_CONSIDER_HOME: 20,
|
||||
CONF_TRACK_UNKNOWN: True,
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""Tests for the AsusWrt sensor."""
|
||||
from datetime import timedelta
|
||||
|
||||
from pyasuswrt.asuswrt import AsusWrtError
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import device_tracker, sensor
|
||||
|
@ -14,19 +15,32 @@ from homeassistant.components.asuswrt.const import (
|
|||
)
|
||||
from homeassistant.components.device_tracker import CONF_CONSIDER_HOME
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.const import STATE_HOME, STATE_NOT_HOME, STATE_UNAVAILABLE
|
||||
from homeassistant.const import (
|
||||
CONF_PROTOCOL,
|
||||
STATE_HOME,
|
||||
STATE_NOT_HOME,
|
||||
STATE_UNAVAILABLE,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
||||
from homeassistant.util import slugify
|
||||
from homeassistant.util.dt import utcnow
|
||||
|
||||
from .common import CONFIG_DATA_TELNET, HOST, MOCK_MACS, ROUTER_MAC_ADDR, new_device
|
||||
from .common import (
|
||||
CONFIG_DATA_HTTP,
|
||||
CONFIG_DATA_TELNET,
|
||||
HOST,
|
||||
MOCK_MACS,
|
||||
ROUTER_MAC_ADDR,
|
||||
new_device,
|
||||
)
|
||||
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
|
||||
SENSORS_DEFAULT = [*SENSORS_BYTES, *SENSORS_RATES]
|
||||
|
||||
SENSORS_ALL_LEGACY = [*SENSORS_DEFAULT, *SENSORS_LOAD_AVG, *SENSORS_TEMPERATURES]
|
||||
SENSORS_ALL_HTTP = [*SENSORS_DEFAULT, *SENSORS_LOAD_AVG, *SENSORS_TEMPERATURES]
|
||||
|
||||
|
||||
@pytest.fixture(name="create_device_registry_devices")
|
||||
|
@ -132,8 +146,12 @@ async def _test_sensors(
|
|||
assert hass.states.get(f"{sensor_prefix}_devices_connected").state == "1"
|
||||
|
||||
# add 2 new devices, one unnamed that should be ignored but counted
|
||||
mock_devices[MOCK_MACS[2]] = new_device(MOCK_MACS[2], "192.168.1.4", "TestThree")
|
||||
mock_devices[MOCK_MACS[3]] = new_device(MOCK_MACS[3], "192.168.1.5", None)
|
||||
mock_devices[MOCK_MACS[2]] = new_device(
|
||||
config[CONF_PROTOCOL], MOCK_MACS[2], "192.168.1.4", "TestThree"
|
||||
)
|
||||
mock_devices[MOCK_MACS[3]] = new_device(
|
||||
config[CONF_PROTOCOL], MOCK_MACS[3], "192.168.1.5", None
|
||||
)
|
||||
|
||||
# change consider home settings to have status not home of removed tracked device
|
||||
hass.config_entries.async_update_entry(
|
||||
|
@ -154,7 +172,7 @@ async def _test_sensors(
|
|||
"entry_unique_id",
|
||||
[None, ROUTER_MAC_ADDR],
|
||||
)
|
||||
async def test_sensors(
|
||||
async def test_sensors_legacy(
|
||||
hass: HomeAssistant,
|
||||
connect_legacy,
|
||||
mock_devices_legacy,
|
||||
|
@ -165,11 +183,24 @@ async def test_sensors(
|
|||
await _test_sensors(hass, mock_devices_legacy, CONFIG_DATA_TELNET, entry_unique_id)
|
||||
|
||||
|
||||
async def test_loadavg_sensors(hass: HomeAssistant, connect_legacy) -> None:
|
||||
@pytest.mark.parametrize(
|
||||
"entry_unique_id",
|
||||
[None, ROUTER_MAC_ADDR],
|
||||
)
|
||||
async def test_sensors_http(
|
||||
hass: HomeAssistant,
|
||||
connect_http,
|
||||
mock_devices_http,
|
||||
create_device_registry_devices,
|
||||
entry_unique_id,
|
||||
) -> None:
|
||||
"""Test creating AsusWRT default sensors and tracker with http protocol."""
|
||||
await _test_sensors(hass, mock_devices_http, CONFIG_DATA_HTTP, entry_unique_id)
|
||||
|
||||
|
||||
async def _test_loadavg_sensors(hass: HomeAssistant, config) -> None:
|
||||
"""Test creating an AsusWRT load average sensors."""
|
||||
config_entry, sensor_prefix = _setup_entry(
|
||||
hass, CONFIG_DATA_TELNET, SENSORS_LOAD_AVG
|
||||
)
|
||||
config_entry, sensor_prefix = _setup_entry(hass, config, SENSORS_LOAD_AVG)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
# initial devices setup
|
||||
|
@ -184,13 +215,40 @@ async def test_loadavg_sensors(hass: HomeAssistant, connect_legacy) -> None:
|
|||
assert hass.states.get(f"{sensor_prefix}_sensor_load_avg15").state == "1.3"
|
||||
|
||||
|
||||
async def test_temperature_sensors(hass: HomeAssistant, connect_legacy) -> None:
|
||||
"""Test creating a AsusWRT temperature sensors."""
|
||||
async def test_loadavg_sensors_legacy(hass: HomeAssistant, connect_legacy) -> None:
|
||||
"""Test creating an AsusWRT load average sensors."""
|
||||
await _test_loadavg_sensors(hass, CONFIG_DATA_TELNET)
|
||||
|
||||
|
||||
async def test_loadavg_sensors_http(hass: HomeAssistant, connect_http) -> None:
|
||||
"""Test creating an AsusWRT load average sensors."""
|
||||
await _test_loadavg_sensors(hass, CONFIG_DATA_HTTP)
|
||||
|
||||
|
||||
async def test_temperature_sensors_http_fail(
|
||||
hass: HomeAssistant, connect_http_sens_fail
|
||||
) -> None:
|
||||
"""Test fail creating AsusWRT temperature sensors."""
|
||||
config_entry, sensor_prefix = _setup_entry(
|
||||
hass, CONFIG_DATA_TELNET, SENSORS_TEMPERATURES
|
||||
hass, CONFIG_DATA_HTTP, SENSORS_TEMPERATURES
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
# initial devices setup
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# assert temperature availability exception is handled correctly
|
||||
assert not hass.states.get(f"{sensor_prefix}_2_4ghz")
|
||||
assert not hass.states.get(f"{sensor_prefix}_5_0ghz")
|
||||
assert not hass.states.get(f"{sensor_prefix}_cpu")
|
||||
|
||||
|
||||
async def _test_temperature_sensors(hass: HomeAssistant, config) -> None:
|
||||
"""Test creating a AsusWRT temperature sensors."""
|
||||
config_entry, sensor_prefix = _setup_entry(hass, config, SENSORS_TEMPERATURES)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
# initial devices setup
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
@ -203,11 +261,23 @@ async def test_temperature_sensors(hass: HomeAssistant, connect_legacy) -> None:
|
|||
assert hass.states.get(f"{sensor_prefix}_cpu").state == "71.2"
|
||||
|
||||
|
||||
async def test_temperature_sensors_legacy(hass: HomeAssistant, connect_legacy) -> None:
|
||||
"""Test creating a AsusWRT temperature sensors."""
|
||||
await _test_temperature_sensors(hass, CONFIG_DATA_TELNET)
|
||||
|
||||
|
||||
async def test_temperature_sensors_http(hass: HomeAssistant, connect_http) -> None:
|
||||
"""Test creating a AsusWRT temperature sensors."""
|
||||
await _test_temperature_sensors(hass, CONFIG_DATA_HTTP)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"side_effect",
|
||||
[OSError, None],
|
||||
)
|
||||
async def test_connect_fail(hass: HomeAssistant, connect_legacy, side_effect) -> None:
|
||||
async def test_connect_fail_legacy(
|
||||
hass: HomeAssistant, connect_legacy, side_effect
|
||||
) -> None:
|
||||
"""Test AsusWRT connect fail."""
|
||||
|
||||
# init config entry
|
||||
|
@ -226,22 +296,43 @@ async def test_connect_fail(hass: HomeAssistant, connect_legacy, side_effect) ->
|
|||
assert config_entry.state is ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
async def test_sensors_polling_fails(
|
||||
hass: HomeAssistant, connect_legacy_sens_fail
|
||||
@pytest.mark.parametrize(
|
||||
"side_effect",
|
||||
[AsusWrtError, None],
|
||||
)
|
||||
async def test_connect_fail_http(
|
||||
hass: HomeAssistant, connect_http, side_effect
|
||||
) -> None:
|
||||
"""Test AsusWRT sensors are unavailable when polling fails."""
|
||||
config_entry, sensor_prefix = _setup_entry(
|
||||
hass, CONFIG_DATA_TELNET, SENSORS_ALL_LEGACY
|
||||
"""Test AsusWRT connect fail."""
|
||||
|
||||
# init config entry
|
||||
config_entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data=CONFIG_DATA_HTTP,
|
||||
)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
connect_http.return_value.async_connect.side_effect = side_effect
|
||||
connect_http.return_value.is_connected = False
|
||||
|
||||
# initial setup fail
|
||||
await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
assert config_entry.state is ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
async def _test_sensors_polling_fails(hass: HomeAssistant, config, sensors) -> None:
|
||||
"""Test AsusWRT sensors are unavailable when polling fails."""
|
||||
config_entry, sensor_prefix = _setup_entry(hass, config, sensors)
|
||||
config_entry.add_to_hass(hass)
|
||||
|
||||
# initial devices setup
|
||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
async_fire_time_changed(hass, utcnow() + timedelta(seconds=30))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
for sensor_name in SENSORS_ALL_LEGACY:
|
||||
for sensor_name in sensors:
|
||||
assert (
|
||||
hass.states.get(f"{sensor_prefix}_{slugify(sensor_name)}").state
|
||||
== STATE_UNAVAILABLE
|
||||
|
@ -249,6 +340,23 @@ async def test_sensors_polling_fails(
|
|||
assert hass.states.get(f"{sensor_prefix}_devices_connected").state == "0"
|
||||
|
||||
|
||||
async def test_sensors_polling_fails_legacy(
|
||||
hass: HomeAssistant,
|
||||
connect_legacy_sens_fail,
|
||||
) -> None:
|
||||
"""Test AsusWRT sensors are unavailable when polling fails."""
|
||||
await _test_sensors_polling_fails(hass, CONFIG_DATA_TELNET, SENSORS_ALL_LEGACY)
|
||||
|
||||
|
||||
async def test_sensors_polling_fails_http(
|
||||
hass: HomeAssistant,
|
||||
connect_http_sens_fail,
|
||||
connect_http_sens_detect,
|
||||
) -> None:
|
||||
"""Test AsusWRT sensors are unavailable when polling fails."""
|
||||
await _test_sensors_polling_fails(hass, CONFIG_DATA_HTTP, SENSORS_ALL_HTTP)
|
||||
|
||||
|
||||
async def test_options_reload(hass: HomeAssistant, connect_legacy) -> None:
|
||||
"""Test AsusWRT integration is reload changing an options that require this."""
|
||||
config_entry = MockConfigEntry(
|
||||
|
|
Loading…
Reference in New Issue