Add reauth to onvif (#91957)

Co-authored-by: Franck Nijhof <git@frenck.dev>
pull/92020/head
J. Nick Koston 2023-04-25 12:20:17 -05:00 committed by GitHub
parent 8e70446ef8
commit 2354f8194e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 241 additions and 19 deletions

View File

@ -15,10 +15,11 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from .const import CONF_SNAPSHOT_AUTH, DEFAULT_ARGUMENTS, DOMAIN
from .device import ONVIFDevice
from .util import is_auth_error, stringify_onvif_error
LOGGER = logging.getLogger(__name__)
@ -44,10 +45,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
) from err
except Fault as err:
await device.device.close()
# We do no know if the credentials are wrong or the camera is
# still booting up, so we will retry later
if is_auth_error(err):
raise ConfigEntryAuthFailed(
f"Auth Failed: {stringify_onvif_error(err)}"
) from err
raise ConfigEntryNotReady(
f"Could not connect to camera, verify credentials are correct: {err}"
f"Could not connect to camera: {stringify_onvif_error(err)}"
) from err
except ONVIFError as err:
await device.device.close()

View File

@ -1,6 +1,7 @@
"""Config flow for ONVIF."""
from __future__ import annotations
from collections.abc import Mapping
from pprint import pformat
from typing import Any
from urllib.parse import urlparse
@ -39,7 +40,7 @@ from .const import (
LOGGER,
)
from .device import get_device
from .util import stringify_onvif_error
from .util import is_auth_error, stringify_onvif_error
CONF_MANUAL_INPUT = "Manually configure ONVIF device"
@ -84,6 +85,7 @@ class OnvifFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a ONVIF config flow."""
VERSION = 1
_reauth_entry: config_entries.ConfigEntry
@staticmethod
@callback
@ -111,6 +113,44 @@ class OnvifFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
data_schema=vol.Schema({vol.Required("auto", default=True): bool}),
)
async def async_step_reauth(self, entry_data: Mapping[str, Any]) -> FlowResult:
"""Handle re-authentication of an existing config entry."""
reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"]
)
assert reauth_entry is not None
self._reauth_entry = reauth_entry
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Confirm reauth."""
entry = self._reauth_entry
errors: dict[str, str] | None = {}
description_placeholders: dict[str, str] | None = None
if user_input is not None:
entry_data = entry.data
self.onvif_config = entry_data | user_input
errors, description_placeholders = await self.async_setup_profiles(
configure_unique_id=False
)
if not errors:
hass = self.hass
entry_id = entry.entry_id
hass.config_entries.async_update_entry(entry, data=self.onvif_config)
hass.async_create_task(hass.config_entries.async_reload(entry_id))
return self.async_abort(reason="reauth_successful")
return self.async_show_form(
step_id="reauth_confirm",
data_schema=vol.Schema(
{vol.Required(CONF_USERNAME): str, vol.Required(CONF_PASSWORD): str}
),
errors=errors,
description_placeholders=description_placeholders,
)
async def async_step_dhcp(self, discovery_info: dhcp.DhcpServiceInfo) -> FlowResult:
"""Handle dhcp discovery."""
hass = self.hass
@ -217,7 +257,9 @@ class OnvifFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
description_placeholders=description_placeholders,
)
async def async_setup_profiles(self) -> tuple[dict[str, str], dict[str, str]]:
async def async_setup_profiles(
self, configure_unique_id: bool = True
) -> tuple[dict[str, str], dict[str, str]]:
"""Fetch ONVIF device profiles."""
LOGGER.debug(
"Fetching profiles from ONVIF device %s", pformat(self.onvif_config)
@ -260,21 +302,24 @@ class OnvifFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if not self.device_id:
raise AbortFlow(reason="no_mac")
await self.async_set_unique_id(self.device_id, raise_on_progress=False)
self._abort_if_unique_id_configured(
updates={
CONF_HOST: self.onvif_config[CONF_HOST],
CONF_PORT: self.onvif_config[CONF_PORT],
CONF_NAME: self.onvif_config[CONF_NAME],
}
)
if configure_unique_id:
await self.async_set_unique_id(self.device_id, raise_on_progress=False)
self._abort_if_unique_id_configured(
updates={
CONF_HOST: self.onvif_config[CONF_HOST],
CONF_PORT: self.onvif_config[CONF_PORT],
CONF_NAME: self.onvif_config[CONF_NAME],
CONF_USERNAME: self.onvif_config[CONF_USERNAME],
CONF_PASSWORD: self.onvif_config[CONF_PASSWORD],
}
)
# Verify there is an H264 profile
media_service = device.create_media_service()
profiles = await media_service.GetProfiles()
except Fault as err:
stringified_error = stringify_onvif_error(err)
description_placeholders = {"error": stringified_error}
if "auth" in stringified_error.lower():
if is_auth_error(err):
LOGGER.debug(
"%s: Could not authenticate with camera: %s",
self.onvif_config[CONF_NAME],

View File

@ -5,7 +5,8 @@
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
"no_h264": "There were no H264 streams available. Check the profile configuration on your device.",
"no_mac": "Could not configure unique ID for ONVIF device."
"no_mac": "Could not configure unique ID for ONVIF device.",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
},
"error": {
"onvif_error": "Error setting up ONVIF device: {error}. Check logs for more information.",
@ -42,6 +43,13 @@
"data": {
"include": "Create camera entity"
}
},
"reauth_confirm": {
"title": "Reauthenticate the ONVIF device",
"data": {
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
}
}
}
},

View File

