Check config script (#2657)

* Add check_config, yaml linting script

* WIP: Start reusing some bootstrap methods for validation

* Start outputs

* Secrets, files and failed config

* requirements_all

* Fixes

* formatting

* Fix unit test after formatting
pull/2884/merge
Johann Kellerman 2016-08-23 06:42:05 +02:00 committed by Paulus Schoutsen
parent f00cdc50df
commit 14b034f452
7 changed files with 458 additions and 28 deletions

View File

@ -104,7 +104,7 @@ def _setup_component(hass: core.HomeAssistant, domain: str, config) -> bool:
try:
config = component.CONFIG_SCHEMA(config)
except vol.MultipleInvalid as ex:
_log_exception(ex, domain, config)
log_exception(ex, domain, config)
return False
elif hasattr(component, 'PLATFORM_SCHEMA'):
@ -114,7 +114,7 @@ def _setup_component(hass: core.HomeAssistant, domain: str, config) -> bool:
try:
p_validated = component.PLATFORM_SCHEMA(p_config)
except vol.MultipleInvalid as ex:
_log_exception(ex, domain, p_config)
log_exception(ex, domain, p_config)
return False
# Not all platform components follow same pattern for platforms
@ -135,8 +135,8 @@ def _setup_component(hass: core.HomeAssistant, domain: str, config) -> bool:
try:
p_validated = platform.PLATFORM_SCHEMA(p_validated)
except vol.MultipleInvalid as ex:
_log_exception(ex, '{}.{}'.format(domain, p_name),
p_validated)
log_exception(ex, '{}.{}'.format(domain, p_name),
p_validated)
return False
platforms.append(p_validated)
@ -240,7 +240,7 @@ def from_config_dict(config: Dict[str, Any],
try:
conf_util.process_ha_core_config(hass, core_config)
except vol.Invalid as ex:
_log_exception(ex, 'homeassistant', core_config)
log_exception(ex, 'homeassistant', core_config)
return None
conf_util.process_ha_config_upgrade(hass)
@ -374,7 +374,7 @@ def _ensure_loader_prepared(hass: core.HomeAssistant) -> None:
loader.prepare(hass)
def _log_exception(ex, domain, config):
def log_exception(ex, domain, config):
"""Generate log exception for config validation."""
message = 'Invalid config for [{}]: '.format(domain)
if 'extra keys not allowed' in ex.error_message:

View File

@ -0,0 +1,261 @@
"""Script to ensure a configuration file exists."""
import argparse
import os
from glob import glob
import logging
from typing import List, Dict, Sequence
from unittest.mock import patch
from platform import system
from homeassistant.exceptions import HomeAssistantError
import homeassistant.bootstrap as bootstrap
import homeassistant.config as config_util
import homeassistant.loader as loader
import homeassistant.util.yaml as yaml
REQUIREMENTS = ('colorlog>2.1<3',)
if system() == 'Windows': # Ensure colorama installed for colorlog on Windows
REQUIREMENTS += ('colorama<=1',)
_LOGGER = logging.getLogger(__name__)
# pylint: disable=protected-access
MOCKS = {
'load': ("homeassistant.util.yaml.load_yaml", yaml.load_yaml),
'get': ("homeassistant.loader.get_component", loader.get_component),
'secrets': ("homeassistant.util.yaml._secret_yaml", yaml._secret_yaml),
'except': ("homeassistant.bootstrap.log_exception",
bootstrap.log_exception)
}
SILENCE = (
'homeassistant.util.yaml.clear_secret_cache',
'homeassistant.core._LOGGER.info',
'homeassistant.loader._LOGGER.info',
'homeassistant.bootstrap._LOGGER.info',
'homeassistant.bootstrap._LOGGER.warning',
'homeassistant.util.yaml._LOGGER.debug',
)
PATCHES = {}
C_HEAD = 'bold'
ERROR_STR = 'General Errors'
def color(the_color, *args, reset=None):
"""Color helper."""
from colorlog.escape_codes import escape_codes, parse_colors
try:
if len(args) == 0:
assert reset is None, "You cannot reset if nothing being printed"
return parse_colors(the_color)
return parse_colors(the_color) + ' '.join(args) + \
escape_codes[reset or 'reset']
except KeyError as k:
raise ValueError("Invalid color {} in {}".format(str(k), the_color))
# pylint: disable=too-many-locals, too-many-branches
def run(script_args: List) -> int:
"""Handle ensure config commandline script."""
parser = argparse.ArgumentParser(
description=("Check Home Assistant configuration."))
parser.add_argument(
'--script', choices=['check_config'])
parser.add_argument(
'-c', '--config',
default=config_util.get_default_config_dir(),
help="Directory that contains the Home Assistant configuration")
parser.add_argument(
'-i', '--info',
default=None,
help="Show a portion of the config")
parser.add_argument(
'-f', '--files',
action='store_true',
help="Show used configuration files")
parser.add_argument(
'-s', '--secrets',
action='store_true',
help="Show secret information")
args = parser.parse_args()
config_dir = os.path.join(os.getcwd(), args.config)
config_path = os.path.join(config_dir, 'configuration.yaml')
if not os.path.isfile(config_path):
print('Config does not exist:', config_path)
return 1
print(color('bold', "Testing configuration at", config_dir))
domain_info = []
if args.info:
domain_info = args.info.split(',')
res = check(config_path)
if args.files:
print(color(C_HEAD, 'yaml files'), '(used /',
color('red', 'not used')+')')
# Python 3.5 gets a recursive, but not in 3.4
for yfn in sorted(glob(os.path.join(config_dir, '*.yaml')) +
glob(os.path.join(config_dir, '*/*.yaml'))):
the_color = '' if yfn in res['yaml_files'] else 'red'
print(color(the_color, '-', yfn))
if len(res['except']) > 0:
print(color('bold_white', 'Failed config'))
for domain, config in res['except'].items():
domain_info.append(domain)
print(' ', color('bold_red', domain + ':'),
color('red', '', reset='red'))
dump_dict(config, reset='red', indent_count=3)
print(color('reset'))
if domain_info:
if 'all' in domain_info:
print(color('bold_white', 'Successful config (all)'))
for domain, config in res['components']:
print(color(C_HEAD, domain + ':'))
dump_dict(config, indent_count=3)
else:
print(color('bold_white', 'Successful config (partial)'))
for domain in domain_info:
if domain == ERROR_STR:
continue
print(' ', color(C_HEAD, domain + ':'))
dump_dict(res['components'].get(domain, None), indent_count=3)
if args.secrets:
flatsecret = {}
for sfn, sdict in res['secret_cache'].items():
sss = []
for skey, sval in sdict.items():
if skey in flatsecret:
_LOGGER.error('Duplicated secrets in files %s and %s',
flatsecret[skey], sfn)
flatsecret[skey] = sfn
sss.append(color('green', skey) if skey in res['secrets']
else skey)
print(color(C_HEAD, 'Secrets from', sfn + ':'), ', '.join(sss))
print(color(C_HEAD, 'Used Secrets:'))
for skey, sval in res['secrets'].items():
print(' -', skey + ':', sval, color('cyan', '[from:', flatsecret
.get(skey, 'keyring') + ']'))
return 0
def check(config_path):
"""Perform a check by mocking hass load functions."""
res = {
'yaml_files': {}, # yaml_files loaded
'secrets': {}, # secret cache and secrets loaded
'except': {}, # exceptions raised (with config)
'components': {}, # successful components
'secret_cache': {},
}
def mock_load(filename): # pylint: disable=unused-variable
"""Mock hass.util.load_yaml to save config files."""
res['yaml_files'][filename] = True
return MOCKS['load'][1](filename)
def mock_get(comp_name): # pylint: disable=unused-variable
"""Mock hass.loader.get_component to replace setup & setup_platform."""
def mock_setup(*kwargs):
"""Mock setup, only record the component name & config."""
assert comp_name not in res['components'], \
"Components should contain a list of platforms"
res['components'][comp_name] = kwargs[1].get(comp_name)
return True
module = MOCKS['get'][1](comp_name)
if module is None:
# Ensure list
res['except'][ERROR_STR] = res['except'].get(ERROR_STR, [])
res['except'][ERROR_STR].append('{} not found: {}'.format(
'Platform' if '.' in comp_name else 'Component', comp_name))
return None
# Test if platform/component and overwrite setup
if '.' in comp_name:
module.setup_platform = mock_setup
else:
module.setup = mock_setup
return module
def mock_secrets(ldr, node): # pylint: disable=unused-variable
"""Mock _get_secrets."""
try:
val = MOCKS['secrets'][1](ldr, node)
except HomeAssistantError:
val = None
res['secrets'][node.value] = val
return val
def mock_except(ex, domain, config): # pylint: disable=unused-variable
"""Mock bootstrap.log_exception."""
MOCKS['except'][1](ex, domain, config)
res['except'][domain] = config.get(domain, config)
# Patches to skip functions
for sil in SILENCE:
PATCHES[sil] = patch(sil)
# Patches with local mock functions
for key, val in MOCKS.items():
mock_function = locals()['mock_'+key]
PATCHES[key] = patch(val[0], side_effect=mock_function)
# Start all patches
for pat in PATCHES.values():
pat.start()
# Ensure !secrets point to the patched function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
try:
bootstrap.from_config_file(config_path, skip_pip=True)
print(dir(yaml))
res['secret_cache'] = yaml.__SECRET_CACHE
return res
finally:
# Stop all patches
for pat in PATCHES.values():
pat.stop()
# Ensure !secrets point to the original function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
def dump_dict(layer, indent_count=1, listi=False, **kwargs):
"""Display a dict.
A friendly version of print yaml.yaml.dump(config).
"""
def line_src(this):
"""Display line config source."""
if hasattr(this, '__config_file__'):
return color('cyan', "[source {}:{}]"
.format(this.__config_file__, this.__line__ or '?'),
**kwargs)
return ''
indent_str = indent_count * ' '
if listi or isinstance(layer, list):
indent_str = indent_str[:-1]+'-'
if isinstance(layer, Dict):
for key, value in layer.items():
if isinstance(value, dict) or isinstance(value, list):
print(indent_str, key + ':', line_src(value))
dump_dict(value, indent_count+2)
else:
print(indent_str, key + ':', value)
indent_str = indent_count * ' '
if isinstance(layer, Sequence):
for i in layer:
if isinstance(i, dict):
dump_dict(i, indent_count, True)
else:
print(indent_str, i)

View File

@ -46,7 +46,7 @@ def load_yaml(fname: str) -> Union[List, Dict]:
def clear_secret_cache() -> None:
"""Clear the secrete cache."""
"""Clear the secret cache."""
__SECRET_CACHE.clear()
@ -150,10 +150,8 @@ def _env_var_yaml(loader: SafeLineLoader,
def _load_secret_yaml(secret_path: str) -> Dict:
"""Load the secrets yaml from path."""
_LOGGER.debug('Loading %s', os.path.join(secret_path, _SECRET_YAML))
secrets = {}
if os.path.isfile(os.path.join(secret_path, _SECRET_YAML)):
secrets = load_yaml(
os.path.join(secret_path, _SECRET_YAML))
try:
secrets = load_yaml(os.path.join(secret_path, _SECRET_YAML))
if 'logger' in secrets:
logger = str(secrets['logger']).lower()
if logger == 'debug':
@ -162,7 +160,9 @@ def _load_secret_yaml(secret_path: str) -> Dict:
_LOGGER.error("secrets.yaml: 'logger: debug' expected,"
" but 'logger: %s' found", logger)
del secrets['logger']
return secrets
return secrets
except FileNotFoundError:
return {}
# pylint: disable=protected-access
@ -170,24 +170,23 @@ def _secret_yaml(loader: SafeLineLoader,
node: yaml.nodes.Node):
"""Load secrets and embed it into the configuration YAML."""
secret_path = os.path.dirname(loader.name)
while os.path.exists(secret_path):
while True:
secrets = __SECRET_CACHE.get(secret_path,
_load_secret_yaml(secret_path))
if node.value in secrets:
_LOGGER.debug('Secret %s retrieved from secrets.yaml in '
'folder %s', node.value, secret_path)
return secrets[node.value]
next_path = os.path.dirname(secret_path)
if not next_path or next_path == secret_path \
or secret_path == os.path.dirname(sys.path[0]):
# Somehow we got past the .homeassistant configuration folder...
break
if secret_path == os.path.dirname(sys.path[0]):
break # sys.path[0] set to config/deps folder by bootstrap
secret_path = next_path
secret_path = os.path.dirname(secret_path)
if not os.path.exists(secret_path) or len(secret_path) < 5:
break # Somehow we got past the .homeassistant config folder
if keyring:
# do ome keyring stuff
# do some keyring stuff
pwd = keyring.get_password(_SECRET_NAMESPACE, node.value)
if pwd:
_LOGGER.debug('Secret %s retrieved from keyring.', node.value)

View File

@ -52,6 +52,9 @@ boto3==1.3.1
# homeassistant.components.http
cherrypy==7.1.0
# homeassistant.scripts.check_config
colorlog>2.1<3
# homeassistant.components.media_player.directv
directpy==0.1
@ -205,6 +208,9 @@ insteon_hub==0.4.5
# homeassistant.components.media_player.kodi
jsonrpc-requests==0.3
# homeassistant.scripts.keyring
keyring>=9.3,<10.0
# homeassistant.components.knx
knxip==0.3.3
@ -432,6 +438,7 @@ somecomfort==0.2.1
speedtest-cli==0.3.4
# homeassistant.components.recorder
# homeassistant.scripts.db_migrator
sqlalchemy==1.0.14
# homeassistant.components.http

View File

@ -31,7 +31,7 @@ def explore_module(package, explore_children):
if not hasattr(module, '__path__'):
return found
for _, name, ispkg in pkgutil.iter_modules(module.__path__, package + '.'):
for _, name, _ in pkgutil.iter_modules(module.__path__, package + '.'):
found.append(name)
if explore_children:
@ -60,7 +60,8 @@ def gather_modules():
errors = []
output = []
for package in sorted(explore_module('homeassistant.components', True)):
for package in sorted(explore_module('homeassistant.components', True) +
explore_module('homeassistant.scripts', True)):
try:
module = importlib.import_module(package)
except ImportError:

View File

@ -2,12 +2,16 @@
import os
from datetime import timedelta
from unittest import mock
from unittest.mock import patch
from io import StringIO
import logging
from homeassistant import core as ha, loader
from homeassistant.bootstrap import _setup_component
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.util.unit_system import METRIC_SYSTEM
import homeassistant.util.dt as date_util
import homeassistant.util.yaml as yaml
from homeassistant.const import (
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED,
EVENT_STATE_CHANGED, EVENT_PLATFORM_DISCOVERED, ATTR_SERVICE,
@ -15,11 +19,12 @@ from homeassistant.const import (
from homeassistant.components import sun, mqtt
_TEST_INSTANCE_PORT = SERVER_PORT
_LOGGER = logging.getLogger(__name__)
def get_test_config_dir():
def get_test_config_dir(*add_path):
"""Return a path to a test config dir."""
return os.path.join(os.path.dirname(__file__), "testing_config")
return os.path.join(os.path.dirname(__file__), "testing_config", *add_path)
def get_test_home_assistant(num_threads=None):
@ -65,8 +70,7 @@ def mock_service(hass, domain, service):
"""
calls = []
hass.services.register(
domain, service, lambda call: calls.append(call))
hass.services.register(domain, service, calls.append)
return calls
@ -110,8 +114,8 @@ def ensure_sun_set(hass):
def load_fixture(filename):
"""Helper to load a fixture."""
path = os.path.join(os.path.dirname(__file__), 'fixtures', filename)
with open(path) as fp:
return fp.read()
with open(path) as fptr:
return fptr.read()
def mock_state_change_event(hass, new_state, old_state=None):
@ -147,6 +151,7 @@ def mock_mqtt_component(hass, mock_mqtt):
class MockModule(object):
"""Representation of a fake module."""
# pylint: disable=invalid-name,too-few-public-methods,too-many-arguments
def __init__(self, domain=None, dependencies=None, setup=None,
requirements=None, config_schema=None, platform_schema=None):
"""Initialize the mock module."""
@ -170,6 +175,7 @@ class MockModule(object):
class MockPlatform(object):
"""Provide a fake platform."""
# pylint: disable=invalid-name,too-few-public-methods
def __init__(self, setup_platform=None, dependencies=None,
platform_schema=None):
"""Initialize the platform."""
@ -234,3 +240,33 @@ class MockToggleDevice(ToggleEntity):
if call[0] == method)
except StopIteration:
return None
def patch_yaml_files(files_dict, endswith=True):
"""Patch load_yaml with a dictionary of yaml files."""
# match using endswith, start search with longest string
matchlist = sorted(list(files_dict.keys()), key=len) if endswith else []
# matchlist.sort(key=len)
def mock_open_f(fname, **_):
"""Mock open() in the yaml module, used by load_yaml."""
# Return the mocked file on full match
if fname in files_dict:
_LOGGER.debug('patch_yaml_files match %s', fname)
return StringIO(files_dict[fname])
# Match using endswith
for ends in matchlist:
if fname.endswith(ends):
_LOGGER.debug('patch_yaml_files end match %s: %s', ends, fname)
return StringIO(files_dict[ends])
# Fallback for hass.components (i.e. services.yaml)
if 'homeassistant/components' in fname:
_LOGGER.debug('patch_yaml_files using real file: %s', fname)
return open(fname, encoding='utf-8')
# Not found
raise IOError('File not found: {}'.format(fname))
return patch.object(yaml, 'open', mock_open_f, create=True)

View File

@ -0,0 +1,126 @@
"""Test check_config script."""
import unittest
import logging
import os
import homeassistant.scripts.check_config as check_config
from tests.common import patch_yaml_files, get_test_config_dir
_LOGGER = logging.getLogger(__name__)
BASE_CONFIG = (
'homeassistant:\n'
' name: Home\n'
' latitude: -26.107361\n'
' longitude: 28.054500\n'
' elevation: 1600\n'
' unit_system: metric\n'
' time_zone: GMT\n'
'\n\n'
)
def tearDownModule(self): # pylint: disable=invalid-name
"""Clean files."""
# .HA_VERSION created during bootstrap's config update
path = get_test_config_dir('.HA_VERSION')
if os.path.isfile(path):
os.remove(path)
class TestCheckConfig(unittest.TestCase):
"""Tests for the homeassistant.scripts.check_config module."""
# pylint: disable=no-self-use,invalid-name
def test_config_platform_valid(self):
"""Test a valid platform setup."""
files = {
'light.yaml': BASE_CONFIG + 'light:\n platform: hue',
}
with patch_yaml_files(files):
res = check_config.check(get_test_config_dir('light.yaml'))
self.assertDictEqual({
'components': {'light': [{'platform': 'hue'}]},
'except': {},
'secret_cache': {},
'secrets': {},
'yaml_files': {}
}, res)
def test_config_component_platform_fail_validation(self):
"""Test errors if component & platform not found."""
files = {
'component.yaml': BASE_CONFIG + 'http:\n password: err123',
}
with patch_yaml_files(files):
res = check_config.check(get_test_config_dir('component.yaml'))
self.assertDictEqual({
'components': {},
'except': {'http': {'password': 'err123'}},
'secret_cache': {},
'secrets': {},
'yaml_files': {}
}, res)
files = {
'platform.yaml': (BASE_CONFIG + 'mqtt:\n\n'
'light:\n platform: mqtt_json'),
}
with patch_yaml_files(files):
res = check_config.check(get_test_config_dir('platform.yaml'))
self.assertDictEqual({
'components': {'mqtt': {'keepalive': 60, 'port': 1883,
'protocol': '3.1.1'}},
'except': {'light.mqtt_json': {'platform': 'mqtt_json'}},
'secret_cache': {},
'secrets': {},
'yaml_files': {}
}, res)
def test_component_platform_not_found(self):
"""Test errors if component or platform not found."""
files = {
'badcomponent.yaml': BASE_CONFIG + 'beer:',
'badplatform.yaml': BASE_CONFIG + 'light:\n platform: beer',
}
with patch_yaml_files(files):
res = check_config.check(get_test_config_dir('badcomponent.yaml'))
self.assertDictEqual({
'components': {},
'except': {check_config.ERROR_STR:
['Component not found: beer']},
'secret_cache': {},
'secrets': {},
'yaml_files': {}
}, res)
res = check_config.check(get_test_config_dir('badplatform.yaml'))
self.assertDictEqual({
'components': {},
'except': {check_config.ERROR_STR:
['Platform not found: light.beer']},
'secret_cache': {},
'secrets': {},
'yaml_files': {}
}, res)
def test_secrets(self):
"""Test secrets config checking method."""
files = {
'secret.yaml': (BASE_CONFIG +
'http:\n'
' api_password: !secret http_pw'),
'secrets.yaml': ('logger: debug\n'
'http_pw: abc123'),
}
with patch_yaml_files(files):
res = check_config.check(get_test_config_dir('secret.yaml'))
self.assertDictEqual({
'components': {'http': {'api_password': 'abc123',
'server_port': 8123}},
'except': {},
'secret_cache': {},
'secrets': {'http_pw': 'abc123'},
'yaml_files': {'secrets.yaml': True}
}, res)