Support HTTPS connections to Huawei LTE devices (#86119)
* Support HTTPS connections to Huawei LTE devices Not all devices support HTTPS, so we default to plain HTTP still. Ones that do are very likely to have certificates that do not pass hostname verification, and are either self signed or issued by an untrusted CA. Add option to use unverified HTTPS to make it possible to connect to these, and when in effect, filter urllib3's related warnings about insecure connections to the hostname in question. * Use common config key and strings for certificate verification settings Even though the wording might be slightly suboptimal here, it's better to be consistent across the codebase than to finetune on this level. This also switches the default the other way around: verification is now disabled by default. This is not a good general default, but for this particular case setups where the verification would succeed would be so rare and require considerable local setup that it's very unlikely to happen in practice. * Add config flow tests * Mock logout for better test coverage * Set up custom requests session only when using unverified https * Add https config flow test case * Make better use of verify SSL defaultpull/101186/head
parent
239d7c9d80
commit
2a4ab3d53d
|
@ -35,6 +35,7 @@ from homeassistant.const import (
|
|||
CONF_RECIPIENT,
|
||||
CONF_URL,
|
||||
CONF_USERNAME,
|
||||
CONF_VERIFY_SSL,
|
||||
EVENT_HOMEASSISTANT_STOP,
|
||||
Platform,
|
||||
)
|
||||
|
@ -89,7 +90,7 @@ from .const import (
|
|||
SERVICE_SUSPEND_INTEGRATION,
|
||||
UPDATE_SIGNAL,
|
||||
)
|
||||
from .utils import get_device_macs
|
||||
from .utils import get_device_macs, non_verifying_requests_session
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -335,16 +336,19 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
|
||||
def _connect() -> Connection:
|
||||
"""Set up a connection."""
|
||||
kwargs: dict[str, Any] = {
|
||||
"timeout": CONNECTION_TIMEOUT,
|
||||
}
|
||||
if url.startswith("https://") and not entry.data.get(CONF_VERIFY_SSL):
|
||||
kwargs["requests_session"] = non_verifying_requests_session(url)
|
||||
if entry.options.get(CONF_UNAUTHENTICATED_MODE):
|
||||
_LOGGER.debug("Connecting in unauthenticated mode, reduced feature set")
|
||||
connection = Connection(url, timeout=CONNECTION_TIMEOUT)
|
||||
connection = Connection(url, **kwargs)
|
||||
else:
|
||||
_LOGGER.debug("Connecting in authenticated mode, full feature set")
|
||||
username = entry.data.get(CONF_USERNAME) or ""
|
||||
password = entry.data.get(CONF_PASSWORD) or ""
|
||||
connection = Connection(
|
||||
url, username=username, password=password, timeout=CONNECTION_TIMEOUT
|
||||
)
|
||||
connection = Connection(url, username=username, password=password, **kwargs)
|
||||
return connection
|
||||
|
||||
try:
|
||||
|
|
|
@ -16,7 +16,7 @@ from huawei_lte_api.exceptions import (
|
|||
ResponseErrorException,
|
||||
)
|
||||
from huawei_lte_api.Session import GetResponseType
|
||||
from requests.exceptions import Timeout
|
||||
from requests.exceptions import SSLError, Timeout
|
||||
from url_normalize import url_normalize
|
||||
import voluptuous as vol
|
||||
|
||||
|
@ -29,6 +29,7 @@ from homeassistant.const import (
|
|||
CONF_RECIPIENT,
|
||||
CONF_URL,
|
||||
CONF_USERNAME,
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
|
@ -44,7 +45,7 @@ from .const import (
|
|||
DEFAULT_UNAUTHENTICATED_MODE,
|
||||
DOMAIN,
|
||||
)
|
||||
from .utils import get_device_macs
|
||||
from .utils import get_device_macs, non_verifying_requests_session
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -80,6 +81,13 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
self.context.get(CONF_URL, ""),
|
||||
),
|
||||
): str,
|
||||
vol.Optional(
|
||||
CONF_VERIFY_SSL,
|
||||
default=user_input.get(
|
||||
CONF_VERIFY_SSL,
|
||||
False,
|
||||
),
|
||||
): bool,
|
||||
vol.Optional(
|
||||
CONF_USERNAME, default=user_input.get(CONF_USERNAME) or ""
|
||||
): str,
|
||||
|
@ -119,11 +127,20 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
password = user_input.get(CONF_PASSWORD) or ""
|
||||
|
||||
def _get_connection() -> Connection:
|
||||
if (
|
||||
user_input[CONF_URL].startswith("https://")
|
||||
and not user_input[CONF_VERIFY_SSL]
|
||||
):
|
||||
requests_session = non_verifying_requests_session(user_input[CONF_URL])
|
||||
else:
|
||||
requests_session = None
|
||||
|
||||
return Connection(
|
||||
url=user_input[CONF_URL],
|
||||
username=username,
|
||||
password=password,
|
||||
timeout=CONNECTION_TIMEOUT,
|
||||
requests_session=requests_session,
|
||||
)
|
||||
|
||||
conn = None
|
||||
|
@ -140,6 +157,12 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
except ResponseErrorException:
|
||||
_LOGGER.warning("Response error", exc_info=True)
|
||||
errors["base"] = "response_error"
|
||||
except SSLError:
|
||||
_LOGGER.warning("SSL error", exc_info=True)
|
||||
if user_input[CONF_VERIFY_SSL]:
|
||||
errors[CONF_URL] = "ssl_error_try_unverified"
|
||||
else:
|
||||
errors[CONF_URL] = "ssl_error_try_plain"
|
||||
except Timeout:
|
||||
_LOGGER.warning("Connection timeout", exc_info=True)
|
||||
errors[CONF_URL] = "connection_timeout"
|
||||
|
@ -152,6 +175,7 @@ class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
def _disconnect(conn: Connection) -> None:
|
||||
try:
|
||||
conn.close()
|
||||
conn.requests_session.close()
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.debug("Disconnect error", exc_info=True)
|
||||
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
"invalid_url": "Invalid URL",
|
||||
"login_attempts_exceeded": "Maximum login attempts exceeded, please try again later",
|
||||
"response_error": "Unknown error from device",
|
||||
"ssl_error_try_plain": "HTTPS error, please try a plain HTTP URL",
|
||||
"ssl_error_try_unverified": "HTTPS error, please try disabling certificate verification or a plain HTTP URL",
|
||||
"unknown": "[%key:common::config_flow::error::unknown%]"
|
||||
},
|
||||
"flow_title": "{name}",
|
||||
|
@ -30,7 +32,8 @@
|
|||
"data": {
|
||||
"password": "[%key:common::config_flow::data::password%]",
|
||||
"url": "[%key:common::config_flow::data::url%]",
|
||||
"username": "[%key:common::config_flow::data::username%]"
|
||||
"username": "[%key:common::config_flow::data::username%]",
|
||||
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
|
||||
},
|
||||
"description": "Enter device access details.",
|
||||
"title": "Configure Huawei LTE"
|
||||
|
|
|
@ -2,8 +2,13 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from contextlib import suppress
|
||||
import re
|
||||
from urllib.parse import urlparse
|
||||
import warnings
|
||||
|
||||
from huawei_lte_api.Session import GetResponseType
|
||||
import requests
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
|
||||
from homeassistant.helpers.device_registry import format_mac
|
||||
|
||||
|
@ -25,3 +30,18 @@ def get_device_macs(
|
|||
macs.extend(x.get("WifiMac") for x in wlan_settings["Ssids"]["Ssid"])
|
||||
|
||||
return sorted({format_mac(str(x)) for x in macs if x})
|
||||
|
||||
|
||||
def non_verifying_requests_session(url: str) -> requests.Session:
|
||||
"""Get requests.Session that does not verify HTTPS, filter warnings about it."""
|
||||
parsed_url = urlparse(url)
|
||||
assert parsed_url.hostname
|
||||
requests_session = requests.Session()
|
||||
requests_session.verify = False
|
||||
warnings.filterwarnings(
|
||||
"ignore",
|
||||
message=rf"^.*\b{re.escape(parsed_url.hostname)}\b",
|
||||
category=InsecureRequestWarning,
|
||||
module=r"^urllib3\.connectionpool$",
|
||||
)
|
||||
return requests_session
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
"""Tests for the Huawei LTE config flow."""
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
from huawei_lte_api.enums.client import ResponseCodeEnum
|
||||
from huawei_lte_api.enums.user import LoginErrorEnum, LoginStateEnum, PasswordTypeEnum
|
||||
|
@ -18,6 +20,7 @@ from homeassistant.const import (
|
|||
CONF_RECIPIENT,
|
||||
CONF_URL,
|
||||
CONF_USERNAME,
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
|
@ -25,8 +28,9 @@ from tests.common import MockConfigEntry
|
|||
|
||||
FIXTURE_UNIQUE_ID = "SERIALNUMBER"
|
||||
|
||||
FIXTURE_USER_INPUT = {
|
||||
FIXTURE_USER_INPUT: dict[str, Any] = {
|
||||
CONF_URL: "http://192.168.1.1/",
|
||||
CONF_VERIFY_SSL: False,
|
||||
CONF_USERNAME: "admin",
|
||||
CONF_PASSWORD: "secret",
|
||||
}
|
||||
|
@ -95,34 +99,59 @@ async def test_already_configured(
|
|||
assert result["reason"] == "already_configured"
|
||||
|
||||
|
||||
async def test_connection_error(
|
||||
hass: HomeAssistant, requests_mock: requests_mock.Mocker
|
||||
) -> None:
|
||||
"""Test we show user form on connection error."""
|
||||
requests_mock.request(ANY, ANY, exc=ConnectionError())
|
||||
@pytest.mark.parametrize(
|
||||
("exception", "errors", "data_patch"),
|
||||
(
|
||||
(ConnectionError(), {CONF_URL: "unknown"}, {}),
|
||||
(requests.exceptions.SSLError(), {CONF_URL: "ssl_error_try_plain"}, {}),
|
||||
(
|
||||
requests.exceptions.SSLError(),
|
||||
{CONF_URL: "ssl_error_try_unverified"},
|
||||
{CONF_VERIFY_SSL: True},
|
||||
),
|
||||
),
|
||||
)
|
||||
async def test_connection_errors(
|
||||
hass: HomeAssistant,
|
||||
requests_mock: requests_mock.Mocker,
|
||||
exception: Exception,
|
||||
errors: dict[str, str],
|
||||
data_patch: dict[str, Any],
|
||||
):
|
||||
"""Test we show user form on various errors."""
|
||||
requests_mock.request(ANY, ANY, exc=exception)
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}, data=FIXTURE_USER_INPUT
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_USER},
|
||||
data=FIXTURE_USER_INPUT | data_patch,
|
||||
)
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["step_id"] == "user"
|
||||
assert result["errors"] == {CONF_URL: "unknown"}
|
||||
assert result["errors"] == errors
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def login_requests_mock(requests_mock):
|
||||
"""Set up a requests_mock with base mocks for login tests."""
|
||||
requests_mock.request(
|
||||
ANY, FIXTURE_USER_INPUT[CONF_URL], text='<meta name="csrf_token" content="x"/>'
|
||||
)
|
||||
requests_mock.request(
|
||||
ANY,
|
||||
f"{FIXTURE_USER_INPUT[CONF_URL]}api/user/state-login",
|
||||
text=(
|
||||
f"<response><State>{LoginStateEnum.LOGGED_OUT}</State>"
|
||||
f"<password_type>{PasswordTypeEnum.SHA256}</password_type></response>"
|
||||
),
|
||||
https_url = urlunparse(
|
||||
urlparse(FIXTURE_USER_INPUT[CONF_URL])._replace(scheme="https")
|
||||
)
|
||||
for url in FIXTURE_USER_INPUT[CONF_URL], https_url:
|
||||
requests_mock.request(ANY, url, text='<meta name="csrf_token" content="x"/>')
|
||||
requests_mock.request(
|
||||
ANY,
|
||||
f"{url}api/user/state-login",
|
||||
text=(
|
||||
f"<response><State>{LoginStateEnum.LOGGED_OUT}</State>"
|
||||
f"<password_type>{PasswordTypeEnum.SHA256}</password_type></response>"
|
||||
),
|
||||
)
|
||||
requests_mock.request(
|
||||
ANY,
|
||||
f"{url}api/user/logout",
|
||||
text="<response>OK</response>",
|
||||
)
|
||||
return requests_mock
|
||||
|
||||
|
||||
|
@ -194,11 +223,19 @@ async def test_login_error(
|
|||
assert result["errors"] == errors
|
||||
|
||||
|
||||
async def test_success(hass: HomeAssistant, login_requests_mock) -> None:
|
||||
@pytest.mark.parametrize("scheme", ("http", "https"))
|
||||
async def test_success(hass: HomeAssistant, login_requests_mock, scheme: str) -> None:
|
||||
"""Test successful flow provides entry creation data."""
|
||||
user_input = {
|
||||
**FIXTURE_USER_INPUT,
|
||||
CONF_URL: urlunparse(
|
||||
urlparse(FIXTURE_USER_INPUT[CONF_URL])._replace(scheme=scheme)
|
||||
),
|
||||
}
|
||||
|
||||
login_requests_mock.request(
|
||||
ANY,
|
||||
f"{FIXTURE_USER_INPUT[CONF_URL]}api/user/login",
|
||||
f"{user_input[CONF_URL]}api/user/login",
|
||||
text="<response>OK</response>",
|
||||
)
|
||||
with patch("homeassistant.components.huawei_lte.async_setup"), patch(
|
||||
|
@ -207,14 +244,14 @@ async def test_success(hass: HomeAssistant, login_requests_mock) -> None:
|
|||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": config_entries.SOURCE_USER},
|
||||
data=FIXTURE_USER_INPUT,
|
||||
data=user_input,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY
|
||||
assert result["data"][CONF_URL] == FIXTURE_USER_INPUT[CONF_URL]
|
||||
assert result["data"][CONF_USERNAME] == FIXTURE_USER_INPUT[CONF_USERNAME]
|
||||
assert result["data"][CONF_PASSWORD] == FIXTURE_USER_INPUT[CONF_PASSWORD]
|
||||
assert result["data"][CONF_URL] == user_input[CONF_URL]
|
||||
assert result["data"][CONF_USERNAME] == user_input[CONF_USERNAME]
|
||||
assert result["data"][CONF_PASSWORD] == user_input[CONF_PASSWORD]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -300,8 +337,9 @@ async def test_ssdp(
|
|||
)
|
||||
|
||||
for k, v in expected_result.items():
|
||||
assert result[k] == v
|
||||
assert result[k] == v # type: ignore[literal-required] # expected is a subset
|
||||
if result.get("data_schema"):
|
||||
assert result["data_schema"] is not None
|
||||
assert result["data_schema"]({})[CONF_URL] == url + "/"
|
||||
|
||||
|
||||
|
@ -355,6 +393,7 @@ async def test_reauth(
|
|||
|
||||
assert result["type"] == data_entry_flow.FlowResultType.FORM
|
||||
assert result["step_id"] == "reauth_confirm"
|
||||
assert result["data_schema"] is not None
|
||||
assert result["data_schema"]({}) == {
|
||||
CONF_USERNAME: mock_entry_data[CONF_USERNAME],
|
||||
CONF_PASSWORD: mock_entry_data[CONF_PASSWORD],
|
||||
|
@ -376,7 +415,7 @@ async def test_reauth(
|
|||
await hass.async_block_till_done()
|
||||
|
||||
for k, v in expected_result.items():
|
||||
assert result[k] == v
|
||||
assert result[k] == v # type: ignore[literal-required] # expected is a subset
|
||||
for k, v in expected_entry_data.items():
|
||||
assert entry.data[k] == v
|
||||
|
||||
|
|
Loading…
Reference in New Issue