"""Test config validators.""" from collections import OrderedDict from datetime import date, datetime, timedelta import enum import logging import os from socket import _GLOBAL_DEFAULT_TIMEOUT from unittest.mock import Mock, patch import uuid import pytest import voluptuous as vol import homeassistant from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers import ( config_validation as cv, issue_registry as ir, selector, template, ) def test_boolean() -> None: """Test boolean validation.""" schema = vol.Schema(cv.boolean) for value in ( None, "T", "negative", "lock", "tr ue", [], [1, 2], {"one": "two"}, test_boolean, ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("true", "On", "1", "YES", " true ", "enable", 1, 50, True, 0.1): assert schema(value) for value in ("false", "Off", "0", "NO", "disable", 0, False): assert not schema(value) def test_latitude() -> None: """Test latitude validation.""" schema = vol.Schema(cv.latitude) for value in ("invalid", None, -91, 91, "-91", "91", "123.01A"): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("-89", 89, "12.34"): schema(value) def test_longitude() -> None: """Test longitude validation.""" schema = vol.Schema(cv.longitude) for value in ("invalid", None, -181, 181, "-181", "181", "123.01A"): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("-179", 179, "12.34"): schema(value) def test_port() -> None: """Test TCP/UDP network port.""" schema = vol.Schema(cv.port) for value in ("invalid", None, -1, 0, 80000, "81000"): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("1000", 21, 24574): schema(value) def test_isfile() -> None: """Validate that the value is an existing file.""" schema = vol.Schema(cv.isfile) fake_file = "this-file-does-not.exist" assert not os.path.isfile(fake_file) for value in ("invalid", None, -1, 0, 80000, fake_file): with pytest.raises(vol.Invalid): schema(value) # patching methods that allow us to fake a file existing # with write access with ( patch("os.path.isfile", Mock(return_value=True)), patch("os.access", Mock(return_value=True)), ): schema("test.txt") def test_url() -> None: """Test URL.""" schema = vol.Schema(cv.url) for value in ( "invalid", None, 100, "htp://ha.io", "http//ha.io", "http://??,**", "https://??,**", ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ( "http://localhost", "https://localhost/test/index.html", "http://home-assistant.io", "http://home-assistant.io/test/", "https://community.home-assistant.io/", ): assert schema(value) def test_configuration_url() -> None: """Test URL.""" schema = vol.Schema(cv.configuration_url) for value in ( "invalid", None, 100, "htp://ha.io", "http//ha.io", "http://??,**", "https://??,**", "homeassistant://??,**", ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ( "http://localhost", "https://localhost/test/index.html", "http://home-assistant.io", "http://home-assistant.io/test/", "https://community.home-assistant.io/", "homeassistant://api", "homeassistant://api/hassio_ingress/XXXXXXX", ): assert schema(value) def test_url_no_path() -> None: """Test URL.""" schema = vol.Schema(cv.url_no_path) for value in ( "https://localhost/test/index.html", "http://home-assistant.io/test/", ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ( "http://localhost", "http://home-assistant.io", "https://community.home-assistant.io/", ): assert schema(value) def test_platform_config() -> None: """Test platform config validation.""" options = ({}, {"hello": "world"}) for value in options: with pytest.raises(vol.MultipleInvalid): cv.PLATFORM_SCHEMA(value) options = ({"platform": "mqtt"}, {"platform": "mqtt", "beer": "yes"}) for value in options: cv.PLATFORM_SCHEMA_BASE(value) def test_ensure_list() -> None: """Test ensure_list.""" schema = vol.Schema(cv.ensure_list) assert schema(None) == [] assert schema(1) == [1] assert schema([1]) == [1] assert schema("1") == ["1"] assert schema(["1"]) == ["1"] assert schema({"1": "2"}) == [{"1": "2"}] def test_entity_id() -> None: """Test entity ID validation.""" schema = vol.Schema(cv.entity_id) with pytest.raises(vol.MultipleInvalid): schema("invalid_entity") assert schema("sensor.LIGHT") == "sensor.light" @pytest.mark.parametrize("validator", [cv.entity_ids, cv.entity_ids_or_uuids]) def test_entity_ids(validator) -> None: """Test entity ID validation.""" schema = vol.Schema(validator) options = ( "invalid_entity", "sensor.light,sensor_invalid", ["invalid_entity"], ["sensor.light", "sensor_invalid"], ["sensor.light,sensor_invalid"], ) for value in options: with pytest.raises(vol.MultipleInvalid): schema(value) options = ([], ["sensor.light"], "sensor.light") for value in options: schema(value) assert schema("sensor.LIGHT, light.kitchen ") == ["sensor.light", "light.kitchen"] def test_entity_ids_or_uuids() -> None: """Test entity ID validation.""" schema = vol.Schema(cv.entity_ids_or_uuids) valid_uuid = "a266a680b608c32770e6c45bfe6b8411" valid_uuid2 = "a266a680b608c32770e6c45bfe6b8412" invalid_uuid_capital_letters = "A266A680B608C32770E6C45bfE6B8412" options = ( "invalid_uuid", invalid_uuid_capital_letters, f"{valid_uuid},invalid_uuid", ["invalid_uuid"], [valid_uuid, "invalid_uuid"], [f"{valid_uuid},invalid_uuid"], ) for value in options: with pytest.raises(vol.MultipleInvalid): schema(value) options = ([], [valid_uuid], valid_uuid) for value in options: schema(value) assert schema(f"{valid_uuid}, {valid_uuid2} ") == [valid_uuid, valid_uuid2] def test_entity_domain() -> None: """Test entity domain validation.""" schema = vol.Schema(cv.entity_domain("sensor")) for value in ( "invalid_entity", "cover.demo", "cover.demo,sensor.another_entity", "", ): with pytest.raises(vol.MultipleInvalid): schema(value) assert schema("sensor.LIGHT") == "sensor.light" schema = vol.Schema(cv.entity_domain(("sensor", "binary_sensor"))) for value in ("invalid_entity", "cover.demo"): with pytest.raises(vol.MultipleInvalid): schema(value) assert schema("sensor.LIGHT") == "sensor.light" assert schema("binary_sensor.LIGHT") == "binary_sensor.light" def test_entities_domain() -> None: """Test entities domain validation.""" schema = vol.Schema(cv.entities_domain("sensor")) options = ( None, "", "invalid_entity", ["sensor.light", "cover.demo"], ["sensor.light", "sensor_invalid"], ) for value in options: with pytest.raises(vol.MultipleInvalid): schema(value) options = ("sensor.light", ["SENSOR.light"], ["sensor.light", "sensor.demo"]) for value in options: schema(value) assert schema("sensor.LIGHT, sensor.demo ") == ["sensor.light", "sensor.demo"] assert schema(["sensor.light", "SENSOR.demo"]) == ["sensor.light", "sensor.demo"] def test_ensure_list_csv() -> None: """Test ensure_list_csv.""" schema = vol.Schema(cv.ensure_list_csv) options = (None, 12, [], ["string"], "string1,string2") for value in options: schema(value) assert schema("string1, string2 ") == ["string1", "string2"] def test_event_schema() -> None: """Test event_schema validation.""" options = ( {}, None, {"event_data": {}}, {"event": "state_changed", "event_data": 1}, ) for value in options: with pytest.raises(vol.MultipleInvalid): cv.EVENT_SCHEMA(value) options = ( {"event": "state_changed"}, {"event": "state_changed", "event_data": {"hello": "world"}}, ) for value in options: cv.EVENT_SCHEMA(value) def test_icon() -> None: """Test icon validation.""" schema = vol.Schema(cv.icon) for value in (False, "work"): with pytest.raises(vol.MultipleInvalid): schema(value) schema("mdi:work") schema("custom:prefix") def test_time_period() -> None: """Test time_period validation.""" schema = vol.Schema(cv.time_period) options = ( None, "", "hello:world", "12:", "12:34:56:78", {}, {"wrong_key": -10}, "12.5:30", "12:30.5", "12.5:30:30", "12:30.5:30", ) for value in options: with pytest.raises(vol.MultipleInvalid): schema(value) options = ( ("8:20", timedelta(hours=8, minutes=20)), ("23:59", timedelta(hours=23, minutes=59)), ("-8:20", -1 * timedelta(hours=8, minutes=20)), ("-1:15", -1 * timedelta(hours=1, minutes=15)), ("-23:59:59", -1 * timedelta(hours=23, minutes=59, seconds=59)), ("-48:00", -1 * timedelta(days=2)), ({"minutes": 5}, timedelta(minutes=5)), (1, timedelta(seconds=1)), ("5", timedelta(seconds=5)), ("180", timedelta(seconds=180)), ("00:08:20.5", timedelta(minutes=8, seconds=20, milliseconds=500)), ("00:23:59.999", timedelta(minutes=23, seconds=59, milliseconds=999)), ("-00:08:20.5", -1 * timedelta(minutes=8, seconds=20, milliseconds=500)), ( "-12:59:59.999", -1 * timedelta(hours=12, minutes=59, seconds=59, milliseconds=999), ), ({"milliseconds": 1.5}, timedelta(milliseconds=1, microseconds=500)), ({"seconds": "1.5"}, timedelta(seconds=1, milliseconds=500)), ({"minutes": "1.5"}, timedelta(minutes=1, seconds=30)), ({"hours": -1.5}, -1 * timedelta(hours=1, minutes=30)), ({"days": "-1.5"}, -1 * timedelta(days=1, hours=12)), ) for value, result in options: assert schema(value) == result def test_remove_falsy() -> None: """Test remove falsy.""" assert cv.remove_falsy([0, None, 1, "1", {}, [], ""]) == [1, "1"] def test_service() -> None: """Test service validation.""" schema = vol.Schema(cv.service) with pytest.raises(vol.MultipleInvalid): schema("invalid_turn_on") schema("homeassistant.turn_on") def test_service_schema(hass: HomeAssistant) -> None: """Test service_schema validation.""" options = ( {}, None, { "service": "homeassistant.turn_on", "service_template": "homeassistant.turn_on", }, {"data": {"entity_id": "light.kitchen"}}, {"service": "homeassistant.turn_on", "data": None}, { "service": "homeassistant.turn_on", "data_template": {"brightness": "{{ no_end"}, }, ) for value in options: with pytest.raises(vol.MultipleInvalid): cv.SERVICE_SCHEMA(value) options = ( {"service": "homeassistant.turn_on"}, {"service": "homeassistant.turn_on", "entity_id": "light.kitchen"}, {"service": "light.turn_on", "entity_id": "all"}, { "service": "homeassistant.turn_on", "entity_id": ["light.kitchen", "light.ceiling"], }, { "service": "light.turn_on", "entity_id": "all", "alias": "turn on kitchen lights", }, {"service": "scene.turn_on", "metadata": {}}, ) for value in options: cv.SERVICE_SCHEMA(value) # Check metadata is removed from the validated output assert cv.SERVICE_SCHEMA({"service": "scene.turn_on", "metadata": {}}) == { "service": "scene.turn_on" } def test_entity_service_schema() -> None: """Test make_entity_service_schema validation.""" schema = cv.make_entity_service_schema( {vol.Required("required"): cv.positive_int, vol.Optional("optional"): cv.string} ) options = ( {}, None, {"entity_id": "light.kitchen"}, {"optional": "value", "entity_id": "light.kitchen"}, {"required": 1}, {"required": 2, "area_id": "kitchen", "foo": "bar"}, {"required": "str", "area_id": "kitchen"}, ) for value in options: with pytest.raises(vol.MultipleInvalid): cv.SERVICE_SCHEMA(value) options = ( {"required": 1, "entity_id": "light.kitchen"}, {"required": 2, "optional": "value", "device_id": "a_device"}, {"required": 3, "area_id": "kitchen"}, ) for value in options: schema(value) options = ( { "required": 1, "entity_id": "light.kitchen", "metadata": {"some": "frontend_stuff"}, }, ) for value in options: validated = schema(value) assert "metadata" not in validated def test_entity_service_schema_with_metadata() -> None: """Test make_entity_service_schema with overridden metadata key.""" schema = cv.make_entity_service_schema({vol.Required("metadata"): cv.positive_int}) options = ({"metadata": {"some": "frontend_stuff"}, "entity_id": "light.kitchen"},) for value in options: with pytest.raises(vol.MultipleInvalid): cv.SERVICE_SCHEMA(value) options = ({"metadata": 1, "entity_id": "light.kitchen"},) for value in options: validated = schema(value) assert "metadata" in validated def test_slug() -> None: """Test slug validation.""" schema = vol.Schema(cv.slug) for value in (None, "hello world"): with pytest.raises(vol.MultipleInvalid): schema(value) for value in (12345, "hello"): schema(value) def test_string(hass: HomeAssistant) -> None: """Test string validation.""" schema = vol.Schema(cv.string) with pytest.raises(vol.Invalid): schema(None) with pytest.raises(vol.Invalid): schema([]) with pytest.raises(vol.Invalid): schema({}) for value in (True, 1, "hello"): schema(value) # Test subclasses of str are returned class MyString(str): __slots__ = () my_string = MyString("hello") assert schema(my_string) is my_string # Test template support for text, native in ( ("[1, 2]", [1, 2]), ("{1, 2}", {1, 2}), ("(1, 2)", (1, 2)), ('{"hello": True}', {"hello": True}), ): tpl = template.Template(text, hass) result = tpl.async_render() assert isinstance(result, template.ResultWrapper) assert result == native assert schema(result) == text def test_string_with_no_html() -> None: """Test string with no html validation.""" schema = vol.Schema(cv.string_with_no_html) with pytest.raises(vol.Invalid): schema("This has HTML in it Link") with pytest.raises(vol.Invalid): schema("Bold") with pytest.raises(vol.Invalid): schema("HTML element names are case-insensitive.") for value in ( True, 3, "Hello", "**Hello**", "This has no HTML [Link](https://home-assistant.io)", ): schema(value) def test_temperature_unit() -> None: """Test temperature unit validation.""" schema = vol.Schema(cv.temperature_unit) with pytest.raises(vol.MultipleInvalid): schema("K") schema("C") schema("F") def test_x10_address() -> None: """Test x10 addr validator.""" schema = vol.Schema(cv.x10_address) with pytest.raises(vol.Invalid): schema("Q1") with pytest.raises(vol.Invalid): schema("q55") with pytest.raises(vol.Invalid): schema("garbage_addr") schema("a1") schema("C11") def test_template(hass: HomeAssistant) -> None: """Test template validator.""" schema = vol.Schema(cv.template) for value in ( None, "{{ partial_print }", "{% if True %}Hello", ["test"], ): with pytest.raises(vol.Invalid): schema(value) options = ( 1, "Hello", "{{ beer }}", "{% if 1 == 1 %}Hello{% else %}World{% endif %}", # Function added as an extension by Home Assistant "{{ expand('group.foo')|map(attribute='entity_id')|list }}", # Filter added as an extension by Home Assistant "{{ ['group.foo']|expand|map(attribute='entity_id')|list }}", ) for value in options: schema(value) async def test_template_no_hass(hass: HomeAssistant) -> None: """Test template validator.""" schema = vol.Schema(cv.template) for value in ( None, "{{ partial_print }", "{% if True %}Hello", ["test"], # Filter added as an extension by Home Assistant "{{ ['group.foo']|expand|map(attribute='entity_id')|list }}", ): with pytest.raises(vol.Invalid): await hass.async_add_executor_job(schema, value) options = ( 1, "Hello", "{{ beer }}", "{% if 1 == 1 %}Hello{% else %}World{% endif %}", # Function added as an extension by Home Assistant "{{ expand('group.foo')|map(attribute='entity_id')|list }}", ) for value in options: await hass.async_add_executor_job(schema, value) def test_dynamic_template(hass: HomeAssistant) -> None: """Test dynamic template validator.""" schema = vol.Schema(cv.dynamic_template) for value in ( None, 1, "{{ partial_print }", "{% if True %}Hello", ["test"], "just a string", ): with pytest.raises(vol.Invalid): schema(value) options = ( "{{ beer }}", "{% if 1 == 1 %}Hello{% else %}World{% endif %}", # Function added as an extension by Home Assistant "{{ expand('group.foo')|map(attribute='entity_id')|list }}", # Filter added as an extension by Home Assistant "{{ ['group.foo']|expand|map(attribute='entity_id')|list }}", ) for value in options: schema(value) async def test_dynamic_template_no_hass(hass: HomeAssistant) -> None: """Test dynamic template validator.""" schema = vol.Schema(cv.dynamic_template) for value in ( None, 1, "{{ partial_print }", "{% if True %}Hello", ["test"], "just a string", # Filter added as an extension by Home Assistant "{{ ['group.foo']|expand|map(attribute='entity_id')|list }}", ): with pytest.raises(vol.Invalid): await hass.async_add_executor_job(schema, value) options = ( "{{ beer }}", "{% if 1 == 1 %}Hello{% else %}World{% endif %}", # Function added as an extension by Home Assistant "{{ expand('group.foo')|map(attribute='entity_id')|list }}", ) for value in options: await hass.async_add_executor_job(schema, value) def test_template_complex() -> None: """Test template_complex validator.""" schema = vol.Schema(cv.template_complex) for value in ("{{ partial_print }", "{% if True %}Hello"): with pytest.raises(vol.MultipleInvalid): schema(value) options = ( 1, "Hello", "{{ beer }}", "{% if 1 == 1 %}Hello{% else %}World{% endif %}", {"test": 1, "test2": "{{ beer }}"}, ["{{ beer }}", 1], ) for value in options: schema(value) # ensure the validator didn't mutate the input assert options == ( 1, "Hello", "{{ beer }}", "{% if 1 == 1 %}Hello{% else %}World{% endif %}", {"test": 1, "test2": "{{ beer }}"}, ["{{ beer }}", 1], ) # Ensure we don't mutate non-string types that cannot be templates. for value in (1, True, None): assert schema(value) == value def test_time_zone() -> None: """Test time zone validation.""" schema = vol.Schema(cv.time_zone) with pytest.raises(vol.MultipleInvalid): schema("America/Do_Not_Exist") schema("America/Los_Angeles") schema("UTC") def test_date() -> None: """Test date validation.""" schema = vol.Schema(cv.date) for value in ("Not a date", "23:42", "2016-11-23T18:59:08"): with pytest.raises(vol.Invalid): schema(value) schema(datetime.now().date()) schema("2016-11-23") def test_time() -> None: """Test date validation.""" schema = vol.Schema(cv.time) for value in ("Not a time", "2016-11-23", "2016-11-23T18:59:08"): with pytest.raises(vol.Invalid): schema(value) schema(datetime.now().time()) schema("23:42:00") schema("23:42") def test_datetime() -> None: """Test date time validation.""" schema = vol.Schema(cv.datetime) for value in (date.today(), "Wrong DateTime"): with pytest.raises(vol.MultipleInvalid): schema(value) schema(datetime.now()) schema("2016-11-23T18:59:08") def test_multi_select() -> None: """Test multi select validation. Expected behavior: - Will not accept any input but a list - Will not accept selections outside of configured scope """ schema = vol.Schema(cv.multi_select({"paulus": "Paulus", "robban": "Robban"})) with pytest.raises(vol.Invalid): schema("robban") with pytest.raises(vol.Invalid): schema(["paulus", "martinhj"]) schema(["robban", "paulus"]) def test_multi_select_in_serializer() -> None: """Test multi_select with custom_serializer.""" assert cv.custom_serializer(cv.multi_select({"paulus": "Paulus"})) == { "type": "multi_select", "options": {"paulus": "Paulus"}, } def test_boolean_in_serializer() -> None: """Test boolean with custom_serializer.""" assert cv.custom_serializer(cv.boolean) == { "type": "boolean", } def test_string_in_serializer() -> None: """Test string with custom_serializer.""" assert cv.custom_serializer(cv.string) == { "type": "string", } def test_selector_in_serializer() -> None: """Test selector with custom_serializer.""" assert cv.custom_serializer(selector.selector({"text": {}})) == { "selector": { "text": { "multiline": False, "multiple": False, } } } def test_positive_time_period_dict_in_serializer() -> None: """Test positive_time_period_dict with custom_serializer.""" assert cv.custom_serializer(cv.positive_time_period_dict) == { "type": "positive_time_period_dict", } @pytest.fixture def schema(): """Create a schema used for testing deprecation.""" return vol.Schema({"venus": cv.boolean, "mars": cv.boolean, "jupiter": cv.boolean}) @pytest.fixture def version(monkeypatch: pytest.MonkeyPatch) -> None: """Patch the version used for testing to 0.5.0.""" monkeypatch.setattr(homeassistant.const, "__version__", "0.5.0") def test_deprecated_with_no_optionals(caplog: pytest.LogCaptureFixture, schema) -> None: """Test deprecation behaves correctly when optional params are None. Expected behavior: - Outputs the appropriate deprecation warning if key is detected - Processes schema without changing any values - No warning or difference in output if key is not provided """ deprecated_schema = vol.All(cv.deprecated("mars"), schema) test_data = {"mars": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 1 assert caplog.records[0].name in [ __name__, "homeassistant.helpers.config_validation", ] assert ( "The 'mars' option is deprecated, please remove it from your configuration" ) in caplog.text assert test_data == output caplog.clear() assert len(caplog.records) == 0 test_data = {"venus": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 0 assert test_data == output def test_deprecated_or_removed_param_and_raise( caplog: pytest.LogCaptureFixture, schema ) -> None: """Test removed or deprecation options and fail the config validation by raising an exception. Expected behavior: - Outputs the appropriate deprecation or removed from support error if key is detected """ removed_schema = vol.All(cv.deprecated("mars", raise_if_present=True), schema) test_data = {"mars": True} with pytest.raises(vol.Invalid) as excinfo: removed_schema(test_data) assert ( "The 'mars' option is deprecated, please remove it from your configuration" in str(excinfo.value) ) assert len(caplog.records) == 0 test_data = {"venus": True} output = removed_schema(test_data.copy()) assert len(caplog.records) == 0 assert test_data == output deprecated_schema = vol.All(cv.removed("mars"), schema) test_data = {"mars": True} with pytest.raises(vol.Invalid) as excinfo: deprecated_schema(test_data) assert ( "The 'mars' option has been removed, please remove it from your configuration" in str(excinfo.value) ) assert len(caplog.records) == 0 test_data = {"venus": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 0 assert test_data == output def test_deprecated_with_replacement_key( caplog: pytest.LogCaptureFixture, schema ) -> None: """Test deprecation behaves correctly when only a replacement key is provided. Expected behavior: - Outputs the appropriate deprecation warning if key is detected - Processes schema moving the value from key to replacement_key - Processes schema changing nothing if only replacement_key provided - No warning if only replacement_key provided - No warning or difference in output if neither key nor replacement_key are provided """ deprecated_schema = vol.All( cv.deprecated("mars", replacement_key="jupiter"), schema ) test_data = {"mars": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 1 assert ( "The 'mars' option is deprecated, please replace it with 'jupiter'" ) in caplog.text assert output == {"jupiter": True} caplog.clear() assert len(caplog.records) == 0 test_data = {"jupiter": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 0 assert test_data == output test_data = {"venus": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 0 assert test_data == output def test_deprecated_with_default(caplog: pytest.LogCaptureFixture, schema) -> None: """Test deprecation behaves correctly with a default value. This is likely a scenario that would never occur. Expected behavior: - Behaves identically as when the default value was not present """ deprecated_schema = vol.All(cv.deprecated("mars", default=False), schema) test_data = {"mars": True} with patch( "homeassistant.helpers.config_validation.get_integration_logger", return_value=logging.getLogger(__name__), ): output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 1 assert caplog.records[0].name == __name__ assert ( "The 'mars' option is deprecated, please remove it from your configuration" ) in caplog.text assert test_data == output caplog.clear() assert len(caplog.records) == 0 test_data = {"venus": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 0 assert test_data == output def test_deprecated_with_replacement_key_and_default( caplog: pytest.LogCaptureFixture, schema ) -> None: """Test deprecation with a replacement key and default. Expected behavior: - Outputs the appropriate deprecation warning if key is detected - Processes schema moving the value from key to replacement_key - Processes schema changing nothing if only replacement_key provided - No warning if only replacement_key provided - No warning if neither key nor replacement_key are provided - Adds replacement_key with default value in this case """ deprecated_schema = vol.All( cv.deprecated("mars", replacement_key="jupiter", default=False), schema ) test_data = {"mars": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 1 assert ( "The 'mars' option is deprecated, please replace it with 'jupiter'" ) in caplog.text assert output == {"jupiter": True} caplog.clear() assert len(caplog.records) == 0 test_data = {"jupiter": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 0 assert test_data == output test_data = {"venus": True} output = deprecated_schema(test_data.copy()) assert len(caplog.records) == 0 assert output == {"venus": True, "jupiter": False} deprecated_schema_with_default = vol.All( vol.Schema( { "venus": cv.boolean, vol.Optional("mars", default=False): cv.boolean, vol.Optional("jupiter", default=False): cv.boolean, } ), cv.deprecated("mars", replacement_key="jupiter", default=False), ) test_data = {"mars": True} output = deprecated_schema_with_default(test_data.copy()) assert len(caplog.records) == 1 assert ( "The 'mars' option is deprecated, please replace it with 'jupiter'" ) in caplog.text assert output == {"jupiter": True} def test_deprecated_cant_find_module() -> None: """Test if the current module cannot be found.""" # This used to raise. cv.deprecated( "mars", replacement_key="jupiter", default=False, ) # This used to raise. cv.removed( "mars", default=False, ) def test_deprecated_or_removed_logger_with_config_attributes( caplog: pytest.LogCaptureFixture, ) -> None: """Test if the logger outputs the correct message if the line and file attribute is available in config.""" file: str = "configuration.yaml" line: int = 54 # test as deprecated option replacement_key = "jupiter" option_status = "is deprecated" replacement = f"'mars' option near {file}:{line} {option_status}, please replace it with '{replacement_key}'" config = OrderedDict([("mars", "blah")]) setattr(config, "__config_file__", file) setattr(config, "__line__", line) validated = cv.deprecated("mars", replacement_key=replacement_key, default=False)( config ) assert "mars" not in validated # Removed because a replacement_key is defined assert len(caplog.records) == 1 assert replacement in caplog.text caplog.clear() assert len(caplog.records) == 0 # test as removed option option_status = "has been removed" replacement = f"'mars' option near {file}:{line} {option_status}, please remove it from your configuration" config = OrderedDict([("mars", "blah")]) setattr(config, "__config_file__", file) setattr(config, "__line__", line) validated = cv.removed("mars", default=False, raise_if_present=False)(config) assert "mars" not in validated # Removed because by cv.removed assert len(caplog.records) == 1 assert replacement in caplog.text caplog.clear() assert len(caplog.records) == 0 def test_deprecated_logger_with_one_config_attribute( caplog: pytest.LogCaptureFixture, ) -> None: """Test if the logger outputs the correct message if only one of line and file attribute is available in config.""" file: str = "configuration.yaml" line: int = 54 replacement = f"'mars' option near {file}:{line} is deprecated" config = OrderedDict([("mars", "blah")]) setattr(config, "__config_file__", file) cv.deprecated("mars", replacement_key="jupiter", default=False)(config) assert len(caplog.records) == 1 assert replacement not in caplog.text assert ( "The 'mars' option is deprecated, please replace it with 'jupiter'" ) in caplog.text caplog.clear() assert len(caplog.records) == 0 config = OrderedDict([("mars", "blah")]) setattr(config, "__line__", line) cv.deprecated("mars", replacement_key="jupiter", default=False)(config) assert len(caplog.records) == 1 assert replacement not in caplog.text assert ( "The 'mars' option is deprecated, please replace it with 'jupiter'" ) in caplog.text caplog.clear() assert len(caplog.records) == 0 def test_deprecated_logger_without_config_attributes( caplog: pytest.LogCaptureFixture, ) -> None: """Test if the logger outputs the correct message if the line and file attribute is not available in config.""" file: str = "configuration.yaml" line: int = 54 replacement = f"'mars' option near {file}:{line} is deprecated" config = OrderedDict([("mars", "blah")]) cv.deprecated("mars", replacement_key="jupiter", default=False)(config) assert len(caplog.records) == 1 assert replacement not in caplog.text assert ( "The 'mars' option is deprecated, please replace it with 'jupiter'" ) in caplog.text caplog.clear() assert len(caplog.records) == 0 def test_key_dependency() -> None: """Test key_dependency validator.""" schema = vol.Schema(cv.key_dependency("beer", "soda")) options = {"beer": None} for value in options: with pytest.raises(vol.MultipleInvalid): schema(value) options = ({"beer": None, "soda": None}, {"soda": None}, {}) for value in options: schema(value) def test_has_at_most_one_key() -> None: """Test has_at_most_one_key validator.""" schema = vol.Schema(cv.has_at_most_one_key("beer", "soda")) for value in (None, [], {"beer": None, "soda": None}): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ({}, {"beer": None}, {"soda": None}, {vol.Optional("soda"): None}): schema(value) def test_has_at_least_one_key() -> None: """Test has_at_least_one_key validator.""" schema = vol.Schema(cv.has_at_least_one_key("beer", "soda")) for value in (None, [], {}, {"wine": None}): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ({"beer": None}, {"soda": None}, {vol.Required("soda"): None}): schema(value) def test_enum() -> None: """Test enum validator.""" class TestEnum(enum.Enum): """Test enum.""" value1 = "Value 1" value2 = "Value 2" schema = vol.Schema(cv.enum(TestEnum)) with pytest.raises(vol.Invalid): schema("value3") def test_socket_timeout() -> None: """Test socket timeout validator.""" schema = vol.Schema(cv.socket_timeout) with pytest.raises(vol.Invalid): schema(0.0) with pytest.raises(vol.Invalid): schema(-1) assert schema(None) == _GLOBAL_DEFAULT_TIMEOUT assert schema(1) == 1.0 def test_matches_regex() -> None: """Test matches_regex validator.""" schema = vol.Schema(cv.matches_regex(".*uiae.*")) with pytest.raises(vol.Invalid): schema(1.0) with pytest.raises(vol.Invalid): schema(" nrtd ") test_str = "This is a test including uiae." assert schema(test_str) == test_str def test_is_regex() -> None: """Test the is_regex validator.""" schema = vol.Schema(cv.is_regex) with pytest.raises(vol.Invalid): schema("(") with pytest.raises(vol.Invalid): schema({"a dict": "is not a regex"}) valid_re = ".*" schema(valid_re) def test_comp_entity_ids() -> None: """Test config validation for component entity IDs.""" schema = vol.Schema(cv.comp_entity_ids) for valid in ( "ALL", "all", "AlL", "light.kitchen", ["light.kitchen"], ["light.kitchen", "light.ceiling"], [], ): schema(valid) for invalid in (["light.kitchen", "not-entity-id"], "*", ""): with pytest.raises(vol.Invalid): schema(invalid) def test_uuid4_hex(caplog: pytest.LogCaptureFixture) -> None: """Test uuid validation.""" schema = vol.Schema(cv.uuid4_hex) for value in ("Not a hex string", "0", 0): with pytest.raises(vol.Invalid): schema(value) with pytest.raises(vol.Invalid): # the 13th char should be 4 schema("a03d31b22eee1acc9b90eec40be6ed23") with pytest.raises(vol.Invalid): # the 17th char should be 8-a schema("a03d31b22eee4acc7b90eec40be6ed23") _hex = uuid.uuid4().hex assert schema(_hex) == _hex assert schema(_hex.upper()) == _hex def test_key_value_schemas() -> None: """Test key value schemas.""" schema = vol.Schema( cv.key_value_schemas( "mode", { "number": vol.Schema({"mode": "number", "data": int}), "string": vol.Schema({"mode": "string", "data": str}), }, ) ) with pytest.raises(vol.Invalid) as excinfo: schema(True) assert str(excinfo.value) == "Expected a dictionary" for mode in None, {"a": "dict"}, "invalid": with pytest.raises(vol.Invalid) as excinfo: schema({"mode": mode}) assert ( str(excinfo.value) == f"Unexpected value for mode: '{mode}'. Expected number, string" ) with pytest.raises(vol.Invalid) as excinfo: schema({"mode": "number", "data": "string-value"}) assert str(excinfo.value) == "expected int for dictionary value @ data['data']" with pytest.raises(vol.Invalid) as excinfo: schema({"mode": "string", "data": 1}) assert str(excinfo.value) == "expected str for dictionary value @ data['data']" for mode, data in (("number", 1), ("string", "hello")): schema({"mode": mode, "data": data}) def test_key_value_schemas_with_default() -> None: """Test key value schemas.""" schema = vol.Schema( cv.key_value_schemas( "mode", { "number": vol.Schema({"mode": "number", "data": int}), "string": vol.Schema({"mode": "string", "data": str}), }, vol.Schema({"mode": cv.dynamic_template}), "a cool template", ) ) with pytest.raises(vol.Invalid) as excinfo: schema(True) assert str(excinfo.value) == "Expected a dictionary" for mode in None, {"a": "dict"}, "invalid": with pytest.raises(vol.Invalid) as excinfo: schema({"mode": mode}) assert ( str(excinfo.value) == f"Unexpected value for mode: '{mode}'. Expected number, string, a cool template" ) with pytest.raises(vol.Invalid) as excinfo: schema({"mode": "number", "data": "string-value"}) assert str(excinfo.value) == "expected int for dictionary value @ data['data']" with pytest.raises(vol.Invalid) as excinfo: schema({"mode": "string", "data": 1}) assert str(excinfo.value) == "expected str for dictionary value @ data['data']" for mode, data in (("number", 1), ("string", "hello")): schema({"mode": mode, "data": data}) schema({"mode": "{{ 1 + 1}}"}) @pytest.mark.parametrize( ("config", "error"), [ ({"delay": "{{ invalid"}, "should be format 'HH:MM'"), ({"wait_template": "{{ invalid"}, "invalid template"), ({"condition": "invalid"}, "Unexpected value for condition: 'invalid'"), ( {"condition": "not", "conditions": {"condition": "invalid"}}, "Unexpected value for condition: 'invalid'", ), # The validation error message could be improved to explain that this is not # a valid shorthand template ( {"condition": "not", "conditions": "not a dynamic template"}, "Expected a dictionary", ), ( {"event": None}, r"string value is None for dictionary value @ data\['event'\]", ), ( {"device_id": None}, r"string value is None for dictionary value @ data\['device_id'\]", ), ( {"scene": "light.kitchen"}, "Entity ID 'light.kitchen' does not belong to domain 'scene'", ), ( { "alias": "stop step", "stop": "In the name of love", "error": True, "response_variable": "response-value", }, "not allowed to add a response to an error stop action", ), ], ) def test_script(caplog: pytest.LogCaptureFixture, config: dict, error: str) -> None: """Test script validation is user friendly.""" with pytest.raises(vol.Invalid, match=error): cv.script_action(config) def test_whitespace() -> None: """Test whitespace validation.""" schema = vol.Schema(cv.whitespace) for value in ( None, "T", "negative", "lock", "tr ue", [], [1, 2], {"one": "two"}, ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in (" ", " "): assert schema(value) def test_currency() -> None: """Test currency validator.""" schema = vol.Schema(cv.currency) for value in ( None, "BTC", ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("EUR", "USD"): assert schema(value) def test_historic_currency() -> None: """Test historic currency validator.""" schema = vol.Schema(cv.historic_currency) for value in (None, "BTC", "EUR"): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("DEM", "NLG"): assert schema(value) def test_country() -> None: """Test country validator.""" schema = vol.Schema(cv.country) for value in (None, "Candyland", "USA"): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("NL", "SE"): assert schema(value) def test_language() -> None: """Test language validator.""" schema = vol.Schema(cv.language) for value in (None, "Klingon", "english"): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ("en", "sv"): assert schema(value) def test_positive_time_period_template() -> None: """Test positive time period template validation.""" schema = vol.Schema(cv.positive_time_period_template) with pytest.raises(vol.MultipleInvalid): schema({}) with pytest.raises(vol.MultipleInvalid): schema({5: 5}) with pytest.raises(vol.MultipleInvalid): schema({"invalid": 5}) with pytest.raises(vol.MultipleInvalid): schema("invalid") # Time periods pass schema("00:01") schema("00:00:01") schema("00:00:00.500") schema({"minutes": 5}) # Templates are not evaluated and will pass schema("{{ 'invalid' }}") schema({"{{ 'invalid' }}": 5}) schema({"minutes": "{{ 'invalid' }}"}) def test_empty_schema(caplog: pytest.LogCaptureFixture) -> None: """Test empty_config_schema.""" expected_message = ( "The test_domain integration does not support any configuration parameters" ) cv.empty_config_schema("test_domain")({}) assert expected_message not in caplog.text cv.empty_config_schema("test_domain")({"test_domain": {}}) assert expected_message not in caplog.text cv.empty_config_schema("test_domain")({"test_domain": {"foo": "bar"}}) assert expected_message in caplog.text def test_empty_schema_cant_find_module() -> None: """Test if the current module cannot be inspected.""" cv.empty_config_schema("test_domain")({"test_domain": {"foo": "bar"}}) def test_config_entry_only_schema( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, issue_registry: ir.IssueRegistry, ) -> None: """Test config_entry_only_config_schema.""" expected_issue = "config_entry_only_test_domain" expected_message = ( "The test_domain integration does not support YAML setup, please remove " "it from your configuration" ) cv.config_entry_only_config_schema("test_domain")({}) assert expected_message not in caplog.text assert not issue_registry.async_get_issue(HOMEASSISTANT_DOMAIN, expected_issue) cv.config_entry_only_config_schema("test_domain")({"test_domain": {}}) assert expected_message in caplog.text assert issue_registry.async_get_issue(HOMEASSISTANT_DOMAIN, expected_issue) issue_registry.async_delete(HOMEASSISTANT_DOMAIN, expected_issue) cv.config_entry_only_config_schema("test_domain")({"test_domain": {"foo": "bar"}}) assert expected_message in caplog.text assert issue_registry.async_get_issue(HOMEASSISTANT_DOMAIN, expected_issue) def test_config_entry_only_schema_cant_find_module() -> None: """Test if the current module cannot be inspected.""" cv.config_entry_only_config_schema("test_domain")({"test_domain": {"foo": "bar"}}) def test_config_entry_only_schema_no_hass( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, issue_registry: ir.IssueRegistry, ) -> None: """Test if the hass context is not set in our context.""" with patch( "homeassistant.helpers.config_validation.async_get_hass", side_effect=HomeAssistantError, ): cv.config_entry_only_config_schema("test_domain")( {"test_domain": {"foo": "bar"}} ) expected_message = ( "The test_domain integration does not support YAML setup, please remove " "it from your configuration" ) assert expected_message in caplog.text assert not issue_registry.issues def test_platform_only_schema( hass: HomeAssistant, caplog: pytest.LogCaptureFixture, issue_registry: ir.IssueRegistry, ) -> None: """Test config_entry_only_config_schema.""" expected_issue = "platform_only_test_domain" expected_message = ( "The test_domain integration does not support YAML setup, please remove " "it from your configuration" ) cv.platform_only_config_schema("test_domain")({}) assert expected_message not in caplog.text assert not issue_registry.async_get_issue(HOMEASSISTANT_DOMAIN, expected_issue) cv.platform_only_config_schema("test_domain")({"test_domain": {}}) assert expected_message in caplog.text assert issue_registry.async_get_issue(HOMEASSISTANT_DOMAIN, expected_issue) issue_registry.async_delete(HOMEASSISTANT_DOMAIN, expected_issue) cv.platform_only_config_schema("test_domain")({"test_domain": {"foo": "bar"}}) assert expected_message in caplog.text assert issue_registry.async_get_issue(HOMEASSISTANT_DOMAIN, expected_issue) def test_domain() -> None: """Test domain.""" with pytest.raises(vol.Invalid): cv.domain_key(5) with pytest.raises(vol.Invalid): cv.domain_key("") with pytest.raises(vol.Invalid): cv.domain_key("hue ") with pytest.raises(vol.Invalid): cv.domain_key("hue ") assert cv.domain_key("hue") == "hue" assert cv.domain_key("hue1") == "hue1" assert cv.domain_key("hue 1") == "hue" assert cv.domain_key("hue 1") == "hue" def test_color_hex() -> None: """Test color validation in hex format.""" assert cv.color_hex("#123456") == "#123456" assert cv.color_hex("#FFaaFF") == "#FFaaFF" assert cv.color_hex("#FFFFFF") == "#FFFFFF" assert cv.color_hex("#000000") == "#000000" msg = r"Color should be in the format #RRGGBB" with pytest.raises(vol.Invalid, match=msg): cv.color_hex("#777") with pytest.raises(vol.Invalid, match=msg): cv.color_hex("FFFFF") with pytest.raises(vol.Invalid, match=msg): cv.color_hex("FFFFFF") with pytest.raises(vol.Invalid, match=msg): cv.color_hex("#FFFFFFF") with pytest.raises(vol.Invalid, match=msg): cv.color_hex(123456) def test_determine_script_action_ambiguous() -> None: """Test determine script action with ambiguous actions.""" assert ( cv.determine_script_action( { "type": "is_power", "condition": "device", "device_id": "9c2bda81bc7997c981f811c32cafdb22", "entity_id": "2ee287ec70dd0c6db187b539bee429b7", "domain": "sensor", "below": "15", } ) == "condition" ) def test_determine_script_action_non_ambiguous() -> None: """Test determine script action with a non ambiguous action.""" assert cv.determine_script_action({"delay": "00:00:05"}) == "delay"