Fix netatmo authentication when using cloud authentication credentials (#104021)

* Fix netatmo authentication loop

* Update unit tests

* Move logic to determine api scopes

* Add unit tests for new method

* Use pyatmo scope list (#1)

* Exclude scopes not working with cloud

* Fix linting error

---------

Co-authored-by: Tobias Sauerwein <cgtobi@users.noreply.github.com>
pull/104348/head
deosrc 2023-11-15 20:28:16 +00:00 committed by Franck Nijhof
parent c241c2f79c
commit 399299c13c
No known key found for this signature in database
GPG Key ID: D62583BA8AB11CA3
5 changed files with 54 additions and 22 deletions

View File

@ -8,7 +8,6 @@ from typing import Any
import aiohttp import aiohttp
import pyatmo import pyatmo
from pyatmo.const import ALL_SCOPES as NETATMO_SCOPES
import voluptuous as vol import voluptuous as vol
from homeassistant.components import cloud from homeassistant.components import cloud
@ -143,7 +142,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
try: try:
await session.async_ensure_token_valid() await session.async_ensure_token_valid()
except aiohttp.ClientResponseError as ex: except aiohttp.ClientResponseError as ex:
_LOGGER.debug("API error: %s (%s)", ex.status, ex.message) _LOGGER.warning("API error: %s (%s)", ex.status, ex.message)
if ex.status in ( if ex.status in (
HTTPStatus.BAD_REQUEST, HTTPStatus.BAD_REQUEST,
HTTPStatus.UNAUTHORIZED, HTTPStatus.UNAUTHORIZED,
@ -152,19 +151,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
raise ConfigEntryAuthFailed("Token not valid, trigger renewal") from ex raise ConfigEntryAuthFailed("Token not valid, trigger renewal") from ex
raise ConfigEntryNotReady from ex raise ConfigEntryNotReady from ex
if entry.data["auth_implementation"] == cloud.DOMAIN: required_scopes = api.get_api_scopes(entry.data["auth_implementation"])
required_scopes = { if not (set(session.token["scope"]) & set(required_scopes)):
scope _LOGGER.warning(
for scope in NETATMO_SCOPES
if scope not in ("access_doorbell", "read_doorbell")
}
else:
required_scopes = set(NETATMO_SCOPES)
if not (set(session.token["scope"]) & required_scopes):
_LOGGER.debug(
"Session is missing scopes: %s", "Session is missing scopes: %s",
required_scopes - set(session.token["scope"]), set(required_scopes) - set(session.token["scope"]),
) )
raise ConfigEntryAuthFailed("Token scope not valid, trigger renewal") raise ConfigEntryAuthFailed("Token scope not valid, trigger renewal")

View File

@ -1,11 +1,29 @@
"""API for Netatmo bound to HASS OAuth.""" """API for Netatmo bound to HASS OAuth."""
from collections.abc import Iterable
from typing import cast from typing import cast
from aiohttp import ClientSession from aiohttp import ClientSession
import pyatmo import pyatmo
from homeassistant.components import cloud
from homeassistant.helpers import config_entry_oauth2_flow from homeassistant.helpers import config_entry_oauth2_flow
from .const import API_SCOPES_EXCLUDED_FROM_CLOUD
def get_api_scopes(auth_implementation: str) -> Iterable[str]:
"""Return the Netatmo API scopes based on the auth implementation."""
if auth_implementation == cloud.DOMAIN:
return set(
{
scope
for scope in pyatmo.const.ALL_SCOPES
if scope not in API_SCOPES_EXCLUDED_FROM_CLOUD
}
)
return sorted(pyatmo.const.ALL_SCOPES)
class AsyncConfigEntryNetatmoAuth(pyatmo.AbstractAsyncAuth): class AsyncConfigEntryNetatmoAuth(pyatmo.AbstractAsyncAuth):
"""Provide Netatmo authentication tied to an OAuth2 based config entry.""" """Provide Netatmo authentication tied to an OAuth2 based config entry."""

View File

@ -6,7 +6,6 @@ import logging
from typing import Any from typing import Any
import uuid import uuid
from pyatmo.const import ALL_SCOPES
import voluptuous as vol import voluptuous as vol
from homeassistant import config_entries from homeassistant import config_entries
@ -15,6 +14,7 @@ from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import config_entry_oauth2_flow, config_validation as cv from homeassistant.helpers import config_entry_oauth2_flow, config_validation as cv
from .api import get_api_scopes
from .const import ( from .const import (
CONF_AREA_NAME, CONF_AREA_NAME,
CONF_LAT_NE, CONF_LAT_NE,
@ -53,13 +53,7 @@ class NetatmoFlowHandler(
@property @property
def extra_authorize_data(self) -> dict: def extra_authorize_data(self) -> dict:
"""Extra data that needs to be appended to the authorize url.""" """Extra data that needs to be appended to the authorize url."""
exclude = [] scopes = get_api_scopes(self.flow_impl.domain)
if self.flow_impl.name == "Home Assistant Cloud":
exclude = ["access_doorbell", "read_doorbell"]
scopes = [scope for scope in ALL_SCOPES if scope not in exclude]
scopes.sort()
return {"scope": " ".join(scopes)} return {"scope": " ".join(scopes)}
async def async_step_user(self, user_input: dict | None = None) -> FlowResult: async def async_step_user(self, user_input: dict | None = None) -> FlowResult:

View File

@ -30,6 +30,13 @@ HOME_DATA = "netatmo_home_data"
DATA_HANDLER = "netatmo_data_handler" DATA_HANDLER = "netatmo_data_handler"
SIGNAL_NAME = "signal_name" SIGNAL_NAME = "signal_name"
API_SCOPES_EXCLUDED_FROM_CLOUD = [
"access_doorbell",
"read_doorbell",
"read_mhs1",
"write_mhs1",
]
NETATMO_CREATE_BATTERY = "netatmo_create_battery" NETATMO_CREATE_BATTERY = "netatmo_create_battery"
NETATMO_CREATE_CAMERA = "netatmo_create_camera" NETATMO_CREATE_CAMERA = "netatmo_create_camera"
NETATMO_CREATE_CAMERA_LIGHT = "netatmo_create_camera_light" NETATMO_CREATE_CAMERA_LIGHT = "netatmo_create_camera_light"

View File

@ -0,0 +1,22 @@
"""The tests for the Netatmo api."""
from pyatmo.const import ALL_SCOPES
from homeassistant.components import cloud
from homeassistant.components.netatmo import api
from homeassistant.components.netatmo.const import API_SCOPES_EXCLUDED_FROM_CLOUD
async def test_get_api_scopes_cloud() -> None:
"""Test method to get API scopes when using cloud auth implementation."""
result = api.get_api_scopes(cloud.DOMAIN)
for scope in API_SCOPES_EXCLUDED_FROM_CLOUD:
assert scope not in result
async def test_get_api_scopes_other() -> None:
"""Test method to get API scopes when using cloud auth implementation."""
result = api.get_api_scopes("netatmo_239846i2f0j2")
assert sorted(ALL_SCOPES) == result