@ -1,11 +1,49 @@
"""ONVIF util."""
from __future__ import annotations
from typing import Any
from zeep.exceptions import Fault
def extract_subcodes_as_strings(subcodes: Any) -> list[str]:
"""Stringify ONVIF subcodes."""
if isinstance(subcodes, list):
return [code.text if hasattr(code, "text") else str(code) for code in subcodes]
return [str(subcodes)]
def stringify_onvif_error(error: Exception) -> str:
"""Stringify ONVIF error."""
if isinstance(error, Fault):
return error.message or str(error) or "Device sent empty error"
return str(error)
message = error.message
if error.detail:
message += ": " + error.detail
if error.code:
message += f" (code:{error.code})"
if error.subcodes:
message += (
f" (subcodes:{','.join(extract_subcodes_as_strings(error.subcodes))})"
)
if error.actor:
message += f" (actor:{error.actor})"
else:
message = str(error)
return message or "Device sent empty error"
def is_auth_error(error: Exception) -> bool:
"""Return True if error is an authentication error.
Most of the tested cameras do not return a proper error code when
authentication fails, so we need to check the error message as well.
"""
if not isinstance(error, Fault):
return False
return (
any(
"NotAuthorized" in code
for code in extract_subcodes_as_strings(error.subcodes)
)
or "auth" in stringify_onvif_error(error).lower()
)

View File

@ -44,6 +44,7 @@ def setup_mock_onvif_camera(
auth_fail=False,
update_xaddrs_fail=False,
no_profiles=False,
auth_failure=False,
):
"""Prepare mock onvif.ONVIFCamera."""
devicemgmt = MagicMock()
@ -81,7 +82,13 @@ def setup_mock_onvif_camera(
else:
media_service.GetProfiles = AsyncMock(return_value=[profile1, profile2])
if update_xaddrs_fail:
if auth_failure:
mock_onvif_camera.update_xaddrs = AsyncMock(
side_effect=Fault(
"not authorized", subcodes=[MagicMock(text="NotAuthorized")]
)
)
elif update_xaddrs_fail:
mock_onvif_camera.update_xaddrs = AsyncMock(
side_effect=ONVIFError("camera not ready")
)

View File

@ -708,3 +708,124 @@ async def test_discovered_by_dhcp_does_not_update_if_no_matching_entry(
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "no_devices_found"
async def test_form_reauth(hass: HomeAssistant) -> None:
"""Test reauthenticate."""
entry, _, _ = await setup_onvif_integration(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_REAUTH, "entry_id": entry.entry_id},
data=entry.data,
)
assert result["type"] == FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
with patch(
"homeassistant.components.onvif.config_flow.get_device"
) as mock_onvif_camera, patch(
"homeassistant.components.onvif.ONVIFDevice"
) as mock_device, patch(
"homeassistant.components.onvif.async_setup_entry",
return_value=True,
) as mock_setup_entry:
setup_mock_onvif_camera(mock_onvif_camera, auth_failure=True)
setup_mock_device(mock_device)
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
config_flow.CONF_USERNAME: "new-test-username",
config_flow.CONF_PASSWORD: "new-test-password",
},
)
await hass.async_block_till_done()
assert result2["type"] == FlowResultType.FORM
assert result2["step_id"] == "reauth_confirm"
assert result2["errors"] == {config_flow.CONF_PASSWORD: "auth_failed"}
assert result2["description_placeholders"] == {
"error": "not authorized (subcodes:NotAuthorized)"
}
with patch(
"homeassistant.components.onvif.config_flow.get_device"
) as mock_onvif_camera, patch(
"homeassistant.components.onvif.ONVIFDevice"
) as mock_device, patch(
"homeassistant.components.onvif.async_setup_entry",
return_value=True,
) as mock_setup_entry:
setup_mock_onvif_camera(mock_onvif_camera)
setup_mock_device(mock_device)
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"],
{
config_flow.CONF_USERNAME: "new-test-username",
config_flow.CONF_PASSWORD: "new-test-password",
},
)
await hass.async_block_till_done()
assert result3["type"] == FlowResultType.ABORT
assert result3["reason"] == "reauth_successful"
assert len(mock_setup_entry.mock_calls) == 1
assert entry.data[config_flow.CONF_USERNAME] == "new-test-username"
assert entry.data[config_flow.CONF_PASSWORD] == "new-test-password"
async def test_flow_manual_entry_updates_existing_user_password(
hass: HomeAssistant,
) -> None:
"""Test that the existing username and password can be updated via manual entry."""
entry, _, _ = await setup_onvif_integration(hass)
result = await hass.config_entries.flow.async_init(
config_flow.DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
with patch(
"homeassistant.components.onvif.config_flow.get_device"
) as mock_onvif_camera, patch(
"homeassistant.components.onvif.config_flow.wsdiscovery"
) as mock_discovery, patch(
"homeassistant.components.onvif.ONVIFDevice"
) as mock_device:
setup_mock_onvif_camera(mock_onvif_camera, two_profiles=True)
# no discovery
mock_discovery.return_value = []
setup_mock_device(mock_device)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={"auto": False},
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "configure"
with patch(
"homeassistant.components.onvif.async_setup_entry", return_value=True
) as mock_setup_entry:
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
config_flow.CONF_NAME: NAME,
config_flow.CONF_HOST: HOST,
config_flow.CONF_PORT: PORT,
config_flow.CONF_USERNAME: USERNAME,
config_flow.CONF_PASSWORD: "new_password",
},
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.FlowResultType.ABORT
assert result["reason"] == "already_configured"
assert entry.data[config_flow.CONF_USERNAME] == USERNAME
assert entry.data[config_flow.CONF_PASSWORD] == "new_password"
assert len(mock_setup_entry.mock_calls) == 1