core/homeassistant/helpers/config_validation.py

1199 lines
35 KiB
Python
Raw Normal View History

2016-03-28 01:48:51 +00:00
"""Helpers for config validation using voluptuous."""
2019-07-31 19:25:30 +00:00
from datetime import (
date as date_sys,
2019-07-31 19:25:30 +00:00
datetime as datetime_sys,
time as time_sys,
timedelta,
2019-07-31 19:25:30 +00:00
)
from enum import Enum
import inspect
import logging
from numbers import Number
import os
import re
from socket import _GLOBAL_DEFAULT_TIMEOUT # type: ignore # private, not in typeshed
from typing import (
Any,
Callable,
Dict,
Hashable,
List,
Optional,
Pattern,
Type,
TypeVar,
Union,
cast,
)
from urllib.parse import urlparse
from uuid import UUID
from pkg_resources import parse_version
import voluptuous as vol
import voluptuous_serialize
2016-03-28 01:48:51 +00:00
from homeassistant.const import (
ATTR_AREA_ID,
ATTR_ENTITY_ID,
2019-07-31 19:25:30 +00:00
CONF_ABOVE,
CONF_ALIAS,
CONF_ATTRIBUTE,
2019-07-31 19:25:30 +00:00
CONF_BELOW,
CONF_CHOOSE,
2019-07-31 19:25:30 +00:00
CONF_CONDITION,
CONF_CONDITIONS,
2020-03-05 19:44:42 +00:00
CONF_CONTINUE_ON_TIMEOUT,
CONF_COUNT,
CONF_DEFAULT,
2020-03-05 19:44:42 +00:00
CONF_DELAY,
CONF_DEVICE_ID,
CONF_DOMAIN,
2019-07-31 19:25:30 +00:00
CONF_ENTITY_ID,
CONF_ENTITY_NAMESPACE,
2020-03-05 19:44:42 +00:00
CONF_EVENT,
CONF_EVENT_DATA,
CONF_EVENT_DATA_TEMPLATE,
CONF_FOR,
2019-07-31 19:25:30 +00:00
CONF_PLATFORM,
CONF_REPEAT,
2019-07-31 19:25:30 +00:00
CONF_SCAN_INTERVAL,
2020-03-05 19:44:42 +00:00
CONF_SCENE,
CONF_SEQUENCE,
2020-03-05 19:44:42 +00:00
CONF_SERVICE,
CONF_SERVICE_TEMPLATE,
CONF_STATE,
CONF_TIMEOUT,
2019-07-31 19:25:30 +00:00
CONF_UNIT_SYSTEM_IMPERIAL,
CONF_UNIT_SYSTEM_METRIC,
CONF_UNTIL,
2019-07-31 19:25:30 +00:00
CONF_VALUE_TEMPLATE,
CONF_VARIABLES,
CONF_WAIT_FOR_TRIGGER,
2020-03-05 19:44:42 +00:00
CONF_WAIT_TEMPLATE,
CONF_WHILE,
2019-07-31 19:25:30 +00:00
ENTITY_MATCH_ALL,
ENTITY_MATCH_NONE,
2019-07-31 19:25:30 +00:00
SUN_EVENT_SUNRISE,
SUN_EVENT_SUNSET,
TEMP_CELSIUS,
TEMP_FAHRENHEIT,
WEEKDAYS,
__version__,
)
from homeassistant.core import split_entity_id, valid_entity_id
from homeassistant.exceptions import TemplateError
2020-09-11 10:24:16 +00:00
from homeassistant.helpers import (
script_variables as script_variables_helper,
template as template_helper,
)
from homeassistant.helpers.logging import KeywordStyleAdapter
from homeassistant.util import slugify as util_slugify
import homeassistant.util.dt as dt_util
2016-03-28 01:48:51 +00:00
# pylint: disable=invalid-name
TIME_PERIOD_ERROR = "offset {} should be format 'HH:MM', 'HH:MM:SS' or 'HH:MM:SS.F'"
# Home Assistant types
byte = vol.All(vol.Coerce(int), vol.Range(min=0, max=255))
small_float = vol.All(vol.Coerce(float), vol.Range(min=0, max=1))
positive_int = vol.All(vol.Coerce(int), vol.Range(min=0))
2019-07-31 19:25:30 +00:00
latitude = vol.All(
vol.Coerce(float), vol.Range(min=-90, max=90), msg="invalid latitude"
)
longitude = vol.All(
vol.Coerce(float), vol.Range(min=-180, max=180), msg="invalid longitude"
)
gps = vol.ExactSequence([latitude, longitude])
2016-05-03 05:05:09 +00:00
sun_event = vol.All(vol.Lower, vol.Any(SUN_EVENT_SUNSET, SUN_EVENT_SUNRISE))
port = vol.All(vol.Coerce(int), vol.Range(min=1, max=65535))
# typing typevar
2019-07-31 19:25:30 +00:00
T = TypeVar("T")
2016-04-21 22:52:20 +00:00
# Adapted from:
# https://github.com/alecthomas/voluptuous/issues/115#issuecomment-144464666
def has_at_least_one_key(*keys: str) -> Callable:
"""Validate that at least one key exists."""
2019-07-31 19:25:30 +00:00
def validate(obj: Dict) -> Dict:
2016-04-21 22:52:20 +00:00
"""Test keys exist in dict."""
if not isinstance(obj, dict):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("expected dictionary")
2016-04-21 22:52:20 +00:00
for k in obj.keys():
if k in keys:
return obj
raise vol.Invalid("must contain at least one of {}.".format(", ".join(keys)))
2016-04-21 22:52:20 +00:00
return validate
2019-09-20 15:23:34 +00:00
def has_at_most_one_key(*keys: str) -> Callable[[Dict], Dict]:
"""Validate that zero keys exist or one key exists."""
2019-07-31 19:25:30 +00:00
def validate(obj: Dict) -> Dict:
"""Test zero keys exist or one key exists in dict."""
if not isinstance(obj, dict):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("expected dictionary")
if len(set(keys) & set(obj)) > 1:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("must contain at most one of {}.".format(", ".join(keys)))
return obj
return validate
def boolean(value: Any) -> bool:
"""Validate and coerce a boolean value."""
if isinstance(value, bool):
return value
if isinstance(value, str):
value = value.lower().strip()
2019-07-31 19:25:30 +00:00
if value in ("1", "true", "yes", "on", "enable"):
return True
2019-07-31 19:25:30 +00:00
if value in ("0", "false", "no", "off", "disable"):
return False
elif isinstance(value, Number):
# type ignore: https://github.com/python/mypy/issues/3186
return value != 0 # type: ignore
raise vol.Invalid(f"invalid boolean value {value}")
2016-03-28 01:48:51 +00:00
_WS = re.compile("\\s*")
def whitespace(value: Any) -> str:
"""Validate result contains only whitespace."""
if isinstance(value, str) and _WS.fullmatch(value):
return value
raise vol.Invalid(f"contains non-whitespace: {value}")
def isdevice(value: Any) -> str:
"""Validate that value is a real device."""
try:
os.stat(value)
return str(value)
except OSError as err:
raise vol.Invalid(f"No device at {value} found") from err
def matches_regex(regex: str) -> Callable[[Any], str]:
"""Validate that the value is a string that matches a regex."""
compiled = re.compile(regex)
def validator(value: Any) -> str:
"""Validate that value matches the given regex."""
if not isinstance(value, str):
raise vol.Invalid(f"not a string value: {value}")
if not compiled.match(value):
2019-07-31 19:25:30 +00:00
raise vol.Invalid(
f"value {value} does not match regular expression {compiled.pattern}"
2019-07-31 19:25:30 +00:00
)
return value
2019-07-31 19:25:30 +00:00
return validator
def is_regex(value: Any) -> Pattern[Any]:
"""Validate that a string is a valid regular expression."""
try:
r = re.compile(value)
return r
except TypeError as err:
2019-07-31 19:25:30 +00:00
raise vol.Invalid(
f"value {value} is of the wrong type for a regular expression"
) from err
except re.error as err:
raise vol.Invalid(f"value {value} is not a valid regular expression") from err
def isfile(value: Any) -> str:
"""Validate that the value is an existing file."""
if value is None:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("None is not file")
file_in = os.path.expanduser(str(value))
if not os.path.isfile(file_in):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("not a file")
if not os.access(file_in, os.R_OK):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("file not readable")
return file_in
def isdir(value: Any) -> str:
"""Validate that the value is an existing dir."""
if value is None:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("not a directory")
dir_in = os.path.expanduser(str(value))
if not os.path.isdir(dir_in):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("not a directory")
if not os.access(dir_in, os.R_OK):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("directory not readable")
return dir_in
def ensure_list(value: Union[T, List[T], None]) -> List[T]:
"""Wrap value in list if it is not one."""
if value is None:
return []
return value if isinstance(value, list) else [value]
def entity_id(value: Any) -> str:
2016-03-28 01:48:51 +00:00
"""Validate Entity ID."""
str_value = string(value).lower()
if valid_entity_id(str_value):
return str_value
raise vol.Invalid(f"Entity ID {value} is an invalid entity id")
2016-03-28 01:48:51 +00:00
def entity_ids(value: Union[str, List]) -> List[str]:
2016-03-28 01:48:51 +00:00
"""Validate Entity IDs."""
if value is None:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("Entity IDs can not be None")
2016-03-28 01:48:51 +00:00
if isinstance(value, str):
2019-07-31 19:25:30 +00:00
value = [ent_id.strip() for ent_id in value.split(",")]
2016-03-28 01:48:51 +00:00
2016-04-10 22:20:20 +00:00
return [entity_id(ent_id) for ent_id in value]
2016-03-28 01:48:51 +00:00
comp_entity_ids = vol.Any(
vol.All(vol.Lower, vol.Any(ENTITY_MATCH_ALL, ENTITY_MATCH_NONE)), entity_ids
)
2019-09-20 15:23:34 +00:00
def entity_domain(domain: str) -> Callable[[Any], str]:
"""Validate that entity belong to domain."""
2019-07-31 19:25:30 +00:00
def validate(value: Any) -> str:
"""Test if entity domain is domain."""
ent_domain = entities_domain(domain)
return ent_domain(value)[0]
2019-07-31 19:25:30 +00:00
return validate
2019-09-20 15:23:34 +00:00
def entities_domain(domain: str) -> Callable[[Union[str, List]], List[str]]:
"""Validate that entities belong to domain."""
2019-07-31 19:25:30 +00:00
def validate(values: Union[str, List]) -> List[str]:
"""Test if entity domain is domain."""
values = entity_ids(values)
for ent_id in values:
if split_entity_id(ent_id)[0] != domain:
raise vol.Invalid(
f"Entity ID '{ent_id}' does not belong to domain '{domain}'"
2019-07-31 19:25:30 +00:00
)
return values
2019-07-31 19:25:30 +00:00
return validate
def enum(enumClass: Type[Enum]) -> vol.All:
"""Create validator for specified enum."""
return vol.All(vol.In(enumClass.__members__), enumClass.__getitem__)
def icon(value: Any) -> str:
2016-03-28 01:48:51 +00:00
"""Validate icon."""
str_value = str(value)
2016-03-28 01:48:51 +00:00
if ":" in str_value:
return str_value
2016-03-28 01:48:51 +00:00
2019-08-02 21:20:07 +00:00
raise vol.Invalid('Icons should be specified in the form "prefix:name"')
2016-03-28 01:48:51 +00:00
2016-04-21 22:52:20 +00:00
time_period_dict = vol.All(
2019-07-31 19:25:30 +00:00
dict,
vol.Schema(
{
"days": vol.Coerce(float),
"hours": vol.Coerce(float),
"minutes": vol.Coerce(float),
"seconds": vol.Coerce(float),
"milliseconds": vol.Coerce(float),
2019-07-31 19:25:30 +00:00
}
),
has_at_least_one_key("days", "hours", "minutes", "seconds", "milliseconds"),
lambda value: timedelta(**value),
)
2016-04-21 22:52:20 +00:00
2019-09-20 15:23:34 +00:00
def time(value: Any) -> time_sys:
"""Validate and transform a time."""
if isinstance(value, time_sys):
return value
try:
time_val = dt_util.parse_time(value)
except TypeError as err:
raise vol.Invalid("Not a parseable type") from err
if time_val is None:
raise vol.Invalid(f"Invalid time specified: {value}")
return time_val
2019-09-20 15:23:34 +00:00
def date(value: Any) -> date_sys:
"""Validate and transform a date."""
if isinstance(value, date_sys):
return value
try:
date_val = dt_util.parse_date(value)
except TypeError as err:
raise vol.Invalid("Not a parseable type") from err
if date_val is None:
raise vol.Invalid("Could not parse date")
return date_val
def time_period_str(value: str) -> timedelta:
"""Validate and transform time offset."""
if isinstance(value, int): # type: ignore
2019-07-31 19:25:30 +00:00
raise vol.Invalid("Make sure you wrap time values in quotes")
if not isinstance(value, str):
raise vol.Invalid(TIME_PERIOD_ERROR.format(value))
negative_offset = False
2019-07-31 19:25:30 +00:00
if value.startswith("-"):
negative_offset = True
value = value[1:]
2019-07-31 19:25:30 +00:00
elif value.startswith("+"):
value = value[1:]
parsed = value.split(":")
if len(parsed) not in (2, 3):
raise vol.Invalid(TIME_PERIOD_ERROR.format(value))
try:
hour = int(parsed[0])
minute = int(parsed[1])
try:
second = float(parsed[2])
except IndexError:
second = 0
except ValueError as err:
raise vol.Invalid(TIME_PERIOD_ERROR.format(value)) from err
offset = timedelta(hours=hour, minutes=minute, seconds=second)
if negative_offset:
offset *= -1
return offset
def time_period_seconds(value: Union[float, str]) -> timedelta:
"""Validate and transform seconds to a time offset."""
try:
return timedelta(seconds=float(value))
except (ValueError, TypeError) as err:
raise vol.Invalid(f"Expected seconds, got {value}") from err
2019-07-31 19:25:30 +00:00
time_period = vol.Any(time_period_str, time_period_seconds, timedelta, time_period_dict)
2016-04-21 22:52:20 +00:00
def match_all(value: T) -> T:
"""Validate that matches all values."""
return value
def positive_timedelta(value: timedelta) -> timedelta:
2016-04-21 22:52:20 +00:00
"""Validate timedelta is positive."""
if value < timedelta(0):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("Time period should be positive")
2016-04-21 22:52:20 +00:00
return value
positive_time_period_dict = vol.All(time_period_dict, positive_timedelta)
positive_time_period = vol.All(time_period, positive_timedelta)
def remove_falsy(value: List[T]) -> List[T]:
"""Remove falsy values from a list."""
return [v for v in value if v]
def service(value: Any) -> str:
"""Validate service."""
# Services use same format as entities so we can use same helper.
str_value = string(value).lower()
if valid_entity_id(str_value):
return str_value
raise vol.Invalid(f"Service {value} does not match format <domain>.<name>")
def slug(value: Any) -> str:
"""Validate value is a valid slug."""
if value is None:
raise vol.Invalid("Slug should not be None")
str_value = str(value)
slg = util_slugify(str_value)
if str_value == slg:
return str_value
raise vol.Invalid(f"invalid slug {value} (try {slg})")
def schema_with_slug_keys(
value_schema: Union[T, Callable], *, slug_validator: Callable[[Any], str] = slug
) -> Callable:
"""Ensure dicts have slugs as keys.
Replacement of vol.Schema({cv.slug: value_schema}) to prevent misleading
"Extra keys" errors from voluptuous.
"""
schema = vol.Schema({str: value_schema})
def verify(value: Dict) -> Dict:
"""Validate all keys are slugs and then the value_schema."""
if not isinstance(value, dict):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("expected dictionary")
for key in value.keys():
slug_validator(key)
return cast(Dict, schema(value))
2019-07-31 19:25:30 +00:00
return verify
def slugify(value: Any) -> str:
"""Coerce a value to a slug."""
if value is None:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("Slug should not be None")
slg = util_slugify(str(value))
if slg:
return slg
raise vol.Invalid(f"Unable to slugify {value}")
def string(value: Any) -> str:
"""Coerce value to string, except for None."""
if value is None:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("string value is None")
if isinstance(value, (list, dict)):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("value should be a string")
return str(value)
def string_with_no_html(value: Any) -> str:
"""Validate that the value is a string without HTML."""
value = string(value)
regex = re.compile(r"<[a-z][\s\S]*>")
if regex.search(value):
raise vol.Invalid("the string should not contain HTML")
return str(value)
2019-09-20 15:23:34 +00:00
def temperature_unit(value: Any) -> str:
2016-03-28 01:48:51 +00:00
"""Validate and transform temperature unit."""
value = str(value).upper()
2019-07-31 19:25:30 +00:00
if value == "C":
2016-04-20 03:30:44 +00:00
return TEMP_CELSIUS
2019-07-31 19:25:30 +00:00
if value == "F":
return TEMP_FAHRENHEIT
2019-07-31 19:25:30 +00:00
raise vol.Invalid("invalid temperature unit (expected C or F)")
2019-07-31 19:25:30 +00:00
unit_system = vol.All(
vol.Lower, vol.Any(CONF_UNIT_SYSTEM_METRIC, CONF_UNIT_SYSTEM_IMPERIAL)
)
Add unit system support Add unit symbol constants Initial unit system object Import more constants Pydoc for unit system file Import constants for configuration validation Unit system validation method Typing for constants Inches are valid lengths too Typings Change base class to dict - needed for remote api call serialization Validation Use dictionary keys Defined unit systems Update location util to use metric instead of us fahrenheit Update constant imports Import defined unit systems Update configuration to use unit system Update schema to use unit system Update constants Add imports to core for unit system and distance Type for config Default unit system Convert distance from HASS instance Update temperature conversion to use unit system Update temperature conversion Set unit system based on configuration Set info unit system Return unit system dictionary with config dictionary Auto discover unit system Update location test for use metric Update forecast unit system Update mold indicator unit system Update thermostat unit system Update thermostat demo test Unit tests around unit system Update test common hass configuration Update configuration unit tests There should always be a unit system! Update core unit tests Constants typing Linting issues Remove unused import Update fitbit sensor to use application unit system Update google travel time to use application unit system Update configuration example Update dht sensor Update DHT temperature conversion to use the utility function Update swagger config Update my sensors metric flag Update hvac component temperature conversion HVAC conversion for temperature Pull unit from sensor type map Pull unit from sensor type map Update the temper sensor unit Update yWeather sensor unit Update hvac demo unit test Set unit test config unit system to metric Use hass unit system length for default in proximity Use the name of the system instead of temperature Use constants from const Unused import Forecasted temperature Fix calculation in case furthest distance is greater than 1000000 units Remove unneeded constants Set default length to km or miles Use constants Linting doesn't like importing just for typing Fix reference Test is expecting meters - set config to meters Use constant Use constant PyDoc for unit test Should be not in Rename to units Change unit system to be an object - not a dictionary Return tuple in conversion Move convert to temperature util Temperature conversion is now in unit system Update imports Rename to units Units is now an object Use temperature util conversion Unit system is now an object Validate and convert unit system config Return the scalar value in template distance Test is expecting meters Update unit tests around unit system Distance util returns tuple Fix location info test Set units Update unit tests Convert distance DOH Pull out the scalar from the vector Linting I really hate python linting Linting again BLARG Unit test documentation Unit test around is metric flag Break ternary statement into if/else blocks Don't use dictionary - use members is metric flag Rename constants Use is metric flag Move constants to CONST file Move to const file Raise error if unit is not expected Typing No need to return unit since only performing conversion if it can work Use constants Line wrapping Raise error if invalid value Remove subscripts from conversion as they are no longer returned as tuples No longer tuples No longer tuples Check for numeric type Fix string format to use correct variable Typing Assert errors raised Remove subscript Only convert temperature if we know the unit If no unit of measurement set - default to HASS config Convert only if we know the unit Remove subscription Fix not in clause Linting fixes Wants a boolean Clearer if-block Check if the key is in the config first Missed a couple expecting tuples Backwards compatibility No like-y ternary! Error handling around state setting Pretty unit system configuration validation More tuple crap Use is metric flag Error handling around min/max temp Explode if no unit Pull unit from config Celsius has a decimal Unused import Check if it's a temperature before we try to convert it to a temperature Linting says too many statements - combine lat/long in a fairly reasonable manner Backwards compatibility unit test Better doc
2016-07-31 20:24:49 +00:00
def template(value: Optional[Any]) -> template_helper.Template:
"""Validate a jinja2 template."""
if value is None:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("template value is None")
if isinstance(value, (list, dict, template_helper.Template)):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("template value should be a string")
template_value = template_helper.Template(str(value)) # type: ignore
try:
template_value.ensure_valid()
return cast(template_helper.Template, template_value)
except TemplateError as ex:
raise vol.Invalid(f"invalid template ({ex})") from ex
2016-03-28 01:48:51 +00:00
def dynamic_template(value: Optional[Any]) -> template_helper.Template:
"""Validate a dynamic (non static) jinja2 template."""
if value is None:
raise vol.Invalid("template value is None")
if isinstance(value, (list, dict, template_helper.Template)):
raise vol.Invalid("template value should be a string")
if not template_helper.is_template_string(str(value)):
raise vol.Invalid("template value does not contain a dynmamic template")
template_value = template_helper.Template(str(value)) # type: ignore
try:
template_value.ensure_valid()
return cast(template_helper.Template, template_value)
except TemplateError as ex:
raise vol.Invalid(f"invalid template ({ex})") from ex
def template_complex(value: Any) -> Any:
"""Validate a complex jinja2 template."""
if isinstance(value, list):
return_list = value.copy()
for idx, element in enumerate(return_list):
return_list[idx] = template_complex(element)
return return_list
if isinstance(value, dict):
return {
template_complex(key): template_complex(element)
for key, element in value.items()
}
if isinstance(value, str) and template_helper.is_template_string(value):
return template(value)
return value
positive_time_period_template = vol.Any(
positive_time_period, template, template_complex
)
def datetime(value: Any) -> datetime_sys:
"""Validate datetime."""
if isinstance(value, datetime_sys):
return value
try:
date_val = dt_util.parse_datetime(value)
except TypeError:
date_val = None
if date_val is None:
raise vol.Invalid(f"Invalid datetime specified: {value}")
return date_val
def time_zone(value: str) -> str:
2016-03-28 01:48:51 +00:00
"""Validate timezone."""
if dt_util.get_time_zone(value) is not None:
return value
raise vol.Invalid(
2019-07-31 19:25:30 +00:00
"Invalid time zone passed in. Valid options can be found here: "
"http://en.wikipedia.org/wiki/List_of_tz_database_time_zones"
)
2016-11-19 05:47:59 +00:00
weekdays = vol.All(ensure_list, [vol.In(WEEKDAYS)])
def socket_timeout(value: Optional[Any]) -> object:
"""Validate timeout float > 0.0.
None coerced to socket._GLOBAL_DEFAULT_TIMEOUT bare object.
"""
if value is None:
return _GLOBAL_DEFAULT_TIMEOUT
try:
float_value = float(value)
if float_value > 0.0:
return float_value
raise vol.Invalid("Invalid socket timeout value. float > 0.0 required.")
except Exception as err:
raise vol.Invalid(f"Invalid socket timeout: {err}")
# pylint: disable=no-value-for-parameter
def url(value: Any) -> str:
"""Validate an URL."""
url_in = str(value)
2019-07-31 19:25:30 +00:00
if urlparse(url_in).scheme in ["http", "https"]:
return cast(str, vol.Schema(vol.Url())(url_in))
2019-07-31 19:25:30 +00:00
raise vol.Invalid("invalid url")
def x10_address(value: str) -> str:
"""Validate an x10 address."""
2019-07-31 19:25:30 +00:00
regex = re.compile(r"([A-Pa-p]{1})(?:[2-9]|1[0-6]?)$")
if not regex.match(value):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("Invalid X10 Address")
return str(value).lower()
def uuid4_hex(value: Any) -> str:
"""Validate a v4 UUID in hex format."""
try:
result = UUID(value, version=4)
except (ValueError, AttributeError, TypeError) as error:
2019-07-31 19:25:30 +00:00
raise vol.Invalid("Invalid Version4 UUID", error_message=str(error))
if result.hex != value.lower():
# UUID() will create a uuid4 if input is invalid
2019-07-31 19:25:30 +00:00
raise vol.Invalid("Invalid Version4 UUID")
return result.hex
def ensure_list_csv(value: Any) -> List:
"""Ensure that input is a list or make one from comma-separated string."""
if isinstance(value, str):
2019-07-31 19:25:30 +00:00
return [member.strip() for member in value.split(",")]
return ensure_list(value)
class multi_select:
"""Multi select validator returning list of selected values."""
def __init__(self, options: dict) -> None:
"""Initialize multi select."""
self.options = options
def __call__(self, selected: list) -> list:
"""Validate input."""
if not isinstance(selected, list):
raise vol.Invalid("Not a list")
for value in selected:
if value not in self.options:
raise vol.Invalid(f"{value} is not a valid option")
return selected
2019-07-31 19:25:30 +00:00
def deprecated(
key: str,
replacement_key: Optional[str] = None,
invalidation_version: Optional[str] = None,
default: Optional[Any] = None,
2019-09-20 15:23:34 +00:00
) -> Callable[[Dict], Dict]:
"""
Log key as deprecated and provide a replacement (if exists).
Expected behavior:
- Outputs the appropriate deprecation warning if key is detected
- Processes schema moving the value from key to replacement_key
- Processes schema changing nothing if only replacement_key provided
- No warning if only replacement_key provided
- No warning if neither key nor replacement_key are provided
- Adds replacement_key with default value in this case
- Once the invalidation_version is crossed, raises vol.Invalid if key
is detected
"""
2019-07-11 07:38:58 +00:00
module = inspect.getmodule(inspect.stack()[1][0])
if module is not None:
module_name = module.__name__
else:
# If Python is unable to access the sources files, the call stack frame
# will be missing information, so let's guard.
2019-07-11 07:38:58 +00:00
# https://github.com/home-assistant/home-assistant/issues/24982
module_name = __name__
if replacement_key and invalidation_version:
2019-07-31 19:25:30 +00:00
warning = (
"The '{key}' option is deprecated,"
" please replace it with '{replacement_key}'."
" This option {invalidation_status} invalid in version"
2019-07-31 19:25:30 +00:00
" {invalidation_version}"
)
elif replacement_key:
2019-07-31 19:25:30 +00:00
warning = (
"The '{key}' option is deprecated,"
" please replace it with '{replacement_key}'"
2019-07-31 19:25:30 +00:00
)
elif invalidation_version:
2019-07-31 19:25:30 +00:00
warning = (
"The '{key}' option is deprecated,"
" please remove it from your configuration."
" This option {invalidation_status} invalid in version"
2019-07-31 19:25:30 +00:00
" {invalidation_version}"
)
else:
2019-07-31 19:25:30 +00:00
warning = (
"The '{key}' option is deprecated,"
" please remove it from your configuration"
2019-07-31 19:25:30 +00:00
)
def check_for_invalid_version() -> None:
"""Raise error if current version has reached invalidation."""
if not invalidation_version:
return
if parse_version(__version__) >= parse_version(invalidation_version):
raise vol.Invalid(
warning.format(
key=key,
replacement_key=replacement_key,
invalidation_status="became",
2019-07-31 19:25:30 +00:00
invalidation_version=invalidation_version,
)
)
2019-09-20 15:23:34 +00:00
def validator(config: Dict) -> Dict:
"""Check if key is in config and log warning."""
if key in config:
check_for_invalid_version()
KeywordStyleAdapter(logging.getLogger(module_name)).warning(
warning,
key=key,
replacement_key=replacement_key,
invalidation_status="will become",
2019-07-31 19:25:30 +00:00
invalidation_version=invalidation_version,
)
value = config[key]
if replacement_key:
config.pop(key)
else:
value = default
keys = [key]
if replacement_key:
keys.append(replacement_key)
if value is not None and (
2019-07-31 19:25:30 +00:00
replacement_key not in config or default == config.get(replacement_key)
):
config[replacement_key] = value
return has_at_most_one_key(*keys)(config)
return validator
def key_value_schemas(
key: str, value_schemas: Dict[str, vol.Schema]
) -> Callable[[Any], Dict[str, Any]]:
"""Create a validator that validates based on a value for specific key.
This gives better error messages.
"""
def key_value_validator(value: Any) -> Dict[str, Any]:
if not isinstance(value, dict):
raise vol.Invalid("Expected a dictionary")
key_value = value.get(key)
if key_value not in value_schemas:
raise vol.Invalid(
2020-03-05 19:44:42 +00:00
f"Unexpected value for {key}: '{key_value}'. Expected {', '.join(value_schemas)}"
)
return cast(Dict[str, Any], value_schemas[key_value](value))
return key_value_validator
# Validator helpers
2019-07-31 19:25:30 +00:00
def key_dependency(
key: Hashable, dependency: Hashable
) -> Callable[[Dict[Hashable, Any]], Dict[Hashable, Any]]:
"""Validate that all dependencies exist for key."""
2019-07-31 19:25:30 +00:00
def validator(value: Dict[Hashable, Any]) -> Dict[Hashable, Any]:
"""Test dependencies."""
if not isinstance(value, dict):
2019-07-31 19:25:30 +00:00
raise vol.Invalid("key dependencies require a dict")
if key in value and dependency not in value:
2019-07-31 19:25:30 +00:00
raise vol.Invalid(
f'dependency violation - key "{key}" requires '
f'key "{dependency}" to exist'
2019-07-31 19:25:30 +00:00
)
return value
2019-07-31 19:25:30 +00:00
return validator
def custom_serializer(schema: Any) -> Any:
"""Serialize additional types for voluptuous_serialize."""
if schema is positive_time_period_dict:
return {"type": "positive_time_period_dict"}
if isinstance(schema, multi_select):
return {"type": "multi_select", "options": schema.options}
return voluptuous_serialize.UNSUPPORTED
# Schemas
2019-07-31 19:25:30 +00:00
PLATFORM_SCHEMA = vol.Schema(
{
vol.Required(CONF_PLATFORM): string,
vol.Optional(CONF_ENTITY_NAMESPACE): string,
vol.Optional(CONF_SCAN_INTERVAL): time_period,
}
)
PLATFORM_SCHEMA_BASE = PLATFORM_SCHEMA.extend({}, extra=vol.ALLOW_EXTRA)
ENTITY_SERVICE_FIELDS = (ATTR_ENTITY_ID, ATTR_AREA_ID)
def make_entity_service_schema(
schema: dict, *, extra: int = vol.PREVENT_EXTRA
) -> vol.All:
"""Create an entity service schema."""
return vol.All(
vol.Schema(
{
**schema,
vol.Optional(ATTR_ENTITY_ID): comp_entity_ids,
vol.Optional(ATTR_AREA_ID): vol.Any(
ENTITY_MATCH_NONE, vol.All(ensure_list, [str])
),
},
extra=extra,
),
has_at_least_one_key(*ENTITY_SERVICE_FIELDS),
)
2019-07-31 19:25:30 +00:00
2020-09-11 10:24:16 +00:00
SCRIPT_VARIABLES_SCHEMA = vol.All(
vol.Schema({str: template_complex}),
# pylint: disable=unnecessary-lambda
lambda val: script_variables_helper.ScriptVariables(val),
)
def script_action(value: Any) -> dict:
"""Validate a script action."""
if not isinstance(value, dict):
raise vol.Invalid("expected dictionary")
return ACTION_TYPE_SCHEMAS[determine_script_action(value)](value)
SCRIPT_SCHEMA = vol.All(ensure_list, [script_action])
2019-07-31 19:25:30 +00:00
EVENT_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
2020-03-05 19:44:42 +00:00
vol.Required(CONF_EVENT): string,
vol.Optional(CONF_EVENT_DATA): vol.All(dict, template_complex),
vol.Optional(CONF_EVENT_DATA_TEMPLATE): vol.All(dict, template_complex),
2019-07-31 19:25:30 +00:00
}
)
SERVICE_SCHEMA = vol.All(
vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
vol.Exclusive(CONF_SERVICE, "service name"): vol.Any(
service, dynamic_template
),
vol.Exclusive(CONF_SERVICE_TEMPLATE, "service name"): vol.Any(
service, dynamic_template
),
vol.Optional("data"): vol.All(dict, template_complex),
vol.Optional("data_template"): vol.All(dict, template_complex),
2019-07-31 19:25:30 +00:00
vol.Optional(CONF_ENTITY_ID): comp_entity_ids,
}
),
2020-03-05 19:44:42 +00:00
has_at_least_one_key(CONF_SERVICE, CONF_SERVICE_TEMPLATE),
2019-07-31 19:25:30 +00:00
)
NUMERIC_STATE_CONDITION_SCHEMA = vol.All(
vol.Schema(
{
vol.Required(CONF_CONDITION): "numeric_state",
vol.Required(CONF_ENTITY_ID): entity_ids,
vol.Optional(CONF_ATTRIBUTE): str,
CONF_BELOW: vol.Any(
vol.Coerce(float), vol.All(str, entity_domain("input_number"))
),
CONF_ABOVE: vol.Any(
vol.Coerce(float), vol.All(str, entity_domain("input_number"))
),
2019-07-31 19:25:30 +00:00
vol.Optional(CONF_VALUE_TEMPLATE): template,
}
),
has_at_least_one_key(CONF_BELOW, CONF_ABOVE),
)
STATE_CONDITION_SCHEMA = vol.All(
vol.Schema(
{
vol.Required(CONF_CONDITION): "state",
vol.Required(CONF_ENTITY_ID): entity_ids,
vol.Optional(CONF_ATTRIBUTE): str,
vol.Required(CONF_STATE): vol.Any(str, [str]),
vol.Optional(CONF_FOR): positive_time_period,
2019-07-31 19:25:30 +00:00
# To support use_trigger_value in automation
# Deprecated 2016/04/25
vol.Optional("from"): str,
}
),
key_dependency("for", "state"),
)
SUN_CONDITION_SCHEMA = vol.All(
vol.Schema(
{
vol.Required(CONF_CONDITION): "sun",
vol.Optional("before"): sun_event,
vol.Optional("before_offset"): time_period,
vol.Optional("after"): vol.All(
vol.Lower, vol.Any(SUN_EVENT_SUNSET, SUN_EVENT_SUNRISE)
),
vol.Optional("after_offset"): time_period,
}
),
has_at_least_one_key("before", "after"),
)
TEMPLATE_CONDITION_SCHEMA = vol.Schema(
{
vol.Required(CONF_CONDITION): "template",
vol.Required(CONF_VALUE_TEMPLATE): template,
}
)
TIME_CONDITION_SCHEMA = vol.All(
vol.Schema(
{
vol.Required(CONF_CONDITION): "time",
"before": vol.Any(time, vol.All(str, entity_domain("input_datetime"))),
"after": vol.Any(time, vol.All(str, entity_domain("input_datetime"))),
2019-07-31 19:25:30 +00:00
"weekday": weekdays,
}
),
has_at_least_one_key("before", "after", "weekday"),
)
ZONE_CONDITION_SCHEMA = vol.Schema(
{
vol.Required(CONF_CONDITION): "zone",
vol.Required(CONF_ENTITY_ID): entity_ids,
"zone": entity_ids,
2019-07-31 19:25:30 +00:00
# To support use_trigger_value in automation
# Deprecated 2016/04/25
vol.Optional("event"): vol.Any("enter", "leave"),
}
)
AND_CONDITION_SCHEMA = vol.Schema(
{
vol.Required(CONF_CONDITION): "and",
vol.Required(CONF_CONDITIONS): vol.All(
2019-07-31 19:25:30 +00:00
ensure_list,
# pylint: disable=unnecessary-lambda
[lambda value: CONDITION_SCHEMA(value)],
),
}
)
OR_CONDITION_SCHEMA = vol.Schema(
{
vol.Required(CONF_CONDITION): "or",
vol.Required(CONF_CONDITIONS): vol.All(
2019-07-31 19:25:30 +00:00
ensure_list,
# pylint: disable=unnecessary-lambda
[lambda value: CONDITION_SCHEMA(value)],
),
}
)
2020-04-24 16:40:23 +00:00
NOT_CONDITION_SCHEMA = vol.Schema(
{
vol.Required(CONF_CONDITION): "not",
vol.Required(CONF_CONDITIONS): vol.All(
2020-04-24 16:40:23 +00:00
ensure_list,
# pylint: disable=unnecessary-lambda
[lambda value: CONDITION_SCHEMA(value)],
),
}
)
DEVICE_CONDITION_BASE_SCHEMA = vol.Schema(
{
vol.Required(CONF_CONDITION): "device",
vol.Required(CONF_DEVICE_ID): str,
vol.Required(CONF_DOMAIN): str,
}
)
DEVICE_CONDITION_SCHEMA = DEVICE_CONDITION_BASE_SCHEMA.extend({}, extra=vol.ALLOW_EXTRA)
CONDITION_SCHEMA: vol.Schema = vol.Schema(
vol.Any(
key_value_schemas(
CONF_CONDITION,
{
"numeric_state": NUMERIC_STATE_CONDITION_SCHEMA,
"state": STATE_CONDITION_SCHEMA,
"sun": SUN_CONDITION_SCHEMA,
"template": TEMPLATE_CONDITION_SCHEMA,
"time": TIME_CONDITION_SCHEMA,
"zone": ZONE_CONDITION_SCHEMA,
"and": AND_CONDITION_SCHEMA,
"or": OR_CONDITION_SCHEMA,
"not": NOT_CONDITION_SCHEMA,
"device": DEVICE_CONDITION_SCHEMA,
},
),
dynamic_template,
)
)
2020-08-17 16:54:56 +00:00
TRIGGER_SCHEMA = vol.All(
ensure_list, [vol.Schema({vol.Required(CONF_PLATFORM): str}, extra=vol.ALLOW_EXTRA)]
)
2019-07-31 19:25:30 +00:00
_SCRIPT_DELAY_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
vol.Required(CONF_DELAY): positive_time_period_template,
2019-07-31 19:25:30 +00:00
}
)
2016-04-21 22:52:20 +00:00
2019-07-31 19:25:30 +00:00
_SCRIPT_WAIT_TEMPLATE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
2020-03-05 19:44:42 +00:00
vol.Required(CONF_WAIT_TEMPLATE): template,
vol.Optional(CONF_TIMEOUT): positive_time_period_template,
2020-03-05 19:44:42 +00:00
vol.Optional(CONF_CONTINUE_ON_TIMEOUT): boolean,
2019-07-31 19:25:30 +00:00
}
)
DEVICE_ACTION_BASE_SCHEMA = vol.Schema(
{vol.Required(CONF_DEVICE_ID): string, vol.Required(CONF_DOMAIN): str}
)
DEVICE_ACTION_SCHEMA = DEVICE_ACTION_BASE_SCHEMA.extend({}, extra=vol.ALLOW_EXTRA)
2020-03-05 19:44:42 +00:00
_SCRIPT_SCENE_SCHEMA = vol.Schema({vol.Required(CONF_SCENE): entity_domain("scene")})
_SCRIPT_REPEAT_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
vol.Required(CONF_REPEAT): vol.All(
{
vol.Exclusive(CONF_COUNT, "repeat"): vol.Any(vol.Coerce(int), template),
vol.Exclusive(CONF_WHILE, "repeat"): vol.All(
ensure_list, [CONDITION_SCHEMA]
),
vol.Exclusive(CONF_UNTIL, "repeat"): vol.All(
ensure_list, [CONDITION_SCHEMA]
),
vol.Required(CONF_SEQUENCE): SCRIPT_SCHEMA,
},
has_at_least_one_key(CONF_COUNT, CONF_WHILE, CONF_UNTIL),
),
}
)
_SCRIPT_CHOOSE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
vol.Required(CONF_CHOOSE): vol.All(
ensure_list,
[
{
vol.Required(CONF_CONDITIONS): vol.All(
ensure_list, [CONDITION_SCHEMA]
),
vol.Required(CONF_SEQUENCE): SCRIPT_SCHEMA,
}
],
),
vol.Optional(CONF_DEFAULT): SCRIPT_SCHEMA,
}
)
_SCRIPT_WAIT_FOR_TRIGGER_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
vol.Required(CONF_WAIT_FOR_TRIGGER): TRIGGER_SCHEMA,
vol.Optional(CONF_TIMEOUT): positive_time_period_template,
vol.Optional(CONF_CONTINUE_ON_TIMEOUT): boolean,
}
)
_SCRIPT_SET_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ALIAS): string,
vol.Required(CONF_VARIABLES): SCRIPT_VARIABLES_SCHEMA,
}
)
2020-03-05 19:44:42 +00:00
SCRIPT_ACTION_DELAY = "delay"
SCRIPT_ACTION_WAIT_TEMPLATE = "wait_template"
SCRIPT_ACTION_CHECK_CONDITION = "condition"
SCRIPT_ACTION_FIRE_EVENT = "event"
SCRIPT_ACTION_CALL_SERVICE = "call_service"
SCRIPT_ACTION_DEVICE_AUTOMATION = "device"
SCRIPT_ACTION_ACTIVATE_SCENE = "scene"
SCRIPT_ACTION_REPEAT = "repeat"
SCRIPT_ACTION_CHOOSE = "choose"
SCRIPT_ACTION_WAIT_FOR_TRIGGER = "wait_for_trigger"
SCRIPT_ACTION_VARIABLES = "variables"
2020-03-05 19:44:42 +00:00
def determine_script_action(action: dict) -> str:
"""Determine action type."""
if CONF_DELAY in action:
return SCRIPT_ACTION_DELAY
if CONF_WAIT_TEMPLATE in action:
return SCRIPT_ACTION_WAIT_TEMPLATE
if CONF_CONDITION in action:
return SCRIPT_ACTION_CHECK_CONDITION
if CONF_EVENT in action:
return SCRIPT_ACTION_FIRE_EVENT
if CONF_DEVICE_ID in action:
return SCRIPT_ACTION_DEVICE_AUTOMATION
if CONF_SCENE in action:
return SCRIPT_ACTION_ACTIVATE_SCENE
if CONF_REPEAT in action:
return SCRIPT_ACTION_REPEAT
if CONF_CHOOSE in action:
return SCRIPT_ACTION_CHOOSE
if CONF_WAIT_FOR_TRIGGER in action:
return SCRIPT_ACTION_WAIT_FOR_TRIGGER
if CONF_VARIABLES in action:
return SCRIPT_ACTION_VARIABLES
2020-03-05 19:44:42 +00:00
return SCRIPT_ACTION_CALL_SERVICE
ACTION_TYPE_SCHEMAS: Dict[str, Callable[[Any], dict]] = {
SCRIPT_ACTION_CALL_SERVICE: SERVICE_SCHEMA,
SCRIPT_ACTION_DELAY: _SCRIPT_DELAY_SCHEMA,
SCRIPT_ACTION_WAIT_TEMPLATE: _SCRIPT_WAIT_TEMPLATE_SCHEMA,
SCRIPT_ACTION_FIRE_EVENT: EVENT_SCHEMA,
SCRIPT_ACTION_CHECK_CONDITION: CONDITION_SCHEMA,
SCRIPT_ACTION_DEVICE_AUTOMATION: DEVICE_ACTION_SCHEMA,
SCRIPT_ACTION_ACTIVATE_SCENE: _SCRIPT_SCENE_SCHEMA,
SCRIPT_ACTION_REPEAT: _SCRIPT_REPEAT_SCHEMA,
SCRIPT_ACTION_CHOOSE: _SCRIPT_CHOOSE_SCHEMA,
SCRIPT_ACTION_WAIT_FOR_TRIGGER: _SCRIPT_WAIT_FOR_TRIGGER_SCHEMA,
SCRIPT_ACTION_VARIABLES: _SCRIPT_SET_SCHEMA,
2020-03-05 19:44:42 +00:00
}