Add 2SA authentication to Synology DSM (#34101)

Co-Authored-By: J. Nick Koston <nick@koston.org>

Co-authored-by: J. Nick Koston <nick@koston.org>
pull/34434/head
Quentame 2020-04-18 01:47:50 +02:00 committed by GitHub
parent 4484afc0d6
commit 30744bea9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 134 additions and 12 deletions

View File

@ -5,10 +5,17 @@
},
"error": {
"login": "Login error: please check your username & password",
"missing_data": "Missing data: please retry later or an other configuration"
"missing_data": "Missing data: please retry later or an other configuration",
"otp_failed": "Two-step authentication failed, retry with a new pass code"
},
"flow_title": "Synology DSM {name} ({host})",
"step": {
"2sa": {
"data": {
"otp_code": "Code"
},
"title": "Synology DSM: two-step authentication"
},
"link": {
"data": {
"api_version": "DSM version",

View File

@ -71,8 +71,11 @@ async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry):
unit = hass.config.units.temperature_unit
use_ssl = entry.data[CONF_SSL]
api_version = entry.data.get(CONF_API_VERSION, DEFAULT_DSM_VERSION)
device_token = entry.data.get("device_token")
api = SynoApi(hass, host, port, username, password, unit, use_ssl, api_version)
api = SynoApi(
hass, host, port, username, password, unit, use_ssl, device_token, api_version
)
await api.async_setup()
@ -105,6 +108,7 @@ class SynoApi:
password: str,
temp_unit: str,
use_ssl: bool,
device_token: str,
api_version: int,
):
"""Initialize the API wrapper class."""
@ -114,6 +118,7 @@ class SynoApi:
self._username = username
self._password = password
self._use_ssl = use_ssl
self._device_token = device_token
self._api_version = api_version
self.temp_unit = temp_unit
@ -137,6 +142,7 @@ class SynoApi:
self._username,
self._password,
self._use_ssl,
device_token=self._device_token,
dsm_version=self._api_version,
)

View File

