421 lines
14 KiB
Python
421 lines
14 KiB
Python
"""Config flow for the ntfy integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Mapping
|
|
import logging
|
|
import random
|
|
import re
|
|
import string
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from aiontfy import Ntfy
|
|
from aiontfy.exceptions import (
|
|
NtfyException,
|
|
NtfyHTTPError,
|
|
NtfyUnauthorizedAuthenticationError,
|
|
)
|
|
import voluptuous as vol
|
|
from yarl import URL
|
|
|
|
from homeassistant import data_entry_flow
|
|
from homeassistant.config_entries import (
|
|
ConfigEntry,
|
|
ConfigFlow,
|
|
ConfigFlowResult,
|
|
ConfigSubentryFlow,
|
|
SubentryFlowResult,
|
|
)
|
|
from homeassistant.const import (
|
|
ATTR_CREDENTIALS,
|
|
CONF_NAME,
|
|
CONF_PASSWORD,
|
|
CONF_TOKEN,
|
|
CONF_URL,
|
|
CONF_USERNAME,
|
|
CONF_VERIFY_SSL,
|
|
)
|
|
from homeassistant.core import callback
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
from homeassistant.helpers.selector import (
|
|
TextSelector,
|
|
TextSelectorConfig,
|
|
TextSelectorType,
|
|
)
|
|
|
|
from .const import CONF_TOPIC, DEFAULT_URL, DOMAIN, SECTION_AUTH
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
STEP_USER_DATA_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_URL, default=DEFAULT_URL): TextSelector(
|
|
TextSelectorConfig(
|
|
type=TextSelectorType.URL,
|
|
autocomplete="url",
|
|
),
|
|
),
|
|
vol.Required(CONF_VERIFY_SSL, default=True): bool,
|
|
vol.Required(SECTION_AUTH): data_entry_flow.section(
|
|
vol.Schema(
|
|
{
|
|
vol.Optional(CONF_USERNAME): TextSelector(
|
|
TextSelectorConfig(
|
|
type=TextSelectorType.TEXT,
|
|
autocomplete="username",
|
|
),
|
|
),
|
|
vol.Optional(CONF_PASSWORD): TextSelector(
|
|
TextSelectorConfig(
|
|
type=TextSelectorType.PASSWORD,
|
|
autocomplete="current-password",
|
|
),
|
|
),
|
|
}
|
|
),
|
|
{"collapsed": True},
|
|
),
|
|
}
|
|
)
|
|
|
|
STEP_REAUTH_DATA_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Exclusive(CONF_PASSWORD, ATTR_CREDENTIALS): TextSelector(
|
|
TextSelectorConfig(
|
|
type=TextSelectorType.PASSWORD,
|
|
autocomplete="current-password",
|
|
),
|
|
),
|
|
vol.Exclusive(CONF_TOKEN, ATTR_CREDENTIALS): str,
|
|
}
|
|
)
|
|
|
|
STEP_RECONFIGURE_DATA_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Exclusive(CONF_USERNAME, ATTR_CREDENTIALS): TextSelector(
|
|
TextSelectorConfig(
|
|
type=TextSelectorType.TEXT,
|
|
autocomplete="username",
|
|
),
|
|
),
|
|
vol.Optional(CONF_PASSWORD, default=""): TextSelector(
|
|
TextSelectorConfig(
|
|
type=TextSelectorType.PASSWORD,
|
|
autocomplete="current-password",
|
|
),
|
|
),
|
|
vol.Exclusive(CONF_TOKEN, ATTR_CREDENTIALS): str,
|
|
}
|
|
)
|
|
|
|
STEP_USER_TOPIC_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Required(CONF_TOPIC): str,
|
|
vol.Optional(CONF_NAME): str,
|
|
}
|
|
)
|
|
|
|
RE_TOPIC = re.compile("^[-_a-zA-Z0-9]{1,64}$")
|
|
|
|
|
|
class NtfyConfigFlow(ConfigFlow, domain=DOMAIN):
|
|
"""Handle a config flow for ntfy."""
|
|
|
|
@classmethod
|
|
@callback
|
|
def async_get_supported_subentry_types(
|
|
cls, config_entry: ConfigEntry
|
|
) -> dict[str, type[ConfigSubentryFlow]]:
|
|
"""Return subentries supported by this integration."""
|
|
return {"topic": TopicSubentryFlowHandler}
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle the initial step."""
|
|
errors: dict[str, str] = {}
|
|
if user_input is not None:
|
|
url = URL(user_input[CONF_URL])
|
|
username = user_input[SECTION_AUTH].get(CONF_USERNAME)
|
|
self._async_abort_entries_match(
|
|
{
|
|
CONF_URL: url.human_repr(),
|
|
CONF_USERNAME: username,
|
|
}
|
|
)
|
|
session = async_get_clientsession(self.hass, user_input[CONF_VERIFY_SSL])
|
|
if username:
|
|
ntfy = Ntfy(
|
|
user_input[CONF_URL],
|
|
session,
|
|
username,
|
|
user_input[SECTION_AUTH].get(CONF_PASSWORD, ""),
|
|
)
|
|
else:
|
|
ntfy = Ntfy(user_input[CONF_URL], session)
|
|
|
|
try:
|
|
account = await ntfy.account()
|
|
token = (
|
|
(await ntfy.generate_token("Home Assistant")).token
|
|
if account.username != "*"
|
|
else None
|
|
)
|
|
except NtfyUnauthorizedAuthenticationError:
|
|
errors["base"] = "invalid_auth"
|
|
except NtfyHTTPError as e:
|
|
_LOGGER.debug("Error %s: %s [%s]", e.code, e.error, e.link)
|
|
errors["base"] = "cannot_connect"
|
|
except NtfyException:
|
|
errors["base"] = "cannot_connect"
|
|
except Exception:
|
|
_LOGGER.exception("Unexpected exception")
|
|
errors["base"] = "unknown"
|
|
else:
|
|
if TYPE_CHECKING:
|
|
assert url.host
|
|
return self.async_create_entry(
|
|
title=url.host,
|
|
data={
|
|
CONF_URL: url.human_repr(),
|
|
CONF_USERNAME: username,
|
|
CONF_TOKEN: token,
|
|
CONF_VERIFY_SSL: user_input[CONF_VERIFY_SSL],
|
|
},
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="user",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
data_schema=STEP_USER_DATA_SCHEMA, suggested_values=user_input
|
|
),
|
|
errors=errors,
|
|
)
|
|
|
|
async def async_step_reauth(
|
|
self, entry_data: Mapping[str, Any]
|
|
) -> ConfigFlowResult:
|
|
"""Perform reauth upon an API authentication error."""
|
|
return await self.async_step_reauth_confirm()
|
|
|
|
async def async_step_reauth_confirm(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Confirm reauthentication dialog."""
|
|
errors: dict[str, str] = {}
|
|
|
|
entry = self._get_reauth_entry()
|
|
|
|
if user_input is not None:
|
|
session = async_get_clientsession(self.hass)
|
|
if token := user_input.get(CONF_TOKEN):
|
|
ntfy = Ntfy(
|
|
entry.data[CONF_URL],
|
|
session,
|
|
token=user_input[CONF_TOKEN],
|
|
)
|
|
else:
|
|
ntfy = Ntfy(
|
|
entry.data[CONF_URL],
|
|
session,
|
|
username=entry.data[CONF_USERNAME],
|
|
password=user_input[CONF_PASSWORD],
|
|
)
|
|
|
|
try:
|
|
account = await ntfy.account()
|
|
token = (
|
|
(await ntfy.generate_token("Home Assistant")).token
|
|
if not user_input.get(CONF_TOKEN)
|
|
else user_input[CONF_TOKEN]
|
|
)
|
|
except NtfyUnauthorizedAuthenticationError:
|
|
errors["base"] = "invalid_auth"
|
|
except NtfyHTTPError as e:
|
|
_LOGGER.debug("Error %s: %s [%s]", e.code, e.error, e.link)
|
|
errors["base"] = "cannot_connect"
|
|
except NtfyException:
|
|
errors["base"] = "cannot_connect"
|
|
except Exception:
|
|
_LOGGER.exception("Unexpected exception")
|
|
errors["base"] = "unknown"
|
|
else:
|
|
if entry.data[CONF_USERNAME] != account.username:
|
|
return self.async_abort(
|
|
reason="account_mismatch",
|
|
description_placeholders={
|
|
CONF_USERNAME: entry.data[CONF_USERNAME],
|
|
"wrong_username": account.username,
|
|
},
|
|
)
|
|
return self.async_update_reload_and_abort(
|
|
entry,
|
|
data_updates={CONF_TOKEN: token},
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reauth_confirm",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
data_schema=STEP_REAUTH_DATA_SCHEMA, suggested_values=user_input
|
|
),
|
|
errors=errors,
|
|
description_placeholders={CONF_USERNAME: entry.data[CONF_USERNAME]},
|
|
)
|
|
|
|
async def async_step_reconfigure(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle reconfigure flow for ntfy."""
|
|
errors: dict[str, str] = {}
|
|
|
|
entry = self._get_reconfigure_entry()
|
|
|
|
if user_input is not None:
|
|
session = async_get_clientsession(self.hass)
|
|
if token := user_input.get(CONF_TOKEN):
|
|
ntfy = Ntfy(
|
|
entry.data[CONF_URL],
|
|
session,
|
|
token=user_input[CONF_TOKEN],
|
|
)
|
|
else:
|
|
ntfy = Ntfy(
|
|
entry.data[CONF_URL],
|
|
session,
|
|
username=user_input.get(CONF_USERNAME, entry.data[CONF_USERNAME]),
|
|
password=user_input[CONF_PASSWORD],
|
|
)
|
|
|
|
try:
|
|
account = await ntfy.account()
|
|
if not token:
|
|
token = (await ntfy.generate_token("Home Assistant")).token
|
|
except NtfyUnauthorizedAuthenticationError:
|
|
errors["base"] = "invalid_auth"
|
|
except NtfyHTTPError as e:
|
|
_LOGGER.debug("Error %s: %s [%s]", e.code, e.error, e.link)
|
|
errors["base"] = "cannot_connect"
|
|
except NtfyException:
|
|
errors["base"] = "cannot_connect"
|
|
except Exception:
|
|
_LOGGER.exception("Unexpected exception")
|
|
errors["base"] = "unknown"
|
|
else:
|
|
if entry.data[CONF_USERNAME]:
|
|
if entry.data[CONF_USERNAME] != account.username:
|
|
return self.async_abort(
|
|
reason="account_mismatch",
|
|
description_placeholders={
|
|
CONF_USERNAME: entry.data[CONF_USERNAME],
|
|
"wrong_username": account.username,
|
|
},
|
|
)
|
|
|
|
return self.async_update_reload_and_abort(
|
|
entry,
|
|
data_updates={CONF_TOKEN: token},
|
|
)
|
|
self._async_abort_entries_match(
|
|
{
|
|
CONF_URL: entry.data[CONF_URL],
|
|
CONF_USERNAME: account.username,
|
|
}
|
|
)
|
|
return self.async_update_reload_and_abort(
|
|
entry,
|
|
data_updates={
|
|
CONF_USERNAME: account.username,
|
|
CONF_TOKEN: token,
|
|
},
|
|
)
|
|
if entry.data[CONF_USERNAME]:
|
|
return self.async_show_form(
|
|
step_id="reconfigure_user",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
data_schema=STEP_REAUTH_DATA_SCHEMA,
|
|
suggested_values=user_input,
|
|
),
|
|
errors=errors,
|
|
description_placeholders={
|
|
CONF_NAME: entry.title,
|
|
CONF_USERNAME: entry.data[CONF_USERNAME],
|
|
},
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id="reconfigure",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
data_schema=STEP_RECONFIGURE_DATA_SCHEMA,
|
|
suggested_values=user_input,
|
|
),
|
|
errors=errors,
|
|
description_placeholders={CONF_NAME: entry.title},
|
|
)
|
|
|
|
async def async_step_reconfigure_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> ConfigFlowResult:
|
|
"""Handle reconfigure flow for authenticated ntfy entry."""
|
|
|
|
return await self.async_step_reconfigure(user_input)
|
|
|
|
|
|
class TopicSubentryFlowHandler(ConfigSubentryFlow):
|
|
"""Handle subentry flow for adding and modifying a topic."""
|
|
|
|
async def async_step_user(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""User flow to add a new topic."""
|
|
|
|
return self.async_show_menu(
|
|
step_id="user",
|
|
menu_options=["add_topic", "generate_topic"],
|
|
)
|
|
|
|
async def async_step_generate_topic(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""User flow to add a new topic."""
|
|
topic = "".join(
|
|
random.choices(
|
|
string.ascii_lowercase + string.ascii_uppercase + string.digits,
|
|
k=16,
|
|
)
|
|
)
|
|
return self.async_show_form(
|
|
step_id="add_topic",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
data_schema=STEP_USER_TOPIC_SCHEMA,
|
|
suggested_values={CONF_TOPIC: topic},
|
|
),
|
|
)
|
|
|
|
async def async_step_add_topic(
|
|
self, user_input: dict[str, Any] | None = None
|
|
) -> SubentryFlowResult:
|
|
"""User flow to add a new topic."""
|
|
config_entry = self._get_entry()
|
|
errors: dict[str, str] = {}
|
|
|
|
if user_input is not None:
|
|
if not RE_TOPIC.match(user_input[CONF_TOPIC]):
|
|
errors["base"] = "invalid_topic"
|
|
else:
|
|
for existing_subentry in config_entry.subentries.values():
|
|
if existing_subentry.unique_id == user_input[CONF_TOPIC]:
|
|
return self.async_abort(reason="already_configured")
|
|
|
|
return self.async_create_entry(
|
|
title=user_input.get(CONF_NAME, user_input[CONF_TOPIC]),
|
|
data=user_input,
|
|
unique_id=user_input[CONF_TOPIC],
|
|
)
|
|
return self.async_show_form(
|
|
step_id="add_topic",
|
|
data_schema=self.add_suggested_values_to_schema(
|
|
data_schema=STEP_USER_TOPIC_SCHEMA, suggested_values=user_input
|
|
),
|
|
errors=errors,
|
|
)
|