Reolink check for admin (#85570)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
fixes undefined
pull/86058/head
starkillerOG 2023-01-13 20:07:09 +01:00 committed by Paulus Schoutsen
parent 1d5ecdd4ea
commit cb27cfe7dd
7 changed files with 171 additions and 27 deletions

View File

@ -14,10 +14,11 @@ from reolink_aio.exceptions import ApiError, InvalidContentTypeError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import DOMAIN
from .exceptions import UserNotAdmin
from .host import ReolinkHost
_LOGGER = logging.getLogger(__name__)
@ -40,16 +41,20 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
try:
if not await host.async_init():
await host.stop()
raise ConfigEntryNotReady(
f"Error while trying to setup {host.api.host}:{host.api.port}: "
"failed to obtain data from device."
)
except UserNotAdmin as err:
raise ConfigEntryAuthFailed(err) from UserNotAdmin
except (
ClientConnectorError,
asyncio.TimeoutError,
ApiError,
InvalidContentTypeError,
) as err:
await host.stop()
raise ConfigEntryNotReady(
f'Error while trying to setup {host.api.host}:{host.api.port}: "{str(err)}".'
) from err

View File

@ -1,19 +1,21 @@
"""Config flow for the Reolink camera component."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
from reolink_aio.exceptions import ApiError, CredentialsInvalidError
import voluptuous as vol
from homeassistant import config_entries, core, exceptions
from homeassistant import config_entries, exceptions
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT, CONF_USERNAME
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import config_validation as cv
from .const import CONF_PROTOCOL, CONF_USE_HTTPS, DEFAULT_PROTOCOL, DOMAIN
from .exceptions import UserNotAdmin
from .host import ReolinkHost
_LOGGER = logging.getLogger(__name__)
@ -53,6 +55,13 @@ class ReolinkFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self) -> None:
"""Initialize."""
self._host: str | None = None
self._username: str = "admin"
self._password: str | None = None
self._reauth: bool = False
@staticmethod
@callback
def async_get_options_flow(
@ -61,16 +70,37 @@ class ReolinkFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Options callback for Reolink."""
return ReolinkOptionsFlowHandler(config_entry)
async def async_step_reauth(self, entry_data: Mapping[str, Any]) -> FlowResult:
"""Perform reauth upon an authentication error or no admin privileges."""
self._host = entry_data[CONF_HOST]
self._username = entry_data[CONF_USERNAME]
self._password = entry_data[CONF_PASSWORD]
self._reauth = True
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Dialog that informs the user that reauth is required."""
if user_input is not None:
return await self.async_step_user()
return self.async_show_form(step_id="reauth_confirm")
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
errors = {}
placeholders = {}
placeholders = {"error": ""}
if user_input is not None:
host = ReolinkHost(self.hass, user_input, DEFAULT_OPTIONS)
try:
host = await async_obtain_host_settings(self.hass, user_input)
await async_obtain_host_settings(host)
except UserNotAdmin:
errors[CONF_USERNAME] = "not_admin"
placeholders["username"] = host.api.username
placeholders["userlevel"] = host.api.user_level
except CannotConnect:
errors[CONF_HOST] = "cannot_connect"
except CredentialsInvalidError:
@ -87,7 +117,17 @@ class ReolinkFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
user_input[CONF_PORT] = host.api.port
user_input[CONF_USE_HTTPS] = host.api.use_https
await self.async_set_unique_id(host.unique_id, raise_on_progress=False)
existing_entry = await self.async_set_unique_id(
host.unique_id, raise_on_progress=False
)
if existing_entry and self._reauth:
if self.hass.config_entries.async_update_entry(
existing_entry, data=user_input
):
await self.hass.config_entries.async_reload(
existing_entry.entry_id
)
return self.async_abort(reason="reauth_successful")
self._abort_if_unique_id_configured(updates=user_input)
return self.async_create_entry(
@ -98,9 +138,9 @@ class ReolinkFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
data_schema = vol.Schema(
{
vol.Required(CONF_USERNAME, default="admin"): str,
vol.Required(CONF_PASSWORD): str,
vol.Required(CONF_HOST): str,
vol.Required(CONF_USERNAME, default=self._username): str,
vol.Required(CONF_PASSWORD, default=self._password): str,
vol.Required(CONF_HOST, default=self._host): str,
}
)
if errors:
@ -119,20 +159,14 @@ class ReolinkFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
)
async def async_obtain_host_settings(
hass: core.HomeAssistant, user_input: dict
) -> ReolinkHost:
async def async_obtain_host_settings(host: ReolinkHost) -> None:
"""Initialize the Reolink host and get the host information."""
host = ReolinkHost(hass, user_input, DEFAULT_OPTIONS)
try:
if not await host.async_init():
raise CannotConnect
finally:
await host.stop()
return host
class CannotConnect(exceptions.HomeAssistantError):
"""Error to indicate we cannot connect."""

View File

@ -0,0 +1,6 @@
"""Exceptions for the Reolink Camera integration."""
from homeassistant.exceptions import HomeAssistantError
class UserNotAdmin(HomeAssistantError):
"""Raised when user is not admin."""

View File

@ -19,6 +19,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from .const import CONF_PROTOCOL, CONF_USE_HTTPS, DEFAULT_TIMEOUT
from .exceptions import UserNotAdmin
_LOGGER = logging.getLogger(__name__)
@ -68,6 +69,12 @@ class ReolinkHost:
if self._api.mac_address is None:
return False
if not self._api.is_admin:
await self.stop()
raise UserNotAdmin(
f"User '{self._api.username}' has authorization level '{self._api.user_level}', only admin users can change camera settings"
)
enable_onvif = None
enable_rtmp = None
enable_rtsp = None

View File

@ -2,6 +2,7 @@
"config": {
"step": {
"user": {
"description": "{error}",
"data": {
"host": "[%key:common::config_flow::data::host%]",
"port": "[%key:common::config_flow::data::port%]",
@ -9,16 +10,22 @@
"username": "[%key:common::config_flow::data::username%]",
"password": "[%key:common::config_flow::data::password%]"
}
},
"reauth_confirm": {
"title": "[%key:common::config_flow::title::reauth%]",
"description": "The Reolink integration needs to re-authenticate your connection details"
}
},
"error": {
"api_error": "API error occurred: {error}",
"api_error": "API error occurred",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]: {error}"
"not_admin": "User needs to be admin, user ''{username}'' has authorisation level ''{userlevel}''",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
}
},
"options": {

View File

@ -1,15 +1,21 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured"
"already_configured": "Device is already configured",
"reauth_successful": "Re-authentication was successful"
},
"error": {
"api_error": "API error occurred: {error}",
"api_error": "API error occurred",
"cannot_connect": "Failed to connect",
"invalid_auth": "Invalid authentication",
"unknown": "Unexpected error: {error}"
"not_admin": "User needs to be admin, user ''{username}'' has authorisation level ''{userlevel}''",
"unknown": "Unexpected error"
},
"step": {
"reauth_confirm": {
"description": "The Reolink integration needs to re-authenticate your connection details",
"title": "Reauthenticate Integration"
},
"user": {
"data": {
"host": "Host",
@ -17,7 +23,8 @@
"port": "Port",
"use_https": "Enable HTTPS",
"username": "Username"
}
},
"description": "{error}"
}
}
},

View File

@ -24,7 +24,7 @@ TEST_NVR_NAME = "test_reolink_name"
TEST_USE_HTTPS = True
def get_mock_info(error=None, host_data_return=True):
def get_mock_info(error=None, host_data_return=True, user_level="admin"):
"""Return a mock gateway info instance."""
host_mock = Mock()
if error is None:
@ -40,6 +40,8 @@ def get_mock_info(error=None, host_data_return=True):
host_mock.nvr_name = TEST_NVR_NAME
host_mock.port = TEST_PORT
host_mock.use_https = TEST_USE_HTTPS
host_mock.is_admin = user_level == "admin"
host_mock.user_level = user_level
return host_mock
@ -110,7 +112,22 @@ async def test_config_flow_errors(hass):
assert result["type"] is data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"host": "cannot_connect"}
assert result["errors"] == {CONF_HOST: "cannot_connect"}
host_mock = get_mock_info(user_level="guest")
with patch("homeassistant.components.reolink.host.Host", return_value=host_mock):
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: TEST_USERNAME,
CONF_PASSWORD: TEST_PASSWORD,
CONF_HOST: TEST_HOST,
},
)
assert result["type"] is data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {CONF_USERNAME: "not_admin"}
host_mock = get_mock_info(error=json.JSONDecodeError("test_error", "test", 1))
with patch("homeassistant.components.reolink.host.Host", return_value=host_mock):
@ -125,7 +142,7 @@ async def test_config_flow_errors(hass):
assert result["type"] is data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"host": "unknown"}
assert result["errors"] == {CONF_HOST: "unknown"}
host_mock = get_mock_info(error=CredentialsInvalidError("Test error"))
with patch("homeassistant.components.reolink.host.Host", return_value=host_mock):
@ -140,7 +157,7 @@ async def test_config_flow_errors(hass):
assert result["type"] is data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"host": "invalid_auth"}
assert result["errors"] == {CONF_HOST: "invalid_auth"}
host_mock = get_mock_info(error=ApiError("Test error"))
with patch("homeassistant.components.reolink.host.Host", return_value=host_mock):
@ -155,7 +172,7 @@ async def test_config_flow_errors(hass):
assert result["type"] is data_entry_flow.FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"host": "api_error"}
assert result["errors"] == {CONF_HOST: "api_error"}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
@ -261,3 +278,64 @@ async def test_change_connection_settings(hass):
assert config_entry.data[CONF_HOST] == TEST_HOST2
assert config_entry.data[CONF_USERNAME] == TEST_USERNAME2
assert config_entry.data[CONF_PASSWORD] == TEST_PASSWORD2
async def test_reauth(hass):
"""Test a reauth flow."""
config_entry = MockConfigEntry(
domain=const.DOMAIN,
unique_id=format_mac(TEST_MAC),
data={
CONF_HOST: TEST_HOST,
CONF_USERNAME: TEST_USERNAME,
CONF_PASSWORD: TEST_PASSWORD,
CONF_PORT: TEST_PORT,
const.CONF_USE_HTTPS: TEST_USE_HTTPS,
},
options={
const.CONF_PROTOCOL: const.DEFAULT_PROTOCOL,
},
title=TEST_NVR_NAME,
)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_init(
const.DOMAIN,
context={
"source": config_entries.SOURCE_REAUTH,
"entry_id": config_entry.entry_id,
"title_placeholders": {"name": TEST_NVR_NAME},
"unique_id": format_mac(TEST_MAC),
},
data=config_entry.data,
)
assert result["type"] == "form"
assert result["step_id"] == "reauth_confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{},
)
assert result["type"] == "form"
assert result["step_id"] == "user"
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_HOST: TEST_HOST2,
CONF_USERNAME: TEST_USERNAME2,
CONF_PASSWORD: TEST_PASSWORD2,
},
)
assert result["type"] == "abort"
assert result["reason"] == "reauth_successful"
assert config_entry.data[CONF_HOST] == TEST_HOST2
assert config_entry.data[CONF_USERNAME] == TEST_USERNAME2
assert config_entry.data[CONF_PASSWORD] == TEST_PASSWORD2