mirror of
https://github.com/esphome/esphome.git
synced 2025-09-28 08:02:23 +01:00
Cleanup dashboard JS (#491)
* Cleanup dashboard JS * Add vscode * Save start_mark/end_mark * Updates * Updates * Remove need for cv.nameable It's a bit hacky but removes so much bloat from integrations * Add enum helper * Document APIs, and Improvements * Fixes * Fixes * Update PULL_REQUEST_TEMPLATE.md * Updates * Updates * Updates
This commit is contained in:
@@ -1,17 +1,18 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import codecs
|
||||
import fnmatch
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import uuid
|
||||
import yaml
|
||||
import yaml.constructor
|
||||
|
||||
from esphome import core
|
||||
from esphome.core import EsphomeError, HexInt, IPAddress, Lambda, MACAddress, TimePeriod
|
||||
from esphome.py_compat import string_types, text_type, IS_PY2
|
||||
from esphome.config_helpers import read_config_file
|
||||
from esphome.core import EsphomeError, IPAddress, Lambda, MACAddress, TimePeriod, DocumentRange
|
||||
from esphome.py_compat import text_type, IS_PY2
|
||||
from esphome.util import OrderedDict
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@@ -25,22 +26,299 @@ _SECRET_VALUES = {}
|
||||
|
||||
|
||||
class NodeListClass(list):
|
||||
"""Wrapper class to be able to add attributes on a list."""
|
||||
pass
|
||||
|
||||
|
||||
class NodeStrClass(text_type):
|
||||
"""Wrapper class to be able to add attributes on a string."""
|
||||
pass
|
||||
|
||||
|
||||
class SafeLineLoader(yaml.SafeLoader): # pylint: disable=too-many-ancestors
|
||||
class ESPHomeDataBase(object):
|
||||
@property
|
||||
def esp_range(self):
|
||||
return getattr(self, '_esp_range', None)
|
||||
|
||||
def from_node(self, node):
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
self._esp_range = DocumentRange.from_marks(node.start_mark, node.end_mark)
|
||||
|
||||
|
||||
class ESPInt(int, ESPHomeDataBase):
|
||||
pass
|
||||
|
||||
|
||||
class ESPFloat(float, ESPHomeDataBase):
|
||||
pass
|
||||
|
||||
|
||||
class ESPStr(str, ESPHomeDataBase):
|
||||
pass
|
||||
|
||||
|
||||
class ESPDict(OrderedDict, ESPHomeDataBase):
|
||||
pass
|
||||
|
||||
|
||||
class ESPList(list, ESPHomeDataBase):
|
||||
pass
|
||||
|
||||
|
||||
class ESPLambda(Lambda, ESPHomeDataBase):
|
||||
pass
|
||||
|
||||
|
||||
ESP_TYPES = {
|
||||
int: ESPInt,
|
||||
float: ESPFloat,
|
||||
str: ESPStr,
|
||||
dict: ESPDict,
|
||||
list: ESPList,
|
||||
Lambda: ESPLambda,
|
||||
}
|
||||
if IS_PY2:
|
||||
class ESPUnicode(unicode, ESPHomeDataBase):
|
||||
pass
|
||||
|
||||
ESP_TYPES[unicode] = ESPUnicode
|
||||
|
||||
|
||||
def make_data_base(value):
|
||||
for typ, cons in ESP_TYPES.items():
|
||||
if isinstance(value, typ):
|
||||
return cons(value)
|
||||
return value
|
||||
|
||||
|
||||
def _add_data_ref(fn):
|
||||
@functools.wraps(fn)
|
||||
def wrapped(loader, node):
|
||||
res = fn(loader, node)
|
||||
res = make_data_base(res)
|
||||
if isinstance(res, ESPHomeDataBase):
|
||||
res.from_node(node)
|
||||
return res
|
||||
|
||||
return wrapped
|
||||
|
||||
|
||||
class ESPHomeLoader(yaml.SafeLoader): # pylint: disable=too-many-ancestors
|
||||
"""Loader class that keeps track of line numbers."""
|
||||
|
||||
def compose_node(self, parent, index):
|
||||
"""Annotate a node with the first line it was seen."""
|
||||
last_line = self.line # type: int
|
||||
node = super(SafeLineLoader, self).compose_node(parent, index) # type: yaml.nodes.Node
|
||||
node.__line__ = last_line + 1
|
||||
return node
|
||||
@_add_data_ref
|
||||
def construct_yaml_int(self, node):
|
||||
return super(ESPHomeLoader, self).construct_yaml_int(node)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_float(self, node):
|
||||
return super(ESPHomeLoader, self).construct_yaml_float(node)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_binary(self, node):
|
||||
return super(ESPHomeLoader, self).construct_yaml_binary(node)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_omap(self, node):
|
||||
return super(ESPHomeLoader, self).construct_yaml_omap(node)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_str(self, node):
|
||||
return super(ESPHomeLoader, self).construct_yaml_str(node)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_seq(self, node):
|
||||
return super(ESPHomeLoader, self).construct_yaml_seq(node)
|
||||
|
||||
def custom_flatten_mapping(self, node):
|
||||
pre_merge = []
|
||||
post_merge = []
|
||||
index = 0
|
||||
while index < len(node.value):
|
||||
if isinstance(node.value[index], yaml.ScalarNode):
|
||||
index += 1
|
||||
continue
|
||||
|
||||
key_node, value_node = node.value[index]
|
||||
if key_node.tag == u'tag:yaml.org,2002:merge':
|
||||
del node.value[index]
|
||||
|
||||
if isinstance(value_node, yaml.MappingNode):
|
||||
self.custom_flatten_mapping(value_node)
|
||||
node.value = node.value[:index] + value_node.value + node.value[index:]
|
||||
elif isinstance(value_node, yaml.SequenceNode):
|
||||
submerge = []
|
||||
for subnode in value_node.value:
|
||||
if not isinstance(subnode, yaml.MappingNode):
|
||||
raise yaml.constructor.ConstructorError(
|
||||
"while constructing a mapping", node.start_mark,
|
||||
"expected a mapping for merging, but found %{}".format(subnode.id),
|
||||
subnode.start_mark)
|
||||
self.custom_flatten_mapping(subnode)
|
||||
submerge.append(subnode.value)
|
||||
# submerge.reverse()
|
||||
node.value = node.value[:index] + submerge + node.value[index:]
|
||||
elif isinstance(value_node, yaml.ScalarNode):
|
||||
node.value = node.value[:index] + [value_node] + node.value[index:]
|
||||
# post_merge.append(value_node)
|
||||
else:
|
||||
raise yaml.constructor.ConstructorError(
|
||||
"while constructing a mapping", node.start_mark,
|
||||
"expected a mapping or list of mappings for merging, "
|
||||
"but found {}".format(value_node.id), value_node.start_mark)
|
||||
elif key_node.tag == u'tag:yaml.org,2002:value':
|
||||
key_node.tag = u'tag:yaml.org,2002:str'
|
||||
index += 1
|
||||
else:
|
||||
index += 1
|
||||
if pre_merge:
|
||||
node.value = pre_merge + node.value
|
||||
if post_merge:
|
||||
node.value = node.value + post_merge
|
||||
|
||||
def custom_construct_pairs(self, node):
|
||||
pairs = []
|
||||
for kv in node.value:
|
||||
if isinstance(kv, yaml.ScalarNode):
|
||||
obj = self.construct_object(kv)
|
||||
if not isinstance(obj, dict):
|
||||
raise EsphomeError(
|
||||
"Expected mapping for anchored include tag, got {}".format(type(obj)))
|
||||
for key, value in obj.items():
|
||||
pairs.append((key, value))
|
||||
else:
|
||||
key_node, value_node = kv
|
||||
key = self.construct_object(key_node)
|
||||
value = self.construct_object(value_node)
|
||||
pairs.append((key, value))
|
||||
|
||||
return pairs
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_map(self, node):
|
||||
self.custom_flatten_mapping(node)
|
||||
nodes = self.custom_construct_pairs(node)
|
||||
|
||||
seen = {}
|
||||
for (key, _), nv in zip(nodes, node.value):
|
||||
if isinstance(nv, yaml.ScalarNode):
|
||||
line = nv.start_mark.line
|
||||
else:
|
||||
line = nv[0].start_mark.line
|
||||
|
||||
try:
|
||||
hash(key)
|
||||
except TypeError:
|
||||
raise yaml.MarkedYAMLError(
|
||||
context="invalid key: \"{}\"".format(key),
|
||||
context_mark=yaml.Mark(self.name, 0, line, -1, None, None)
|
||||
)
|
||||
|
||||
if key in seen:
|
||||
raise yaml.MarkedYAMLError(
|
||||
context="duplicate key: \"{}\"".format(key),
|
||||
context_mark=yaml.Mark(self.name, 0, line, -1, None, None)
|
||||
)
|
||||
seen[key] = line
|
||||
|
||||
return OrderedDict(nodes)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_env_var(self, node):
|
||||
args = node.value.split()
|
||||
# Check for a default value
|
||||
if len(args) > 1:
|
||||
return os.getenv(args[0], u' '.join(args[1:]))
|
||||
if args[0] in os.environ:
|
||||
return os.environ[args[0]]
|
||||
raise yaml.MarkedYAMLError(
|
||||
context=u"Environment variable '{}' not defined".format(node.value),
|
||||
context_mark=node.start_mark
|
||||
)
|
||||
|
||||
@property
|
||||
def _directory(self):
|
||||
return os.path.dirname(self.name)
|
||||
|
||||
def _rel_path(self, *args):
|
||||
return os.path.join(self._directory, *args)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_secret(self, node):
|
||||
secrets = _load_yaml_internal(self._rel_path(SECRET_YAML))
|
||||
if node.value not in secrets:
|
||||
raise yaml.MarkedYAMLError(
|
||||
context=u"Secret '{}' not defined".format(node.value),
|
||||
context_mark=node.start_mark
|
||||
)
|
||||
val = secrets[node.value]
|
||||
_SECRET_VALUES[text_type(val)] = node.value
|
||||
return val
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include(self, node):
|
||||
return _load_yaml_internal(self._rel_path(node.value))
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_list(self, node):
|
||||
files = _filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
||||
return [_load_yaml_internal(f) for f in files]
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_merge_list(self, node):
|
||||
files = _filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
||||
merged_list = []
|
||||
for fname in files:
|
||||
loaded_yaml = _load_yaml_internal(fname)
|
||||
if isinstance(loaded_yaml, list):
|
||||
merged_list.extend(loaded_yaml)
|
||||
return merged_list
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_named(self, node):
|
||||
files = _filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
||||
mapping = OrderedDict()
|
||||
for fname in files:
|
||||
filename = os.path.splitext(os.path.basename(fname))[0]
|
||||
mapping[filename] = _load_yaml_internal(fname)
|
||||
return mapping
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_merge_named(self, node):
|
||||
files = _filter_yaml_files(_find_files(self._rel_path(node.value), '*.yaml'))
|
||||
mapping = OrderedDict()
|
||||
for fname in files:
|
||||
loaded_yaml = _load_yaml_internal(fname)
|
||||
if isinstance(loaded_yaml, dict):
|
||||
mapping.update(loaded_yaml)
|
||||
return mapping
|
||||
|
||||
@_add_data_ref
|
||||
def construct_lambda(self, node):
|
||||
return Lambda(text_type(node.value))
|
||||
|
||||
|
||||
def _filter_yaml_files(files):
|
||||
files = [f for f in files if os.path.basename(f) != SECRET_YAML]
|
||||
files = [f for f in files if not os.path.basename(f).startswith('.')]
|
||||
return files
|
||||
|
||||
|
||||
ESPHomeLoader.add_constructor(u'tag:yaml.org,2002:int', ESPHomeLoader.construct_yaml_int)
|
||||
ESPHomeLoader.add_constructor(u'tag:yaml.org,2002:float', ESPHomeLoader.construct_yaml_float)
|
||||
ESPHomeLoader.add_constructor(u'tag:yaml.org,2002:binary', ESPHomeLoader.construct_yaml_binary)
|
||||
ESPHomeLoader.add_constructor(u'tag:yaml.org,2002:omap', ESPHomeLoader.construct_yaml_omap)
|
||||
ESPHomeLoader.add_constructor(u'tag:yaml.org,2002:str', ESPHomeLoader.construct_yaml_str)
|
||||
ESPHomeLoader.add_constructor(u'tag:yaml.org,2002:seq', ESPHomeLoader.construct_yaml_seq)
|
||||
ESPHomeLoader.add_constructor(u'tag:yaml.org,2002:map', ESPHomeLoader.construct_yaml_map)
|
||||
ESPHomeLoader.add_constructor('!env_var', ESPHomeLoader.construct_env_var)
|
||||
ESPHomeLoader.add_constructor('!secret', ESPHomeLoader.construct_secret)
|
||||
ESPHomeLoader.add_constructor('!include', ESPHomeLoader.construct_include)
|
||||
ESPHomeLoader.add_constructor('!include_dir_list', ESPHomeLoader.construct_include_dir_list)
|
||||
ESPHomeLoader.add_constructor('!include_dir_merge_list',
|
||||
ESPHomeLoader.construct_include_dir_merge_list)
|
||||
ESPHomeLoader.add_constructor('!include_dir_named', ESPHomeLoader.construct_include_dir_named)
|
||||
ESPHomeLoader.add_constructor('!include_dir_merge_named',
|
||||
ESPHomeLoader.construct_include_dir_merge_named)
|
||||
ESPHomeLoader.add_constructor('!lambda', ESPHomeLoader.construct_lambda)
|
||||
|
||||
|
||||
def load_yaml(fname):
|
||||
@@ -50,158 +328,21 @@ def load_yaml(fname):
|
||||
|
||||
|
||||
def _load_yaml_internal(fname):
|
||||
"""Load a YAML file."""
|
||||
content = read_config_file(fname)
|
||||
loader = ESPHomeLoader(content)
|
||||
loader.name = fname
|
||||
try:
|
||||
with codecs.open(fname, encoding='utf-8') as conf_file:
|
||||
return yaml.load(conf_file, Loader=SafeLineLoader) or OrderedDict()
|
||||
return loader.get_single_data() or OrderedDict()
|
||||
except yaml.YAMLError as exc:
|
||||
raise EsphomeError(exc)
|
||||
except IOError as exc:
|
||||
raise EsphomeError(u"Error accessing file {}: {}".format(fname, exc))
|
||||
except UnicodeDecodeError as exc:
|
||||
_LOGGER.error(u"Unable to read file %s: %s", fname, exc)
|
||||
raise EsphomeError(exc)
|
||||
finally:
|
||||
loader.dispose()
|
||||
|
||||
|
||||
def dump(dict_):
|
||||
"""Dump YAML to a string and remove null."""
|
||||
return yaml.safe_dump(
|
||||
dict_, default_flow_style=False, allow_unicode=True)
|
||||
|
||||
|
||||
def custom_construct_pairs(loader, node):
|
||||
pairs = []
|
||||
for kv in node.value:
|
||||
if isinstance(kv, yaml.ScalarNode):
|
||||
obj = loader.construct_object(kv)
|
||||
if not isinstance(obj, dict):
|
||||
raise EsphomeError(
|
||||
"Expected mapping for anchored include tag, got {}".format(type(obj)))
|
||||
for key, value in obj.items():
|
||||
pairs.append((key, value))
|
||||
else:
|
||||
key_node, value_node = kv
|
||||
key = loader.construct_object(key_node)
|
||||
value = loader.construct_object(value_node)
|
||||
pairs.append((key, value))
|
||||
|
||||
return pairs
|
||||
|
||||
|
||||
def custom_flatten_mapping(loader, node):
|
||||
pre_merge = []
|
||||
post_merge = []
|
||||
index = 0
|
||||
while index < len(node.value):
|
||||
if isinstance(node.value[index], yaml.ScalarNode):
|
||||
index += 1
|
||||
continue
|
||||
|
||||
key_node, value_node = node.value[index]
|
||||
if key_node.tag == u'tag:yaml.org,2002:merge':
|
||||
del node.value[index]
|
||||
|
||||
if isinstance(value_node, yaml.MappingNode):
|
||||
custom_flatten_mapping(loader, value_node)
|
||||
node.value = node.value[:index] + value_node.value + node.value[index:]
|
||||
elif isinstance(value_node, yaml.SequenceNode):
|
||||
submerge = []
|
||||
for subnode in value_node.value:
|
||||
if not isinstance(subnode, yaml.MappingNode):
|
||||
raise yaml.constructor.ConstructorError(
|
||||
"while constructing a mapping", node.start_mark,
|
||||
"expected a mapping for merging, but found %{}".format(subnode.id),
|
||||
subnode.start_mark)
|
||||
custom_flatten_mapping(loader, subnode)
|
||||
submerge.append(subnode.value)
|
||||
# submerge.reverse()
|
||||
node.value = node.value[:index] + submerge + node.value[index:]
|
||||
elif isinstance(value_node, yaml.ScalarNode):
|
||||
node.value = node.value[:index] + [value_node] + node.value[index:]
|
||||
# post_merge.append(value_node)
|
||||
else:
|
||||
raise yaml.constructor.ConstructorError(
|
||||
"while constructing a mapping", node.start_mark,
|
||||
"expected a mapping or list of mappings for merging, "
|
||||
"but found {}".format(value_node.id), value_node.start_mark)
|
||||
elif key_node.tag == u'tag:yaml.org,2002:value':
|
||||
key_node.tag = u'tag:yaml.org,2002:str'
|
||||
index += 1
|
||||
else:
|
||||
index += 1
|
||||
if pre_merge:
|
||||
node.value = pre_merge + node.value
|
||||
if post_merge:
|
||||
node.value = node.value + post_merge
|
||||
|
||||
|
||||
def _ordered_dict(loader, node):
|
||||
"""Load YAML mappings into an ordered dictionary to preserve key order."""
|
||||
custom_flatten_mapping(loader, node)
|
||||
nodes = custom_construct_pairs(loader, node)
|
||||
|
||||
seen = {}
|
||||
for (key, _), nv in zip(nodes, node.value):
|
||||
if isinstance(nv, yaml.ScalarNode):
|
||||
line = nv.start_mark.line
|
||||
else:
|
||||
line = nv[0].start_mark.line
|
||||
|
||||
try:
|
||||
hash(key)
|
||||
except TypeError:
|
||||
fname = getattr(loader.stream, 'name', '')
|
||||
raise yaml.MarkedYAMLError(
|
||||
context="invalid key: \"{}\"".format(key),
|
||||
context_mark=yaml.Mark(fname, 0, line, -1, None, None)
|
||||
)
|
||||
|
||||
if key in seen:
|
||||
fname = getattr(loader.stream, 'name', '')
|
||||
raise EsphomeError(u'YAML file {} contains duplicate key "{}". '
|
||||
u'Check lines {} and {}.'.format(fname, key, seen[key], line))
|
||||
seen[key] = line
|
||||
|
||||
return _add_reference(OrderedDict(nodes), loader, node)
|
||||
|
||||
|
||||
def _construct_seq(loader, node):
|
||||
"""Add line number and file name to Load YAML sequence."""
|
||||
obj, = loader.construct_yaml_seq(node)
|
||||
return _add_reference(obj, loader, node)
|
||||
|
||||
|
||||
def _add_reference(obj, loader, node):
|
||||
"""Add file reference information to an object."""
|
||||
if isinstance(obj, string_types):
|
||||
obj = NodeStrClass(obj)
|
||||
if isinstance(obj, list):
|
||||
obj = NodeListClass(obj)
|
||||
setattr(obj, '__config_file__', loader.name)
|
||||
setattr(obj, '__line__', node.start_mark.line)
|
||||
return obj
|
||||
|
||||
|
||||
def _env_var_yaml(_, node):
|
||||
"""Load environment variables and embed it into the configuration YAML."""
|
||||
args = node.value.split()
|
||||
|
||||
# Check for a default value
|
||||
if len(args) > 1:
|
||||
return os.getenv(args[0], u' '.join(args[1:]))
|
||||
if args[0] in os.environ:
|
||||
return os.environ[args[0]]
|
||||
raise EsphomeError(u"Environment variable {} not defined.".format(node.value))
|
||||
|
||||
|
||||
def _include_yaml(loader, node):
|
||||
"""Load another YAML file and embeds it using the !include tag.
|
||||
|
||||
Example:
|
||||
device_tracker: !include device_tracker.yaml
|
||||
"""
|
||||
fname = os.path.join(os.path.dirname(loader.name), node.value)
|
||||
return _add_reference(_load_yaml_internal(fname), loader, node)
|
||||
return yaml.dump(dict_, default_flow_style=False, allow_unicode=True,
|
||||
Dumper=ESPHomeDumper)
|
||||
|
||||
|
||||
def _is_file_valid(name):
|
||||
@@ -219,49 +360,6 @@ def _find_files(directory, pattern):
|
||||
yield filename
|
||||
|
||||
|
||||
def _include_dir_named_yaml(loader, node):
|
||||
"""Load multiple files from directory as a dictionary."""
|
||||
mapping = OrderedDict() # type: OrderedDict
|
||||
loc = os.path.join(os.path.dirname(loader.name), node.value)
|
||||
for fname in _find_files(loc, '*.yaml'):
|
||||
filename = os.path.splitext(os.path.basename(fname))[0]
|
||||
mapping[filename] = _load_yaml_internal(fname)
|
||||
return _add_reference(mapping, loader, node)
|
||||
|
||||
|
||||
def _include_dir_merge_named_yaml(loader, node):
|
||||
"""Load multiple files from directory as a merged dictionary."""
|
||||
mapping = OrderedDict() # type: OrderedDict
|
||||
loc = os.path.join(os.path.dirname(loader.name), node.value)
|
||||
for fname in _find_files(loc, '*.yaml'):
|
||||
if os.path.basename(fname) == SECRET_YAML:
|
||||
continue
|
||||
loaded_yaml = _load_yaml_internal(fname)
|
||||
if isinstance(loaded_yaml, dict):
|
||||
mapping.update(loaded_yaml)
|
||||
return _add_reference(mapping, loader, node)
|
||||
|
||||
|
||||
def _include_dir_list_yaml(loader, node):
|
||||
"""Load multiple files from directory as a list."""
|
||||
loc = os.path.join(os.path.dirname(loader.name), node.value)
|
||||
return [_load_yaml_internal(f) for f in _find_files(loc, '*.yaml')
|
||||
if os.path.basename(f) != SECRET_YAML]
|
||||
|
||||
|
||||
def _include_dir_merge_list_yaml(loader, node):
|
||||
"""Load multiple files from directory as a merged list."""
|
||||
path = os.path.join(os.path.dirname(loader.name), node.value)
|
||||
merged_list = []
|
||||
for fname in _find_files(path, '*.yaml'):
|
||||
if os.path.basename(fname) == SECRET_YAML:
|
||||
continue
|
||||
loaded_yaml = _load_yaml_internal(fname)
|
||||
if isinstance(loaded_yaml, list):
|
||||
merged_list.extend(loaded_yaml)
|
||||
return _add_reference(merged_list, loader, node)
|
||||
|
||||
|
||||
def is_secret(value):
|
||||
try:
|
||||
return _SECRET_VALUES[text_type(value)]
|
||||
@@ -269,144 +367,99 @@ def is_secret(value):
|
||||
return None
|
||||
|
||||
|
||||
# pylint: disable=protected-access
|
||||
def _secret_yaml(loader, node):
|
||||
"""Load secrets and embed it into the configuration YAML."""
|
||||
secret_path = os.path.join(os.path.dirname(loader.name), SECRET_YAML)
|
||||
secrets = _load_yaml_internal(secret_path)
|
||||
if node.value not in secrets:
|
||||
raise EsphomeError(u"Secret {} not defined".format(node.value))
|
||||
val = secrets[node.value]
|
||||
_SECRET_VALUES[text_type(val)] = node.value
|
||||
return val
|
||||
class ESPHomeDumper(yaml.SafeDumper): # pylint: disable=too-many-ancestors
|
||||
def represent_mapping(self, tag, mapping, flow_style=None):
|
||||
value = []
|
||||
node = yaml.MappingNode(tag, value, flow_style=flow_style)
|
||||
if self.alias_key is not None:
|
||||
self.represented_objects[self.alias_key] = node
|
||||
best_style = True
|
||||
if hasattr(mapping, 'items'):
|
||||
mapping = list(mapping.items())
|
||||
for item_key, item_value in mapping:
|
||||
node_key = self.represent_data(item_key)
|
||||
node_value = self.represent_data(item_value)
|
||||
if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style):
|
||||
best_style = False
|
||||
if not (isinstance(node_value, yaml.ScalarNode) and not node_value.style):
|
||||
best_style = False
|
||||
value.append((node_key, node_value))
|
||||
if flow_style is None:
|
||||
if self.default_flow_style is not None:
|
||||
node.flow_style = self.default_flow_style
|
||||
else:
|
||||
node.flow_style = best_style
|
||||
return node
|
||||
|
||||
def represent_secret(self, value):
|
||||
return self.represent_scalar(tag=u'!secret', value=_SECRET_VALUES[text_type(value)])
|
||||
|
||||
def _lambda(loader, node):
|
||||
return Lambda(text_type(node.value))
|
||||
def represent_stringify(self, value):
|
||||
if is_secret(value):
|
||||
return self.represent_secret(value)
|
||||
return self.represent_scalar(tag=u'tag:yaml.org,2002:str', value=text_type(value))
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def represent_bool(self, value):
|
||||
return self.represent_scalar(u'tag:yaml.org,2002:bool', u'true' if value else u'false')
|
||||
|
||||
yaml.SafeLoader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _ordered_dict)
|
||||
yaml.SafeLoader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_SEQUENCE_TAG, _construct_seq)
|
||||
yaml.SafeLoader.add_constructor('!env_var', _env_var_yaml)
|
||||
yaml.SafeLoader.add_constructor('!secret', _secret_yaml)
|
||||
yaml.SafeLoader.add_constructor('!include', _include_yaml)
|
||||
yaml.SafeLoader.add_constructor('!include_dir_list', _include_dir_list_yaml)
|
||||
yaml.SafeLoader.add_constructor('!include_dir_merge_list',
|
||||
_include_dir_merge_list_yaml)
|
||||
yaml.SafeLoader.add_constructor('!include_dir_named', _include_dir_named_yaml)
|
||||
yaml.SafeLoader.add_constructor('!include_dir_merge_named',
|
||||
_include_dir_merge_named_yaml)
|
||||
yaml.SafeLoader.add_constructor('!lambda', _lambda)
|
||||
def represent_int(self, value):
|
||||
if is_secret(value):
|
||||
return self.represent_secret(value)
|
||||
return self.represent_scalar(tag=u'tag:yaml.org,2002:int', value=text_type(value))
|
||||
|
||||
|
||||
# From: https://gist.github.com/miracle2k/3184458
|
||||
# pylint: disable=redefined-outer-name
|
||||
def represent_odict(dump, tag, mapping, flow_style=None):
|
||||
"""Like BaseRepresenter.represent_mapping but does not issue the sort()."""
|
||||
value = []
|
||||
node = yaml.MappingNode(tag, value, flow_style=flow_style)
|
||||
if dump.alias_key is not None:
|
||||
dump.represented_objects[dump.alias_key] = node
|
||||
best_style = True
|
||||
if hasattr(mapping, 'items'):
|
||||
mapping = list(mapping.items())
|
||||
for item_key, item_value in mapping:
|
||||
node_key = dump.represent_data(item_key)
|
||||
node_value = dump.represent_data(item_value)
|
||||
if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style):
|
||||
best_style = False
|
||||
if not (isinstance(node_value, yaml.ScalarNode) and
|
||||
not node_value.style):
|
||||
best_style = False
|
||||
value.append((node_key, node_value))
|
||||
if flow_style is None:
|
||||
if dump.default_flow_style is not None:
|
||||
node.flow_style = dump.default_flow_style
|
||||
def represent_float(self, value):
|
||||
if is_secret(value):
|
||||
return self.represent_secret(value)
|
||||
# pylint: disable=comparison-with-itself
|
||||
if value != value or (value == 0.0 and value == 1.0):
|
||||
value = u'.nan'
|
||||
elif value == self.inf_value:
|
||||
value = u'.inf'
|
||||
elif value == -self.inf_value:
|
||||
value = u'-.inf'
|
||||
else:
|
||||
node.flow_style = best_style
|
||||
return node
|
||||
value = text_type(repr(value)).lower()
|
||||
# Note that in some cases `repr(data)` represents a float number
|
||||
# without the decimal parts. For instance:
|
||||
# >>> repr(1e17)
|
||||
# '1e17'
|
||||
# Unfortunately, this is not a valid float representation according
|
||||
# to the definition of the `!!float` tag. We fix this by adding
|
||||
# '.0' before the 'e' symbol.
|
||||
if u'.' not in value and u'e' in value:
|
||||
value = value.replace(u'e', u'.0e', 1)
|
||||
return self.represent_scalar(tag=u'tag:yaml.org,2002:float', value=value)
|
||||
|
||||
def represent_lambda(self, value):
|
||||
if is_secret(value.value):
|
||||
return self.represent_secret(value.value)
|
||||
return self.represent_scalar(tag='!lambda', value=value.value, style='|')
|
||||
|
||||
def represent_id(self, value):
|
||||
if is_secret(value.id):
|
||||
return self.represent_secret(value.id)
|
||||
return self.represent_stringify(value.id)
|
||||
|
||||
|
||||
def represent_secret(value):
|
||||
return yaml.ScalarNode(tag=u'!secret', value=_SECRET_VALUES[text_type(value)])
|
||||
|
||||
|
||||
def unicode_representer(_, uni):
|
||||
if is_secret(uni):
|
||||
return represent_secret(uni)
|
||||
node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=uni)
|
||||
return node
|
||||
|
||||
|
||||
def hex_int_representer(_, data):
|
||||
if is_secret(data):
|
||||
return represent_secret(data)
|
||||
node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:int', value=str(data))
|
||||
return node
|
||||
|
||||
|
||||
def stringify_representer(_, data):
|
||||
if is_secret(data):
|
||||
return represent_secret(data)
|
||||
node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=str(data))
|
||||
return node
|
||||
|
||||
|
||||
TIME_PERIOD_UNIT_MAP = {
|
||||
'microseconds': 'us',
|
||||
'milliseconds': 'ms',
|
||||
'seconds': 's',
|
||||
'minutes': 'min',
|
||||
'hours': 'h',
|
||||
'days': 'd',
|
||||
}
|
||||
|
||||
|
||||
def represent_time_period(dumper, data):
|
||||
dictionary = data.as_dict()
|
||||
if len(dictionary) == 1:
|
||||
unit, value = dictionary.popitem()
|
||||
out = '{}{}'.format(value, TIME_PERIOD_UNIT_MAP[unit])
|
||||
return yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=out)
|
||||
return represent_odict(dumper, 'tag:yaml.org,2002:map', dictionary)
|
||||
|
||||
|
||||
def represent_lambda(_, data):
|
||||
if is_secret(data.value):
|
||||
return represent_secret(data.value)
|
||||
node = yaml.ScalarNode(tag='!lambda', value=data.value, style='|')
|
||||
return node
|
||||
|
||||
|
||||
def represent_id(_, data):
|
||||
if is_secret(data.id):
|
||||
return represent_secret(data.id)
|
||||
return yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=data.id)
|
||||
|
||||
|
||||
yaml.SafeDumper.add_representer(
|
||||
OrderedDict,
|
||||
lambda dumper, value:
|
||||
represent_odict(dumper, 'tag:yaml.org,2002:map', value)
|
||||
ESPHomeDumper.add_multi_representer(
|
||||
dict,
|
||||
lambda dumper, value: dumper.represent_mapping('tag:yaml.org,2002:map', value)
|
||||
)
|
||||
|
||||
yaml.SafeDumper.add_representer(
|
||||
NodeListClass,
|
||||
lambda dumper, value:
|
||||
dumper.represent_sequence('tag:yaml.org,2002:seq', value)
|
||||
)
|
||||
|
||||
yaml.SafeDumper.add_representer(str, unicode_representer)
|
||||
yaml.SafeDumper.add_representer(
|
||||
core.AutoLoad,
|
||||
lambda dumper, value: represent_odict(dumper, 'tag:yaml.org,2002:map', value)
|
||||
ESPHomeDumper.add_multi_representer(
|
||||
list,
|
||||
lambda dumper, value: dumper.represent_sequence('tag:yaml.org,2002:seq', value)
|
||||
)
|
||||
ESPHomeDumper.add_multi_representer(bool, ESPHomeDumper.represent_bool)
|
||||
ESPHomeDumper.add_multi_representer(str, ESPHomeDumper.represent_stringify)
|
||||
ESPHomeDumper.add_multi_representer(int, ESPHomeDumper.represent_int)
|
||||
ESPHomeDumper.add_multi_representer(float, ESPHomeDumper.represent_float)
|
||||
if IS_PY2:
|
||||
yaml.SafeDumper.add_representer(unicode, unicode_representer)
|
||||
yaml.SafeDumper.add_representer(HexInt, hex_int_representer)
|
||||
yaml.SafeDumper.add_representer(IPAddress, stringify_representer)
|
||||
yaml.SafeDumper.add_representer(MACAddress, stringify_representer)
|
||||
yaml.SafeDumper.add_multi_representer(TimePeriod, represent_time_period)
|
||||
yaml.SafeDumper.add_multi_representer(Lambda, represent_lambda)
|
||||
yaml.SafeDumper.add_multi_representer(core.ID, represent_id)
|
||||
yaml.SafeDumper.add_multi_representer(uuid.UUID, stringify_representer)
|
||||
ESPHomeDumper.add_multi_representer(unicode, ESPHomeDumper.represent_stringify)
|
||||
ESPHomeDumper.add_multi_representer(long, ESPHomeDumper.represent_int)
|
||||
ESPHomeDumper.add_multi_representer(IPAddress, ESPHomeDumper.represent_stringify)
|
||||
ESPHomeDumper.add_multi_representer(MACAddress, ESPHomeDumper.represent_stringify)
|
||||
ESPHomeDumper.add_multi_representer(TimePeriod, ESPHomeDumper.represent_stringify)
|
||||
ESPHomeDumper.add_multi_representer(Lambda, ESPHomeDumper.represent_lambda)
|
||||
ESPHomeDumper.add_multi_representer(core.ID, ESPHomeDumper.represent_id)
|
||||
ESPHomeDumper.add_multi_representer(uuid.UUID, ESPHomeDumper.represent_stringify)
|
||||
|
Reference in New Issue
Block a user