1
0
mirror of https://github.com/esphome/esphome.git synced 2025-03-15 15:18:16 +00:00

Better invalid config messages

This commit is contained in:
Otto Winter 2018-12-04 20:06:32 +01:00
parent aaed9a878d
commit f9c62a660e
No known key found for this signature in database
GPG Key ID: DB66C0BE6013F97E
2 changed files with 245 additions and 142 deletions

View File

@ -125,10 +125,10 @@ def do_substitution_pass(config):
substitutions[new] = substitutions[old]
del substitutions[old]
except vol.Invalid as err:
from esphomeyaml.config import _format_config_error
from esphomeyaml.config import _format_vol_invalid
err.path.append(key)
raise EsphomeyamlError(_format_config_error(err, CONF_SUBSTITUTIONS, substitutions))
raise EsphomeyamlError(u"Error while parsing substitutions: {}".format(err))
config[CONF_SUBSTITUTIONS] = substitutions
_substitute_item(substitutions, config, [])

View File

@ -4,15 +4,16 @@ from collections import OrderedDict
import importlib
import json
import logging
import re
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Set, Union, Any, Dict
import voluptuous as vol
from esphomeyaml import core, core_config, yaml_util
from esphomeyaml.components import substitutions
from esphomeyaml.const import CONF_ESPHOMEYAML, CONF_PLATFORM, CONF_WIFI, ESP_PLATFORMS
from esphomeyaml.const import CONF_ESPHOMEYAML, CONF_PLATFORM, ESP_PLATFORMS
from esphomeyaml.core import CORE, EsphomeyamlError, ConfigType
from esphomeyaml.helpers import color
from esphomeyaml.helpers import color, indent
from esphomeyaml.util import safe_print
_LOGGER = logging.getLogger(__name__)
@ -59,68 +60,120 @@ def iter_components(config):
yield p_name, platform, p_config
ConfigPath = List[Union[basestring, int]]
def _path_begins_with_(path, other): # type: (ConfigPath, ConfigPath) -> bool
if len(path) < len(other):
return False
return path[:len(other)] == other
def _path_begins_with(path, other): # type: (ConfigPath, ConfigPath) -> bool
ret = _path_begins_with_(path, other)
# print('_path_begins_with({}, {}) -> {}'.format(path, other, ret))
return ret
class Config(OrderedDict):
def __init__(self):
super(Config, self).__init__()
self.errors = []
self.errors = [] # type: List[Tuple[basestring, ConfigPath]]
self.domains = [] # type: List[Tuple[ConfigPath, basestring]]
def add_error(self, message, domain=None, config=None, path=None):
# type: (basestring, Optional[basestring], Optional[ConfigType], Optional[List[basestring]]) -> None
def add_error(self, message, path):
# type: (basestring, ConfigPath) -> None
if not isinstance(message, unicode):
message = unicode(message)
self.errors.append((message, domain, config, path))
self.errors.append((message, path))
def add_domain(self, path, name):
# type: (ConfigPath, basestring) -> None
self.domains.append((path, name))
def lookup_domain(self, path):
# type: (ConfigPath) -> Optional[basestring]
best_len = 0
best_domain = None
for d_path, domain in self.domains:
if len(d_path) < best_len:
continue
if _path_begins_with(path, d_path):
best_len = len(d_path)
best_domain = domain
return best_domain
def is_in_error_path(self, path):
for _, p in self.errors:
if _path_begins_with(p, path):
return True
return False
def get_error_for_path(self, path):
for msg, p in self.errors:
if p == path:
return msg
return None
def nested_item(self, path):
data = self
for item_index in path:
try:
data = data[item_index]
except (KeyError, IndexError, TypeError):
return {}
return data
def iter_ids(config, prefix=None, parent=None):
prefix = prefix or []
parent = parent or {}
def iter_ids(config, path=None):
path = path or []
if isinstance(config, core.ID):
yield config, prefix, parent
yield config, path
elif isinstance(config, core.Lambda):
for id in config.requires_ids:
yield id, prefix, parent
yield id, path
elif isinstance(config, list):
for i, item in enumerate(config):
for result in iter_ids(item, prefix + [str(i)], config):
for result in iter_ids(item, path + [i]):
yield result
elif isinstance(config, dict):
for key, value in config.iteritems():
for result in iter_ids(value, prefix + [str(key)], config):
for result in iter_ids(value, path + [key]):
yield result
def do_id_pass(result):
def do_id_pass(result): # type: (Config) -> None
from esphomeyaml.cpp_generator import MockObjClass
declare_ids = []
searching_ids = []
for id, prefix, config in iter_ids(result):
declare_ids = [] # type: List[Tuple[core.ID, ConfigPath]]
searching_ids = [] # type: List[Tuple[core.ID, ConfigPath]]
for id, path in iter_ids(result):
if id.is_declaration:
if id.id is not None and any(v[0].id == id.id for v in declare_ids):
result.add_error("ID {} redefined!".format(id.id), '.'.join(prefix), config)
result.add_error(u"ID {} redefined!".format(id.id), path)
continue
declare_ids.append((id, prefix, config))
declare_ids.append((id, path))
else:
searching_ids.append((id, prefix, config))
searching_ids.append((id, path))
# Resolve default ids after manual IDs
for id, _, _ in declare_ids:
for id, _ in declare_ids:
id.resolve([v[0].id for v in declare_ids])
# Check searched IDs
for id, prefix, config in searching_ids:
for id, path in searching_ids:
if id.id is not None:
# manually declared
match = next((v[0] for v in declare_ids if v[0].id == id.id), None)
if match is None:
# No declared ID with this name
result.add_error("Couldn't find ID {}".format(id.id), '.'.join(prefix), config)
result.add_error("Couldn't find ID {}".format(id.id), path)
continue
if not isinstance(match.type, MockObjClass) or not isinstance(id.type, MockObjClass):
continue
if not match.type.inherits_from(id.type):
result.add_error("ID '{}' of type {} doesn't inherit from {}. Please double check "
"your ID is pointing to the correct value"
"".format(id.id, match.type, id.type))
"".format(id.id, match.type, id.type), path)
if id.id is None and id.type is not None:
for v in declare_ids:
@ -131,41 +184,52 @@ def do_id_pass(result):
id.id = v[0].id
break
else:
result.add_error("Couldn't resolve ID for type {}".format(id.type),
'.'.join(prefix), config)
result.add_error("Couldn't resolve ID for type {}".format(id.type), path)
def validate_config(config):
result = Config()
def _comp_error(ex, domain, config, path):
result.add_error(_format_config_error(ex, domain, config), domain, config, path)
def _comp_error(ex, path):
# type: (vol.Invalid, List[basestring]) -> None
if isinstance(ex, vol.MultipleInvalid):
errors = ex.errors
else:
errors = [ex]
skip_domains = set()
for e in errors:
path_ = path + e.path
domain = result.lookup_domain(path_) or ''
result.add_error(_format_vol_invalid(e, config, path, domain), path_)
skip_paths = list() # type: List[ConfigPath]
# Step 1: Load everything
result.add_domain([CONF_ESPHOMEYAML], CONF_ESPHOMEYAML)
for domain, conf in config.iteritems():
domain = str(domain)
if domain == CONF_ESPHOMEYAML or domain.startswith(u'.'):
skip_domains.add(domain)
skip_paths.append([domain])
continue
result.add_domain([domain], domain)
result[domain] = conf
if conf is None:
conf = {}
config[domain] = conf = {}
component = get_component(domain)
if component is None:
result.add_error(u"Component not found: {}".format(domain), domain, conf, None)
skip_domains.add(domain)
result.add_error(u"Component not found: {}".format(domain), [domain])
skip_paths.append([domain])
continue
success = True
dependencies = getattr(component, 'DEPENDENCIES', [])
for dependency in dependencies:
if dependency not in config:
result.add_error(u"Component {} requires component {}".format(domain, dependency),
domain, conf)
result.add_error(u"Component {} requires component {}".format(domain, dependency), [domain])
success = False
if not success:
skip_domains.add(domain)
skip_paths.append([domain])
continue
success = True
@ -173,48 +237,51 @@ def validate_config(config):
for conflict in conflicts_with:
if conflict not in config:
result.add_error(u"Component {} cannot be used together with component {}"
u"".format(domain, conflict),
domain, conf)
u"".format(domain, conflict), [domain])
success = False
if not success:
skip_domains.add(domain)
skip_paths.append([domain])
continue
esp_platforms = getattr(component, 'ESP_PLATFORMS', ESP_PLATFORMS)
if CORE.esp_platform not in esp_platforms:
result.add_error(u"Component {} doesn't support {}.".format(domain, CORE.esp_platform),
domain, conf)
skip_domains.add(domain)
result.add_error(u"Component {} doesn't support {}.".format(domain, CORE.esp_platform), [domain])
skip_paths.append([domain])
continue
if not hasattr(component, 'PLATFORM_SCHEMA'):
continue
if not isinstance(conf, list) and conf:
result[domain] = conf = [conf]
for i, p_config in enumerate(conf):
if not isinstance(p_config, dict):
result.add_error(u"Platform schemas must have 'platform:' key", None, p_config, [])
result.add_error(u"Platform schemas must have 'platform:' key", [domain, i])
skip_paths.append([domain, i])
continue
p_name = p_config.get(u'platform')
p_name = p_config.get('platform')
if p_name is None:
result.add_error(u"No platform specified for {}".format(domain))
result.add_error(u"No platform specified for {}".format(domain), [domain, i])
skip_paths.append([domain, i])
continue
p_domain = u'{}.{}'.format(domain, p_name)
result.add_domain([domain, i], p_domain)
platform = get_platform(domain, p_name)
if platform is None:
result.add_error(u"Platform not found: '{}'".format(p_domain), p_domain, p_config)
skip_domains.add(p_domain)
result.add_error(u"Platform not found: '{}'".format(p_domain), [domain, i])
skip_paths.append([domain, i])
continue
success = True
dependencies = getattr(platform, 'DEPENDENCIES', [])
for dependency in dependencies:
if dependency not in config:
result.add_error(
u"Platform {} requires component {}".format(p_domain, dependency),
p_domain, p_config)
result.add_error(u"Platform {} requires component {}".format(p_domain, dependency),
[domain, i])
success = False
if not success:
skip_domains.add(p_domain)
skip_paths.append([domain, i])
continue
success = True
@ -222,32 +289,29 @@ def validate_config(config):
for conflict in conflicts_with:
if conflict not in config:
result.add_error(u"Platform {} cannot be used together with component {}"
u"".format(p_domain, conflict),
domain, conf)
u"".format(p_domain, conflict), [domain, i])
success = False
if not success:
skip_domains.add(p_domain)
skip_paths.append([domain, i])
continue
esp_platforms = getattr(platform, 'ESP_PLATFORMS', ESP_PLATFORMS)
if CORE.esp_platform not in esp_platforms:
result.add_error(
u"Platform {} doesn't support {}.".format(p_domain, CORE.esp_platform),
p_domain, p_config)
skip_domains.add(p_domain)
result.add_error(u"Platform {} doesn't support {}.".format(p_domain, CORE.esp_platform),
[domain, i])
skip_paths.append([domain, i])
continue
# Step 2: Validate configuration
try:
result[CONF_ESPHOMEYAML] = config[CONF_ESPHOMEYAML]
result[CONF_ESPHOMEYAML] = core_config.CONFIG_SCHEMA(config[CONF_ESPHOMEYAML])
except vol.Invalid as ex:
_comp_error(ex, CONF_ESPHOMEYAML, config[CONF_ESPHOMEYAML])
_comp_error(ex, [CONF_ESPHOMEYAML])
for domain, conf in config.iteritems():
if conf is None:
conf = {}
domain = str(domain)
if domain in skip_domains:
if [domain] in skip_paths:
continue
component = get_component(domain)
@ -256,32 +320,25 @@ def validate_config(config):
validated = component.CONFIG_SCHEMA(conf)
result[domain] = validated
except vol.Invalid as ex:
_comp_error(ex, domain, conf)
_comp_error(ex, [domain])
continue
if not hasattr(component, 'PLATFORM_SCHEMA'):
continue
platforms = []
for p_config in conf:
if not isinstance(p_config, dict):
continue
p_name = p_config.get(u'platform')
if p_name is None:
continue
p_domain = u'{}.{}'.format(domain, p_name)
if p_domain in skip_domains:
for i, p_config in enumerate(conf):
if [domain, i] in skip_paths:
continue
p_name = p_config['platform']
platform = get_platform(domain, p_name)
if hasattr(platform, u'PLATFORM_SCHEMA'):
if hasattr(platform, 'PLATFORM_SCHEMA'):
try:
p_validated = platform.PLATFORM_SCHEMA(p_config)
except vol.Invalid as ex:
_comp_error(ex, p_domain, p_config)
_comp_error(ex, [domain, i])
continue
platforms.append(p_validated)
result[domain] = platforms
result[domain][i] = p_validated
do_id_pass(result)
return result
@ -296,43 +353,32 @@ def _nested_getitem(data, path):
return data
def _format_path(path):
return u'->'.join(unicode(m) for m in path)
def humanize_error(config, validation_error):
offending_item_summary = _nested_getitem(config, validation_error.path)
if isinstance(offending_item_summary, dict):
offending_item_summary = json.dumps(offending_item_summary)
return u'{}. Got {}'.format(validation_error, offending_item_summary)
try:
offending_item_summary = json.dumps(offending_item_summary)
except (TypeError, ValueError):
pass
validation_error = unicode(validation_error)
m = re.match(r'^(.*)\s*for dictionary value @.*$', validation_error)
if m is not None:
validation_error = m.group(1)
validation_error = validation_error.strip()
if not validation_error.endswith(u'.'):
validation_error += u'.'
return u"{} Got '{}'".format(validation_error, offending_item_summary)
def _format_config_error(ex, domain, config, recursion=False):
message = u"" if recursion else u"Invalid config for [{}]: ".format(domain)
if isinstance(ex, vol.MultipleInvalid):
return color('red', message + u'\n'.join(sorted(
_format_config_error(sub_error, domain, config, recursion=True)
for sub_error in ex.errors
)))
def _format_vol_invalid(ex, config, path, domain):
# type: (vol.Invalid, ConfigType, ConfigPath, basestring) -> unicode
message = u''
if u'extra keys not allowed' in ex.error_message:
message += u'[{}] is an invalid option for [{}].' \
.format(ex.path[-1], domain)
message += u'[{}] is an invalid option for [{}].'.format(ex.path[-1], domain)
elif u'required key not provided' in ex.error_message:
message += u"'{}' is a required option for [{}]." \
u"".format(ex.path[-1], domain)
message += u"'{}' is a required option for [{}].".format(ex.path[-1], domain)
else:
message += u'{}.'.format(humanize_error(config, ex))
message += u' Check {}->{}.'.format(domain, _format_path(ex.path))
message = color('red', message)
if isinstance(config, list):
return message
domain_config = config.get(domain, config)
message += color('cyan', u" (See {}, line {}). ".format(
getattr(domain_config, '__config_file__', '?'),
getattr(domain_config, '__line__', '?')))
message += u'{}.'.format(humanize_error(_nested_getitem(config, path), ex))
return message
@ -362,32 +408,92 @@ def line_info(obj):
if hasattr(obj, '__config_file__'):
return color('cyan', "[source {}:{}]"
.format(obj.__config_file__, obj.__line__ or '?'))
return '?'
return None
def dump_dict(layer, indent_count=0, listi=False):
def sort_dict_key(val):
"""Return the dict key for sorting."""
key = str.lower(val[0])
return '0' if key == 'platform' else key
def _print_on_next_line(obj):
if isinstance(obj, (list, tuple, dict)):
return True
if isinstance(obj, str):
return len(obj) > 80
if isinstance(obj, core.Lambda):
return len(obj.value) > 80
return False
indent_str = indent_count * ' '
if listi or isinstance(layer, list):
indent_str = indent_str[:-1] + '-'
if isinstance(layer, dict):
for key, value in sorted(layer.items(), key=sort_dict_key):
if isinstance(value, (dict, list)):
safe_print(u"{} {}: {}".format(indent_str, key, line_info(value)))
dump_dict(value, indent_count + 2)
else:
safe_print(u"{} {}: {}".format(indent_str, key, value))
indent_str = indent_count * ' '
if isinstance(layer, (list, tuple)):
for i in layer:
if isinstance(i, dict):
dump_dict(i, indent_count + 2, True)
else:
safe_print(u" {} {}".format(indent_str, i))
def dump_dict(config, path, at_root=True):
# type: (Config, ConfigPath, bool) -> Tuple[unicode, bool]
conf = config.nested_item(path)
ret = u''
multiline = False
if at_root:
error = config.get_error_for_path(path)
if error is not None:
ret += u'\n' + color('bold_red', error) + u'\n'
if isinstance(conf, (list, tuple)):
multiline = True
for i, obj in enumerate(conf):
path_ = path + [i]
error = config.get_error_for_path(path_)
if error is not None:
ret += u'\n' + color('bold_red', error) + u'\n'
sep = u'- '
if config.is_in_error_path(path_):
sep = color('red', sep)
msg, _ = dump_dict(config, path_, at_root=False)
msg = indent(msg)
inf = line_info(config.nested_item(path_))
if inf is not None:
msg = inf + u'\n' + msg
elif msg:
msg = msg[2:]
ret += sep + msg + u'\n'
elif isinstance(conf, dict):
multiline = True
for k, v in conf.iteritems():
path_ = path + [k]
error = config.get_error_for_path(path_)
if error is not None:
ret += u'\n' + color('bold_red', error) + u'\n'
st = u'{}: '.format(k)
if config.is_in_error_path(path_):
st = color('red', st)
msg, m = dump_dict(config, path_, at_root=False)
inf = line_info(config.nested_item(path_))
if m:
msg = u'\n' + indent(msg)
if inf is not None:
if m:
msg = u' ' + inf + msg
else:
msg = msg + u' ' + inf
ret += st + msg + u'\n'
elif isinstance(conf, str):
if len(conf) > 80:
ret += u'|-\n'
conf = indent(conf)
error = config.get_error_for_path(path)
col = 'bold_red' if error else 'white'
ret += color(col, unicode(conf))
elif isinstance(conf, core.Lambda):
ret += u'|-\n'
conf = indent(unicode(conf.value))
error = config.get_error_for_path(path)
col = 'bold_red' if error else 'white'
ret += color(col, conf)
else:
error = config.get_error_for_path(path)
col = 'bold_red' if error else 'white'
ret += color(col, unicode(conf))
multiline = u'\n' in ret
return ret, multiline
def read_config():
@ -397,17 +503,14 @@ def read_config():
except EsphomeyamlError as err:
_LOGGER.error(u"Error while reading config: %s", err)
return None
excepts = {}
for message, domain, config in res.errors:
domain = domain or u"General Error"
excepts.setdefault(domain, []).append(message)
if config is not None:
excepts[domain].append(config)
if res.errors:
safe_print(color('bold_red', u"Failed config"))
safe_print('')
for path, domain in res.domains:
if not res.is_in_error_path(path):
continue
if excepts:
safe_print(color('bold_white', u"Failed config"))
for domain, config in excepts.iteritems():
safe_print(color('bold_red', domain + u':'))
dump_dict(config)
safe_print(color('bold_red', u'{}:'.format(domain)) + u' ' + (line_info(res.nested_item(path)) or u''))
safe_print(indent(dump_dict(res, path)[0]))
return None
return OrderedDict(res)