424 lines
14 KiB
Python
424 lines
14 KiB
Python
"""Module to help with parsing and generating configuration files."""
|
||
|
||
from __future__ import annotations
|
||
|
||
from collections import OrderedDict
|
||
from collections.abc import Sequence
|
||
from contextlib import suppress
|
||
import logging
|
||
from typing import Any, Final
|
||
from urllib.parse import urlparse
|
||
|
||
import voluptuous as vol
|
||
|
||
from . import auth
|
||
from .auth import mfa_modules as auth_mfa_modules, providers as auth_providers
|
||
from .const import (
|
||
ATTR_ASSUMED_STATE,
|
||
ATTR_FRIENDLY_NAME,
|
||
ATTR_HIDDEN,
|
||
CONF_ALLOWLIST_EXTERNAL_DIRS,
|
||
CONF_ALLOWLIST_EXTERNAL_URLS,
|
||
CONF_AUTH_MFA_MODULES,
|
||
CONF_AUTH_PROVIDERS,
|
||
CONF_COUNTRY,
|
||
CONF_CURRENCY,
|
||
CONF_CUSTOMIZE,
|
||
CONF_CUSTOMIZE_DOMAIN,
|
||
CONF_CUSTOMIZE_GLOB,
|
||
CONF_DEBUG,
|
||
CONF_ELEVATION,
|
||
CONF_EXTERNAL_URL,
|
||
CONF_ID,
|
||
CONF_INTERNAL_URL,
|
||
CONF_LANGUAGE,
|
||
CONF_LATITUDE,
|
||
CONF_LEGACY_TEMPLATES,
|
||
CONF_LONGITUDE,
|
||
CONF_MEDIA_DIRS,
|
||
CONF_NAME,
|
||
CONF_PACKAGES,
|
||
CONF_RADIUS,
|
||
CONF_TEMPERATURE_UNIT,
|
||
CONF_TIME_ZONE,
|
||
CONF_TYPE,
|
||
CONF_UNIT_SYSTEM,
|
||
CONF_URL,
|
||
CONF_USERNAME,
|
||
LEGACY_CONF_WHITELIST_EXTERNAL_DIRS,
|
||
)
|
||
from .core import DOMAIN as HOMEASSISTANT_DOMAIN, ConfigSource, HomeAssistant
|
||
from .generated.currencies import HISTORIC_CURRENCIES
|
||
from .helpers import config_validation as cv, issue_registry as ir
|
||
from .helpers.entity_values import EntityValues
|
||
from .util.hass_dict import HassKey
|
||
from .util.package import is_docker_env
|
||
from .util.unit_system import get_unit_system, validate_unit_system
|
||
from .util.webrtc import RTCIceServer
|
||
|
||
_LOGGER = logging.getLogger(__name__)
|
||
|
||
DATA_CUSTOMIZE: HassKey[EntityValues] = HassKey("hass_customize")
|
||
|
||
CONF_CREDENTIAL: Final = "credential"
|
||
CONF_ICE_SERVERS: Final = "ice_servers"
|
||
CONF_WEBRTC: Final = "webrtc"
|
||
|
||
|
||
def _no_duplicate_auth_provider(
|
||
configs: Sequence[dict[str, Any]],
|
||
) -> Sequence[dict[str, Any]]:
|
||
"""No duplicate auth provider config allowed in a list.
|
||
|
||
Each type of auth provider can only have one config without optional id.
|
||
Unique id is required if same type of auth provider used multiple times.
|
||
"""
|
||
config_keys: set[tuple[str, str | None]] = set()
|
||
for config in configs:
|
||
key = (config[CONF_TYPE], config.get(CONF_ID))
|
||
if key in config_keys:
|
||
raise vol.Invalid(
|
||
f"Duplicate auth provider {config[CONF_TYPE]} found. "
|
||
"Please add unique IDs "
|
||
"if you want to have the same auth provider twice"
|
||
)
|
||
config_keys.add(key)
|
||
return configs
|
||
|
||
|
||
def _no_duplicate_auth_mfa_module(
|
||
configs: Sequence[dict[str, Any]],
|
||
) -> Sequence[dict[str, Any]]:
|
||
"""No duplicate auth mfa module item allowed in a list.
|
||
|
||
Each type of mfa module can only have one config without optional id.
|
||
A global unique id is required if same type of mfa module used multiple
|
||
times.
|
||
Note: this is different than auth provider
|
||
"""
|
||
config_keys: set[str] = set()
|
||
for config in configs:
|
||
key = config.get(CONF_ID, config[CONF_TYPE])
|
||
if key in config_keys:
|
||
raise vol.Invalid(
|
||
f"Duplicate mfa module {config[CONF_TYPE]} found. "
|
||
"Please add unique IDs "
|
||
"if you want to have the same mfa module twice"
|
||
)
|
||
config_keys.add(key)
|
||
return configs
|
||
|
||
|
||
def _filter_bad_internal_external_urls(conf: dict) -> dict:
|
||
"""Filter internal/external URL with a path."""
|
||
for key in CONF_INTERNAL_URL, CONF_EXTERNAL_URL:
|
||
if key in conf and urlparse(conf[key]).path not in ("", "/"):
|
||
# We warn but do not fix, because if this was incorrectly configured,
|
||
# adjusting this value might impact security.
|
||
_LOGGER.warning(
|
||
"Invalid %s set. It's not allowed to have a path (/bla)", key
|
||
)
|
||
|
||
return conf
|
||
|
||
|
||
# Schema for all packages element
|
||
_PACKAGES_CONFIG_SCHEMA = vol.Schema({cv.string: vol.Any(dict, list)})
|
||
|
||
# Schema for individual package definition
|
||
_PACKAGE_DEFINITION_SCHEMA = vol.Schema({cv.string: vol.Any(dict, list, None)})
|
||
|
||
_CUSTOMIZE_DICT_SCHEMA = vol.Schema(
|
||
{
|
||
vol.Optional(ATTR_FRIENDLY_NAME): cv.string,
|
||
vol.Optional(ATTR_HIDDEN): cv.boolean,
|
||
vol.Optional(ATTR_ASSUMED_STATE): cv.boolean,
|
||
},
|
||
extra=vol.ALLOW_EXTRA,
|
||
)
|
||
|
||
_CUSTOMIZE_CONFIG_SCHEMA = vol.Schema(
|
||
{
|
||
vol.Optional(CONF_CUSTOMIZE, default={}): vol.Schema(
|
||
{cv.entity_id: _CUSTOMIZE_DICT_SCHEMA}
|
||
),
|
||
vol.Optional(CONF_CUSTOMIZE_DOMAIN, default={}): vol.Schema(
|
||
{cv.string: _CUSTOMIZE_DICT_SCHEMA}
|
||
),
|
||
vol.Optional(CONF_CUSTOMIZE_GLOB, default={}): vol.Schema(
|
||
{cv.string: _CUSTOMIZE_DICT_SCHEMA}
|
||
),
|
||
}
|
||
)
|
||
|
||
|
||
def _raise_issue_if_historic_currency(hass: HomeAssistant, currency: str) -> None:
|
||
if currency not in HISTORIC_CURRENCIES:
|
||
ir.async_delete_issue(hass, HOMEASSISTANT_DOMAIN, "historic_currency")
|
||
return
|
||
|
||
ir.async_create_issue(
|
||
hass,
|
||
HOMEASSISTANT_DOMAIN,
|
||
"historic_currency",
|
||
is_fixable=False,
|
||
learn_more_url="homeassistant://config/general",
|
||
severity=ir.IssueSeverity.WARNING,
|
||
translation_key="historic_currency",
|
||
translation_placeholders={"currency": currency},
|
||
)
|
||
|
||
|
||
def _raise_issue_if_no_country(hass: HomeAssistant, country: str | None) -> None:
|
||
if country is not None:
|
||
ir.async_delete_issue(hass, HOMEASSISTANT_DOMAIN, "country_not_configured")
|
||
return
|
||
|
||
ir.async_create_issue(
|
||
hass,
|
||
HOMEASSISTANT_DOMAIN,
|
||
"country_not_configured",
|
||
is_fixable=False,
|
||
learn_more_url="homeassistant://config/general",
|
||
severity=ir.IssueSeverity.WARNING,
|
||
translation_key="country_not_configured",
|
||
)
|
||
|
||
|
||
def _validate_currency(data: Any) -> Any:
|
||
try:
|
||
return cv.currency(data)
|
||
except vol.InInvalid:
|
||
with suppress(vol.InInvalid):
|
||
return cv.historic_currency(data)
|
||
raise
|
||
|
||
|
||
def _validate_stun_or_turn_url(value: Any) -> str:
|
||
"""Validate an URL."""
|
||
url_in = str(value)
|
||
url = urlparse(url_in)
|
||
|
||
if url.scheme not in ("stun", "stuns", "turn", "turns"):
|
||
raise vol.Invalid("invalid url")
|
||
return url_in
|
||
|
||
|
||
CORE_CONFIG_SCHEMA = vol.All(
|
||
_CUSTOMIZE_CONFIG_SCHEMA.extend(
|
||
{
|
||
CONF_NAME: vol.Coerce(str),
|
||
CONF_LATITUDE: cv.latitude,
|
||
CONF_LONGITUDE: cv.longitude,
|
||
CONF_ELEVATION: vol.Coerce(int),
|
||
CONF_RADIUS: cv.positive_int,
|
||
vol.Remove(CONF_TEMPERATURE_UNIT): cv.temperature_unit,
|
||
CONF_UNIT_SYSTEM: validate_unit_system,
|
||
CONF_TIME_ZONE: cv.time_zone,
|
||
vol.Optional(CONF_INTERNAL_URL): cv.url,
|
||
vol.Optional(CONF_EXTERNAL_URL): cv.url,
|
||
vol.Optional(CONF_ALLOWLIST_EXTERNAL_DIRS): vol.All(
|
||
cv.ensure_list, [vol.IsDir()]
|
||
),
|
||
vol.Optional(LEGACY_CONF_WHITELIST_EXTERNAL_DIRS): vol.All(
|
||
cv.ensure_list, [vol.IsDir()]
|
||
),
|
||
vol.Optional(CONF_ALLOWLIST_EXTERNAL_URLS): vol.All(
|
||
cv.ensure_list, [cv.url]
|
||
),
|
||
vol.Optional(CONF_PACKAGES, default={}): _PACKAGES_CONFIG_SCHEMA,
|
||
vol.Optional(CONF_AUTH_PROVIDERS): vol.All(
|
||
cv.ensure_list,
|
||
[
|
||
auth_providers.AUTH_PROVIDER_SCHEMA.extend(
|
||
{
|
||
CONF_TYPE: vol.NotIn(
|
||
["insecure_example"],
|
||
(
|
||
"The insecure_example auth provider"
|
||
" is for testing only."
|
||
),
|
||
)
|
||
}
|
||
)
|
||
],
|
||
_no_duplicate_auth_provider,
|
||
),
|
||
vol.Optional(CONF_AUTH_MFA_MODULES): vol.All(
|
||
cv.ensure_list,
|
||
[
|
||
auth_mfa_modules.MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend(
|
||
{
|
||
CONF_TYPE: vol.NotIn(
|
||
["insecure_example"],
|
||
"The insecure_example mfa module is for testing only.",
|
||
)
|
||
}
|
||
)
|
||
],
|
||
_no_duplicate_auth_mfa_module,
|
||
),
|
||
vol.Optional(CONF_MEDIA_DIRS): cv.schema_with_slug_keys(vol.IsDir()),
|
||
vol.Remove(CONF_LEGACY_TEMPLATES): cv.boolean,
|
||
vol.Optional(CONF_CURRENCY): _validate_currency,
|
||
vol.Optional(CONF_COUNTRY): cv.country,
|
||
vol.Optional(CONF_LANGUAGE): cv.language,
|
||
vol.Optional(CONF_DEBUG): cv.boolean,
|
||
vol.Optional(CONF_WEBRTC): vol.Schema(
|
||
{
|
||
vol.Required(CONF_ICE_SERVERS): vol.All(
|
||
cv.ensure_list,
|
||
[
|
||
vol.Schema(
|
||
{
|
||
vol.Required(CONF_URL): vol.All(
|
||
cv.ensure_list, [_validate_stun_or_turn_url]
|
||
),
|
||
vol.Optional(CONF_USERNAME): cv.string,
|
||
vol.Optional(CONF_CREDENTIAL): cv.string,
|
||
}
|
||
)
|
||
],
|
||
)
|
||
}
|
||
),
|
||
}
|
||
),
|
||
_filter_bad_internal_external_urls,
|
||
)
|
||
|
||
|
||
async def async_process_ha_core_config(hass: HomeAssistant, config: dict) -> None:
|
||
"""Process the [homeassistant] section from the configuration.
|
||
|
||
This method is a coroutine.
|
||
"""
|
||
# CORE_CONFIG_SCHEMA is not async safe since it uses vol.IsDir
|
||
# so we need to run it in an executor job.
|
||
config = await hass.async_add_executor_job(CORE_CONFIG_SCHEMA, config)
|
||
|
||
# Only load auth during startup.
|
||
if not hasattr(hass, "auth"):
|
||
if (auth_conf := config.get(CONF_AUTH_PROVIDERS)) is None:
|
||
auth_conf = [{"type": "homeassistant"}]
|
||
|
||
mfa_conf = config.get(
|
||
CONF_AUTH_MFA_MODULES,
|
||
[{"type": "totp", "id": "totp", "name": "Authenticator app"}],
|
||
)
|
||
|
||
setattr(
|
||
hass, "auth", await auth.auth_manager_from_config(hass, auth_conf, mfa_conf)
|
||
)
|
||
|
||
await hass.config.async_load()
|
||
|
||
hac = hass.config
|
||
|
||
if any(
|
||
k in config
|
||
for k in (
|
||
CONF_LATITUDE,
|
||
CONF_LONGITUDE,
|
||
CONF_NAME,
|
||
CONF_ELEVATION,
|
||
CONF_TIME_ZONE,
|
||
CONF_UNIT_SYSTEM,
|
||
CONF_EXTERNAL_URL,
|
||
CONF_INTERNAL_URL,
|
||
CONF_CURRENCY,
|
||
CONF_COUNTRY,
|
||
CONF_LANGUAGE,
|
||
CONF_RADIUS,
|
||
)
|
||
):
|
||
hac.config_source = ConfigSource.YAML
|
||
|
||
for key, attr in (
|
||
(CONF_LATITUDE, "latitude"),
|
||
(CONF_LONGITUDE, "longitude"),
|
||
(CONF_NAME, "location_name"),
|
||
(CONF_ELEVATION, "elevation"),
|
||
(CONF_INTERNAL_URL, "internal_url"),
|
||
(CONF_EXTERNAL_URL, "external_url"),
|
||
(CONF_MEDIA_DIRS, "media_dirs"),
|
||
(CONF_CURRENCY, "currency"),
|
||
(CONF_COUNTRY, "country"),
|
||
(CONF_LANGUAGE, "language"),
|
||
(CONF_RADIUS, "radius"),
|
||
):
|
||
if key in config:
|
||
setattr(hac, attr, config[key])
|
||
|
||
if config.get(CONF_DEBUG):
|
||
hac.debug = True
|
||
|
||
if CONF_WEBRTC in config:
|
||
hac.webrtc.ice_servers = [
|
||
RTCIceServer(
|
||
server[CONF_URL],
|
||
server.get(CONF_USERNAME),
|
||
server.get(CONF_CREDENTIAL),
|
||
)
|
||
for server in config[CONF_WEBRTC][CONF_ICE_SERVERS]
|
||
]
|
||
|
||
_raise_issue_if_historic_currency(hass, hass.config.currency)
|
||
_raise_issue_if_no_country(hass, hass.config.country)
|
||
|
||
if CONF_TIME_ZONE in config:
|
||
await hac.async_set_time_zone(config[CONF_TIME_ZONE])
|
||
|
||
if CONF_MEDIA_DIRS not in config:
|
||
if is_docker_env():
|
||
hac.media_dirs = {"local": "/media"}
|
||
else:
|
||
hac.media_dirs = {"local": hass.config.path("media")}
|
||
|
||
# Init whitelist external dir
|
||
hac.allowlist_external_dirs = {hass.config.path("www"), *hac.media_dirs.values()}
|
||
if CONF_ALLOWLIST_EXTERNAL_DIRS in config:
|
||
hac.allowlist_external_dirs.update(set(config[CONF_ALLOWLIST_EXTERNAL_DIRS]))
|
||
|
||
elif LEGACY_CONF_WHITELIST_EXTERNAL_DIRS in config:
|
||
_LOGGER.warning(
|
||
"Key %s has been replaced with %s. Please update your config",
|
||
LEGACY_CONF_WHITELIST_EXTERNAL_DIRS,
|
||
CONF_ALLOWLIST_EXTERNAL_DIRS,
|
||
)
|
||
hac.allowlist_external_dirs.update(
|
||
set(config[LEGACY_CONF_WHITELIST_EXTERNAL_DIRS])
|
||
)
|
||
|
||
# Init whitelist external URL list – make sure to add / to every URL that doesn't
|
||
# already have it so that we can properly test "path ownership"
|
||
if CONF_ALLOWLIST_EXTERNAL_URLS in config:
|
||
hac.allowlist_external_urls.update(
|
||
url if url.endswith("/") else f"{url}/"
|
||
for url in config[CONF_ALLOWLIST_EXTERNAL_URLS]
|
||
)
|
||
|
||
# Customize
|
||
cust_exact = dict(config[CONF_CUSTOMIZE])
|
||
cust_domain = dict(config[CONF_CUSTOMIZE_DOMAIN])
|
||
cust_glob = OrderedDict(config[CONF_CUSTOMIZE_GLOB])
|
||
|
||
for name, pkg in config[CONF_PACKAGES].items():
|
||
if (pkg_cust := pkg.get(HOMEASSISTANT_DOMAIN)) is None:
|
||
continue
|
||
|
||
try:
|
||
pkg_cust = _CUSTOMIZE_CONFIG_SCHEMA(pkg_cust)
|
||
except vol.Invalid:
|
||
_LOGGER.warning("Package %s contains invalid customize", name)
|
||
continue
|
||
|
||
cust_exact.update(pkg_cust[CONF_CUSTOMIZE])
|
||
cust_domain.update(pkg_cust[CONF_CUSTOMIZE_DOMAIN])
|
||
cust_glob.update(pkg_cust[CONF_CUSTOMIZE_GLOB])
|
||
|
||
hass.data[DATA_CUSTOMIZE] = EntityValues(cust_exact, cust_domain, cust_glob)
|
||
|
||
if CONF_UNIT_SYSTEM in config:
|
||
hac.units = get_unit_system(config[CONF_UNIT_SYSTEM])
|