@ -3,6 +3,11 @@ import logging
from urllib.parse import urlparse
from synology_dsm import SynologyDSM
from synology_dsm.exceptions import (
SynologyDSMLogin2SAFailedException,
SynologyDSMLogin2SARequiredException,
SynologyDSMLoginInvalidException,
)
import voluptuous as vol
from homeassistant import config_entries, exceptions
@ -29,6 +34,8 @@ from .const import DOMAIN # pylint: disable=unused-import
_LOGGER = logging.getLogger(__name__)
CONF_OTP_CODE = "otp_code"
def _discovery_schema_with_defaults(discovery_info):
return vol.Schema(_ordered_shared_schema(discovery_info))
@ -66,6 +73,7 @@ class SynologyDSMFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self):
"""Initialize the synology_dsm config flow."""
self.saved_user_input = {}
self.discovered_conf = {}
async def _show_setup_form(self, user_input=None, errors=None):
@ -104,6 +112,7 @@ class SynologyDSMFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
password = user_input[CONF_PASSWORD]
use_ssl = user_input.get(CONF_SSL, DEFAULT_SSL)
api_version = user_input.get(CONF_API_VERSION, DEFAULT_DSM_VERSION)
otp_code = user_input.get(CONF_OTP_CODE)
if not port:
if use_ssl is True:
@ -117,9 +126,15 @@ class SynologyDSMFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
try:
serial = await self.hass.async_add_executor_job(
_login_and_fetch_syno_info, api
_login_and_fetch_syno_info, api, otp_code
)
except InvalidAuth:
except SynologyDSMLogin2SARequiredException:
return await self.async_step_2sa(user_input)
except SynologyDSMLogin2SAFailedException:
errors[CONF_OTP_CODE] = "otp_failed"
user_input[CONF_OTP_CODE] = None
return await self.async_step_2sa(user_input, errors)
except (SynologyDSMLoginInvalidException, InvalidAuth):
errors[CONF_USERNAME] = "login"
except InvalidData:
errors["base"] = "missing_data"
@ -139,6 +154,8 @@ class SynologyDSMFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
CONF_PASSWORD: password,
CONF_API_VERSION: api_version,
}
if otp_code:
config_data["device_token"] = api.device_token
if user_input.get(CONF_DISKS):
config_data[CONF_DISKS] = user_input[CONF_DISKS]
if user_input.get(CONF_VOLUMES):
@ -168,10 +185,27 @@ class SynologyDSMFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Import a config entry."""
return await self.async_step_user(user_input)
async def async_step_link(self, user_input=None):
async def async_step_link(self, user_input):
"""Link a config entry from discovery."""
return await self.async_step_user(user_input)
async def async_step_2sa(self, user_input, errors=None):
"""Enter 2SA code to anthenticate."""
if not self.saved_user_input:
self.saved_user_input = user_input
if not user_input.get(CONF_OTP_CODE):
return self.async_show_form(
step_id="2sa",
data_schema=vol.Schema({vol.Required(CONF_OTP_CODE): str}),
errors=errors or {},
)
user_input = {**self.saved_user_input, **user_input}
self.saved_user_input = {}
return await self.async_step_user(user_input)
def _host_already_configured(self, hostname):
"""See if we already have a host matching user input configured."""
existing_hosts = {
@ -180,9 +214,9 @@ class SynologyDSMFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return hostname in existing_hosts
def _login_and_fetch_syno_info(api):
def _login_and_fetch_syno_info(api, otp_code):
"""Login to the NAS and fetch basic data."""
if not api.login():
if not api.login(otp_code):
raise InvalidAuth
# These do i/o

View File

@ -6,9 +6,9 @@ from homeassistant.const import (
)
DOMAIN = "synology_dsm"
BASE_NAME = "Synology"
CONF_VOLUMES = "volumes"
BASE_NAME = "Synology"
DEFAULT_SSL = True
DEFAULT_PORT = 5000
DEFAULT_PORT_SSL = 5001

View File

@ -2,7 +2,7 @@
"domain": "synology_dsm",
"name": "Synology DSM",
"documentation": "https://www.home-assistant.io/integrations/synology_dsm",
"requirements": ["python-synology==0.5.0"],
"requirements": ["python-synology==0.6.0"],
"codeowners": ["@ProtoThis", "@Quentame"],
"config_flow": true,
"ssdp": [

View File

@ -13,6 +13,12 @@
"password": "Password"
}
},
"2sa": {
"title": "Synology DSM: two-step authentication",
"data": {
"otp_code": "Code"
}
},
"link": {
"title": "Synology DSM",
"description": "Do you want to setup {name} ({host})?",
@ -27,7 +33,8 @@
},
"error": {
"login": "Login error: please check your username & password",
"missing_data": "Missing data: please retry later or an other configuration"
"missing_data": "Missing data: please retry later or an other configuration",
"otp_failed": "Two-step authentication failed, retry with a new pass code"
},
"abort": { "already_configured": "Host already configured" }
}

View File

@ -1673,7 +1673,7 @@ python-sochain-api==0.0.2
python-songpal==0.11.2
# homeassistant.components.synology_dsm
python-synology==0.5.0
python-synology==0.6.0
# homeassistant.components.tado
python-tado==0.6.0

View File

@ -637,7 +637,7 @@ python-miio==0.5.0.1
python-nest==4.1.0
# homeassistant.components.synology_dsm
python-synology==0.5.0
python-synology==0.6.0
# homeassistant.components.tado
python-tado==0.6.0

View File

@ -3,9 +3,14 @@ import logging
from unittest.mock import MagicMock, Mock, patch
import pytest
from synology_dsm.exceptions import (
SynologyDSMLogin2SAFailedException,
SynologyDSMLogin2SARequiredException,
)
from homeassistant import data_entry_flow, setup
from homeassistant.components import ssdp
from homeassistant.components.synology_dsm.config_flow import CONF_OTP_CODE
from homeassistant.components.synology_dsm.const import (
CONF_VOLUMES,
DEFAULT_DSM_VERSION,
@ -39,6 +44,7 @@ PORT = 1234
SSL = True
USERNAME = "Home_Assistant"
PASSWORD = "password"
DEVICE_TOKEN = "Dév!cè_T0k€ñ"
@pytest.fixture(name="service")
@ -54,6 +60,22 @@ def mock_controller_service():
yield service_mock
@pytest.fixture(name="service_2sa")
def mock_controller_service_2sa():
"""Mock a successful service with 2SA login."""
with patch(
"homeassistant.components.synology_dsm.config_flow.SynologyDSM"
) as service_mock:
service_mock.return_value.login = Mock(
side_effect=SynologyDSMLogin2SARequiredException(USERNAME)
)
service_mock.return_value.information.serial = SERIAL
service_mock.return_value.utilisation.cpu_user_load = 1
service_mock.return_value.storage.disks_ids = []
service_mock.return_value.storage.volumes_ids = []
yield service_mock
@pytest.fixture(name="service_login_failed")
def mock_controller_service_login_failed():
"""Mock a failed login."""
@ -107,6 +129,7 @@ async def test_user(hass: HomeAssistantType, service: MagicMock):
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_API_VERSION] == 5
assert result["data"].get("device_token") is None
assert result["data"].get(CONF_DISKS) is None
assert result["data"].get(CONF_VOLUMES) is None
@ -131,6 +154,48 @@ async def test_user(hass: HomeAssistantType, service: MagicMock):
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_API_VERSION] == DEFAULT_DSM_VERSION
assert result["data"].get("device_token") is None
assert result["data"].get(CONF_DISKS) is None
assert result["data"].get(CONF_VOLUMES) is None
async def test_user_2sa(hass: HomeAssistantType, service_2sa: MagicMock):
"""Test user with 2sa authentication config."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data={CONF_HOST: HOST, CONF_USERNAME: USERNAME, CONF_PASSWORD: PASSWORD},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "2sa"
# Failed the first time because was too slow to enter the code
service_2sa.return_value.login = Mock(
side_effect=SynologyDSMLogin2SAFailedException
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_OTP_CODE: "000000"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "2sa"
assert result["errors"] == {CONF_OTP_CODE: "otp_failed"}
# Successful login with 2SA code
service_2sa.return_value.login = Mock(return_value=True)
service_2sa.return_value.device_token = DEVICE_TOKEN
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_OTP_CODE: "123456"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["result"].unique_id == SERIAL
assert result["title"] == HOST
assert result["data"][CONF_HOST] == HOST
assert result["data"][CONF_PORT] == DEFAULT_PORT_SSL
assert result["data"][CONF_SSL] == DEFAULT_SSL
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"].get("device_token") == DEVICE_TOKEN
assert result["data"].get(CONF_DISKS) is None
assert result["data"].get(CONF_VOLUMES) is None
@ -152,6 +217,7 @@ async def test_import(hass: HomeAssistantType, service: MagicMock):
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_API_VERSION] == DEFAULT_DSM_VERSION
assert result["data"].get("device_token") is None
assert result["data"].get(CONF_DISKS) is None
assert result["data"].get(CONF_VOLUMES) is None
@ -180,6 +246,7 @@ async def test_import(hass: HomeAssistantType, service: MagicMock):
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_API_VERSION] == 5
assert result["data"].get("device_token") is None
assert result["data"][CONF_DISKS] == ["sda", "sdb", "sdc"]
assert result["data"][CONF_VOLUMES] == ["volume_1"]
@ -263,5 +330,6 @@ async def test_form_ssdp(hass: HomeAssistantType, service: MagicMock):
assert result["data"][CONF_USERNAME] == USERNAME
assert result["data"][CONF_PASSWORD] == PASSWORD
assert result["data"][CONF_API_VERSION] == DEFAULT_DSM_VERSION
assert result["data"].get("device_token") is None
assert result["data"].get(CONF_DISKS) is None
assert result["data"].get(CONF_VOLUMES) is None