core/homeassistant/scripts/check_config.py

320 lines
12 KiB
Python

"""Script to ensure a configuration file exists."""
import argparse
import logging
import os
from collections import OrderedDict
from glob import glob
from platform import system
from unittest.mock import patch
from typing import Dict, List, Sequence
from homeassistant.core import callback
from homeassistant import bootstrap, loader, setup, config as config_util
import homeassistant.util.yaml as yaml
from homeassistant.exceptions import HomeAssistantError
REQUIREMENTS = ('colorlog==3.1.2',)
if system() == 'Windows': # Ensure colorama installed for colorlog on Windows
REQUIREMENTS += ('colorama<=1',)
_LOGGER = logging.getLogger(__name__)
# pylint: disable=protected-access
MOCKS = {
'load': ("homeassistant.util.yaml.load_yaml", yaml.load_yaml),
'load*': ("homeassistant.config.load_yaml", yaml.load_yaml),
'get': ("homeassistant.loader.get_component", loader.get_component),
'secrets': ("homeassistant.util.yaml._secret_yaml", yaml._secret_yaml),
'except': ("homeassistant.config.async_log_exception",
config_util.async_log_exception),
'package_error': ("homeassistant.config._log_pkg_error",
config_util._log_pkg_error),
'logger_exception': ("homeassistant.setup._LOGGER.error",
setup._LOGGER.error),
'logger_exception_bootstrap': ("homeassistant.bootstrap._LOGGER.error",
bootstrap._LOGGER.error),
}
SILENCE = (
'homeassistant.bootstrap.async_enable_logging', # callback
'homeassistant.bootstrap.clear_secret_cache',
'homeassistant.bootstrap.async_register_signal_handling', # callback
'homeassistant.config.process_ha_config_upgrade',
)
PATCHES = {}
C_HEAD = 'bold'
ERROR_STR = 'General Errors'
@callback
def mock_cb(*args):
"""Callback that returns None."""
return None
def color(the_color, *args, reset=None):
"""Color helper."""
from colorlog.escape_codes import escape_codes, parse_colors
try:
if not args:
assert reset is None, "You cannot reset if nothing being printed"
return parse_colors(the_color)
return parse_colors(the_color) + ' '.join(args) + \
escape_codes[reset or 'reset']
except KeyError as k:
raise ValueError("Invalid color {} in {}".format(str(k), the_color))
def run(script_args: List) -> int:
"""Handle ensure config commandline script."""
parser = argparse.ArgumentParser(
description=("Check Home Assistant configuration."))
parser.add_argument(
'--script', choices=['check_config'])
parser.add_argument(
'-c', '--config',
default=config_util.get_default_config_dir(),
help="Directory that contains the Home Assistant configuration")
parser.add_argument(
'-i', '--info',
default=None,
help="Show a portion of the config")
parser.add_argument(
'-f', '--files',
action='store_true',
help="Show used configuration files")
parser.add_argument(
'-s', '--secrets',
action='store_true',
help="Show secret information")
args = parser.parse_args()
config_dir = os.path.join(os.getcwd(), args.config)
config_path = os.path.join(config_dir, 'configuration.yaml')
if not os.path.isfile(config_path):
print('Config does not exist:', config_path)
return 1
print(color('bold', "Testing configuration at", config_dir))
domain_info = []
if args.info:
domain_info = args.info.split(',')
res = check(config_path)
if args.files:
print(color(C_HEAD, 'yaml files'), '(used /',
color('red', 'not used') + ')')
# Python 3.5 gets a recursive, but not in 3.4
for yfn in sorted(glob(os.path.join(config_dir, '*.yaml')) +
glob(os.path.join(config_dir, '*/*.yaml'))):
the_color = '' if yfn in res['yaml_files'] else 'red'
print(color(the_color, '-', yfn))
if res['except']:
print(color('bold_white', 'Failed config'))
for domain, config in res['except'].items():
domain_info.append(domain)
print(' ', color('bold_red', domain + ':'),
color('red', '', reset='red'))
dump_dict(config, reset='red')
print(color('reset'))
if domain_info:
if 'all' in domain_info:
print(color('bold_white', 'Successful config (all)'))
for domain, config in res['components'].items():
print(' ', color(C_HEAD, domain + ':'))
dump_dict(config)
else:
print(color('bold_white', 'Successful config (partial)'))
for domain in domain_info:
if domain == ERROR_STR:
continue
print(' ', color(C_HEAD, domain + ':'))
dump_dict(res['components'].get(domain, None))
if args.secrets:
flatsecret = {}
for sfn, sdict in res['secret_cache'].items():
sss = []
for skey in sdict:
if skey in flatsecret:
_LOGGER.error('Duplicated secrets in files %s and %s',
flatsecret[skey], sfn)
flatsecret[skey] = sfn
sss.append(color('green', skey) if skey in res['secrets']
else skey)
print(color(C_HEAD, 'Secrets from', sfn + ':'), ', '.join(sss))
print(color(C_HEAD, 'Used Secrets:'))
for skey, sval in res['secrets'].items():
print(' -', skey + ':', sval, color('cyan', '[from:', flatsecret
.get(skey, 'keyring') + ']'))
return len(res['except'])
def check(config_path):
"""Perform a check by mocking hass load functions."""
logging.getLogger('homeassistant.core').setLevel(logging.WARNING)
logging.getLogger('homeassistant.loader').setLevel(logging.WARNING)
logging.getLogger('homeassistant.setup').setLevel(logging.WARNING)
logging.getLogger('homeassistant.bootstrap').setLevel(logging.ERROR)
logging.getLogger('homeassistant.util.yaml').setLevel(logging.INFO)
res = {
'yaml_files': OrderedDict(), # yaml_files loaded
'secrets': OrderedDict(), # secret cache and secrets loaded
'except': OrderedDict(), # exceptions raised (with config)
'components': OrderedDict(), # successful components
'secret_cache': OrderedDict(),
}
# pylint: disable=unused-variable
def mock_load(filename):
"""Mock hass.util.load_yaml to save config files."""
res['yaml_files'][filename] = True
return MOCKS['load'][1](filename)
# pylint: disable=unused-variable
def mock_get(comp_name):
"""Mock hass.loader.get_component to replace setup & setup_platform."""
async def mock_async_setup(*args):
"""Mock setup, only record the component name & config."""
assert comp_name not in res['components'], \
"Components should contain a list of platforms"
res['components'][comp_name] = args[1].get(comp_name)
return True
module = MOCKS['get'][1](comp_name)
if module is None:
# Ensure list
msg = '{} not found: {}'.format(
'Platform' if '.' in comp_name else 'Component', comp_name)
res['except'].setdefault(ERROR_STR, []).append(msg)
return None
# Test if platform/component and overwrite setup
if '.' in comp_name:
module.async_setup_platform = mock_async_setup
if hasattr(module, 'setup_platform'):
del module.setup_platform
else:
module.async_setup = mock_async_setup
if hasattr(module, 'setup'):
del module.setup
return module
# pylint: disable=unused-variable
def mock_secrets(ldr, node):
"""Mock _get_secrets."""
try:
val = MOCKS['secrets'][1](ldr, node)
except HomeAssistantError:
val = None
res['secrets'][node.value] = val
return val
def mock_except(ex, domain, config, # pylint: disable=unused-variable
hass=None):
"""Mock config.log_exception."""
MOCKS['except'][1](ex, domain, config, hass)
res['except'][domain] = config.get(domain, config)
def mock_package_error( # pylint: disable=unused-variable
package, component, config, message):
"""Mock config_util._log_pkg_error."""
MOCKS['package_error'][1](package, component, config, message)
pkg_key = 'homeassistant.packages.{}'.format(package)
res['except'][pkg_key] = config.get('homeassistant', {}) \
.get('packages', {}).get(package)
def mock_logger_exception(msg, *params):
"""Log logger.exceptions."""
res['except'].setdefault(ERROR_STR, []).append(msg % params)
MOCKS['logger_exception'][1](msg, *params)
def mock_logger_exception_bootstrap(msg, *params):
"""Log logger.exceptions."""
res['except'].setdefault(ERROR_STR, []).append(msg % params)
MOCKS['logger_exception_bootstrap'][1](msg, *params)
# Patches to skip functions
for sil in SILENCE:
PATCHES[sil] = patch(sil, return_value=mock_cb())
# Patches with local mock functions
for key, val in MOCKS.items():
# The * in the key is removed to find the mock_function (side_effect)
# This allows us to use one side_effect to patch multiple locations
mock_function = locals()['mock_' + key.replace('*', '')]
PATCHES[key] = patch(val[0], side_effect=mock_function)
# Start all patches
for pat in PATCHES.values():
pat.start()
# Ensure !secrets point to the patched function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
try:
with patch('homeassistant.util.logging.AsyncHandler._process'):
bootstrap.from_config_file(config_path, skip_pip=True)
res['secret_cache'] = dict(yaml.__SECRET_CACHE)
except Exception as err: # pylint: disable=broad-except
print(color('red', 'Fatal error while loading config:'), str(err))
res['except'].setdefault(ERROR_STR, []).append(err)
finally:
# Stop all patches
for pat in PATCHES.values():
pat.stop()
# Ensure !secrets point to the original function
yaml.yaml.SafeLoader.add_constructor('!secret', yaml._secret_yaml)
bootstrap.clear_secret_cache()
return res
def line_info(obj, **kwargs):
"""Display line config source."""
if hasattr(obj, '__config_file__'):
return color('cyan', "[source {}:{}]"
.format(obj.__config_file__, obj.__line__ or '?'),
**kwargs)
return '?'
def dump_dict(layer, indent_count=3, listi=False, **kwargs):
"""Display a dict.
A friendly version of print yaml.yaml.dump(config).
"""
def sort_dict_key(val):
"""Return the dict key for sorting."""
key = str.lower(val[0])
return '0' if key == 'platform' else key
indent_str = indent_count * ' '
if listi or isinstance(layer, list):
indent_str = indent_str[:-1] + '-'
if isinstance(layer, Dict):
for key, value in sorted(layer.items(), key=sort_dict_key):
if isinstance(value, (dict, list)):
print(indent_str, key + ':', line_info(value, **kwargs))
dump_dict(value, indent_count + 2)
else:
print(indent_str, key + ':', value)
indent_str = indent_count * ' '
if isinstance(layer, Sequence):
for i in layer:
if isinstance(i, dict):
dump_dict(i, indent_count + 2, True)
else:
print(' ', indent_str, i)