from __future__ import annotations

import abc
from contextlib import contextmanager
import contextvars
import functools
import heapq
import logging
import re
from typing import Any

import voluptuous as vol

from esphome import core, loader, pins, yaml_util
from esphome.config_helpers import Extend, Remove, merge_dicts_ordered
import esphome.config_validation as cv
from esphome.const import (
    CONF_ESPHOME,
    CONF_EXTERNAL_COMPONENTS,
    CONF_ID,
    CONF_MIN_VERSION,
    CONF_PACKAGES,
    CONF_PLATFORM,
    CONF_SUBSTITUTIONS,
)
from esphome.core import CORE, DocumentRange, EsphomeError
import esphome.core.config as core_config
import esphome.final_validate as fv
from esphome.helpers import indent
from esphome.loader import ComponentManifest, get_component, get_platform
from esphome.log import AnsiFore, color
from esphome.types import ConfigFragmentType, ConfigType
from esphome.util import OrderedDict, safe_print
from esphome.voluptuous_schema import ExtraKeysInvalid
from esphome.yaml_util import ESPHomeDataBase, ESPLiteralValue, is_secret

_LOGGER = logging.getLogger(__name__)


def iter_components(config):
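    """Yield (name, component manifest) pairs for every component in the config, including platform entries."""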
    for domain, conf in config.items():
        component = get_component(domain)
        yield domain, component
        if component.is_platform_component:
            for p_config in conf:
                p_name = f"{domain}.{p_config[CONF_PLATFORM]}"
                platform = get_platform(domain, p_config[CONF_PLATFORM])
                yield p_name, platform


def iter_component_configs(config):
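    """Yield (name, component, config) tuples for each config fragment, expanding multi_conf lists and platform entries."""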
    for domain, conf in config.items():
        component = get_component(domain)
        if component.multi_conf:
            for conf_ in conf:
                yield domain, component, conf_
        else:
            yield domain, component, conf
        if component.is_platform_component:
            for p_config in conf:
                p_name = f"{domain}.{p_config[CONF_PLATFORM]}"
                platform = get_platform(domain, p_config[CONF_PLATFORM])
                yield p_name, platform, p_config


ConfigPath = list[str | int]
path_context = contextvars.ContextVar("Config path")


def _add_auto_load_steps(result: Config, loads: list[str]) -> None:
    """Add AutoLoadValidationStep for each component in loads that isn't already loaded."""
    for load in loads:
        if load not in result:
            result.add_validation_step(AutoLoadValidationStep(load))


def _process_auto_load(
    result: Config, platform: ComponentManifest, path: ConfigPath
) -> None:
    # Process platform's AUTO_LOAD
    auto_load = platform.auto_load
    if isinstance(auto_load, list):
        _add_auto_load_steps(result, auto_load)
    elif callable(auto_load):
        import inspect

        if inspect.signature(auto_load).parameters:
            result.add_validation_step(
                AddDynamicAutoLoadsValidationStep(path, platform)
            )
        else:
            _add_auto_load_steps(result, auto_load())


def _process_platform_config(
    result: Config,
    component_name: str,
    platform_name: str,
    platform_config: ConfigType,
    path: ConfigPath,
) -> None:
    """Process a platform configuration and add necessary validation steps.

    This is shared between LoadValidationStep and AutoLoadValidationStep to avoid duplication.
    """
    # Get the platform manifest
    platform = get_platform(component_name, platform_name)
    if platform is None:
        result.add_str_error(
            f"Platform not found: '{component_name}.{platform_name}'", path
        )
        return

    # Add platform to loaded integrations
    CORE.loaded_integrations.add(platform_name)
    CORE.loaded_platforms.add(f"{component_name}/{platform_name}")

    # Process platform's AUTO_LOAD
    _process_auto_load(result, platform, path)

    # Add validation steps for the platform
    p_domain = f"{component_name}.{platform_name}"
    result.add_output_path(path, p_domain)
    result.add_validation_step(
        MetadataValidationStep(path, p_domain, platform_config, platform)
    )


def _path_begins_with(path: ConfigPath, other: ConfigPath) -> bool:
    if len(path) < len(other):
        return False
    return path[: len(other)] == other


@functools.total_ordering
class _ValidationStepTask:
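    """Heap entry that orders validation steps by priority (higher first), then by insertion order."""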
    def __init__(self, priority: float, id_number: int, step: ConfigValidationStep):
        self.priority = priority
        self.id_number = id_number
        self.step = step

    @property
    def _cmp_tuple(self) -> tuple[float, int]:
        return (-self.priority, self.id_number)

    def __eq__(self, other):
        return self._cmp_tuple == other._cmp_tuple

    def __ne__(self, other):
        return not (self == other)

    def __lt__(self, other):
        return self._cmp_tuple < other._cmp_tuple


class Config(OrderedDict, fv.FinalValidateConfig):
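    """Validated configuration result; tracks errors, output paths, declared IDs and queued validation steps."""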
    def __init__(self):
        super().__init__()
        # A list of voluptuous errors
        self.errors: list[vol.Invalid] = []
        # A list of paths that should be fully outputted
        # The values will be the paths to all "domain", for example (['logger'], 'logger')
        # or (['sensor', 'ultrasonic'], 'sensor.ultrasonic')
        self.output_paths: list[tuple[ConfigPath, str]] = []
        # A list of component IDs with the config path
        self.declare_ids: list[tuple[core.ID, ConfigPath]] = []
        self._data = {}
        # Store pending validation tasks (in heap order)
        self._validation_tasks: list[_ValidationStepTask] = []
        # ID to ensure stable order for keys with equal priority
        self._validation_tasks_id = 0

    def add_error(self, error: vol.Invalid) -> None:
        if isinstance(error, vol.MultipleInvalid):
            for err in error.errors:
                self.add_error(err)
            return
        if cv.ROOT_CONFIG_PATH in error.path:
            # Root value means that the path before the root should be ignored
            last_root = max(
                i for i, v in enumerate(error.path) if v is cv.ROOT_CONFIG_PATH
            )
            # can't change the path so re-create the error
            error = vol.Invalid(
                message=error.error_message,
                path=error.path[last_root + 1 :],
                error_type=error.error_type,
            )
        self.errors.append(error)

    def add_validation_step(self, step: ConfigValidationStep):
        id_num = self._validation_tasks_id
        self._validation_tasks_id += 1
        heapq.heappush(
            self._validation_tasks, _ValidationStepTask(step.priority, id_num, step)
        )

    def run_validation_steps(self):
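        """Run queued validation steps in priority order, stopping as soon as errors are recorded."""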
        while self._validation_tasks and not self.errors:
            task = heapq.heappop(self._validation_tasks)
            task.step.run(self)

    @contextmanager
    def catch_error(self, path=None):
        path = path or []
        try:
            yield
        except cv.FinalExternalInvalid as e:
            self.add_error(e)
        except vol.Invalid as e:
            e.prepend(path)
            self.add_error(e)

    def add_str_error(self, message: str, path: ConfigPath) -> None:
        self.add_error(vol.Invalid(message, path))

    def add_output_path(self, path: ConfigPath, domain: str) -> None:
        self.output_paths.append((path, domain))

    def remove_output_path(self, path: ConfigPath, domain: str) -> None:
        self.output_paths.remove((path, domain))

    def is_in_error_path(self, path: ConfigPath) -> bool:
        return any(_path_begins_with(err.path, path) for err in self.errors)

    def set_by_path(self, path, value):
        conf = self
        for key in path[:-1]:
            conf = conf[key]
        conf[path[-1]] = value

    def get_error_for_path(self, path: ConfigPath) -> vol.Invalid | None:
        for err in self.errors:
            if self.get_deepest_path(err.path) == path:
                self.errors.remove(err)
                return err
        return None

    def get_deepest_document_range_for_path(
        self, path: ConfigPath, get_key: bool = False
    ) -> DocumentRange | None:
        data = self
        doc_range = None
        for index, path_item in enumerate(path):
            try:
                if path_item in data:
                    key_data = [x for x in data if x == path_item][0]
                    if isinstance(key_data, ESPHomeDataBase):
                        doc_range = key_data.esp_range
                        if get_key and index == len(path) - 1:
                            return doc_range
                data = data[path_item]
            except (KeyError, IndexError, TypeError, AttributeError):
                return doc_range
            if isinstance(data, core.ID):
                data = data.id
            if isinstance(data, ESPHomeDataBase) and data.esp_range is not None:
                doc_range = data.esp_range
            elif isinstance(data, dict):
                platform_item = data.get("platform")
                if (
                    isinstance(platform_item, ESPHomeDataBase)
                    and platform_item.esp_range is not None
                ):
                    doc_range = platform_item.esp_range

        return doc_range

    def get_nested_item(
        self, path: ConfigPath, raise_error: bool = False
    ) -> ConfigFragmentType:
        data = self
        for item_index in path:
            try:
                data = data[item_index]
            except (KeyError, IndexError, TypeError):
                if raise_error:
                    raise
                return {}
        return data

    def get_deepest_path(self, path: ConfigPath) -> ConfigPath:
        """Return the path that is the deepest reachable by following path."""
        data = self
        part = []
        for item_index in path:
            try:
                data = data[item_index]
            except (KeyError, IndexError, TypeError):
                return part
            part.append(item_index)
        return part

    def get_path_for_id(self, id: core.ID):
|         """Return the config fragment where the given ID is declared."""
        for declared_id, path in self.declare_ids:
            if declared_id.id == str(id):
                return path
        raise KeyError(f"ID {id} not found in configuration")

    def get_config_for_path(self, path: ConfigPath) -> ConfigFragmentType:
        return self.get_nested_item(path, raise_error=True)

    @property
    def data(self):
        """Return temporary data used by final validation functions."""
        return self._data


def iter_ids(config, path=None):
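    """Recursively yield (ID, path) pairs for every ID referenced in the config, including IDs required by lambdas."""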
    path = path or []
    if isinstance(config, core.ID):
        yield config, path
    elif isinstance(config, core.Lambda):
        for id in config.requires_ids:
            yield id, path
    elif isinstance(config, list):
        for i, item in enumerate(config):
            yield from iter_ids(item, path + [i])
    elif isinstance(config, dict):
        for key, value in config.items():
            if isinstance(key, core.ID):
                yield key, path
            yield from iter_ids(value, path + [key])


def recursive_check_replaceme(value):
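    """Raise cv.Invalid if the placeholder string 'REPLACEME' is still present anywhere in the config."""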
    if isinstance(value, list):
        return cv.Schema([recursive_check_replaceme])(value)
    if isinstance(value, dict):
        return cv.Schema({cv.valid: recursive_check_replaceme})(value)
    if isinstance(value, ESPLiteralValue):
        # values tagged !literal are allowed to contain REPLACEME
        return value
    if isinstance(value, str) and value == "REPLACEME":
        raise cv.Invalid(
            "Found 'REPLACEME' in configuration, this is most likely an error. "
            "Please make sure you have replaced all fields from the sample "
            "configuration.\n"
            "If you want to use the literal REPLACEME string, "
            'please use "!literal REPLACEME"'
        )
    return value


class ConfigValidationStep(abc.ABC):
    """A step for the validation phase."""

    # Priority of this step, higher means run earlier
    priority: float = 0.0

    @abc.abstractmethod
    def run(self, result: Config) -> None: ...  # noqa: E704


class LoadTargetPlatformValidationStep(ConfigValidationStep):
    """Load target platform step."""

    def __init__(self, domain: str, conf: ConfigType):
        self.domain = domain
        self.conf = conf

    def run(self, result: Config) -> None:
        if self.conf is None:
            result[self.domain] = self.conf = {}
        result.add_output_path([self.domain], self.domain)
        component = get_component(self.domain)

        result[self.domain] = self.conf
        path = [self.domain]
        CORE.loaded_integrations.add(self.domain)

        result.add_validation_step(
            SchemaValidationStep(self.domain, path, self.conf, component)
        )


class LoadValidationStep(ConfigValidationStep):
|     """Load step, this step is called once for each domain config fragment.
 | |
| 
 | |
|     Responsibilities:
    - Load component code
    - Ensure all AUTO_LOADs are added
    - Set output paths of result
    """

    def __init__(self, domain: str, conf: ConfigType):
        self.domain = domain
        self.conf = conf

    def run(self, result: Config) -> None:
        if self.domain.startswith("."):
            # Ignore top-level keys starting with a dot
            return
        result.add_output_path([self.domain], self.domain)
        component = get_component(self.domain)
        if (
            component is not None
            and component.multi_conf_no_default
            and isinstance(self.conf, core.AutoLoad)
        ):
            self.conf = []
        result[self.domain] = self.conf
        path = [self.domain]
        if component is None:
            result.add_str_error(f"Component not found: {self.domain}", path)
            return
        CORE.loaded_integrations.add(self.domain)
        # For platform components, normalize conf before creating MetadataValidationStep
        if component.is_platform_component:
            if not self.conf:
                result[self.domain] = self.conf = []
            elif not isinstance(self.conf, list):
                result[self.domain] = self.conf = [self.conf]

        # Process AUTO_LOAD
        _process_auto_load(result, component, path)

        result.add_validation_step(
            MetadataValidationStep([self.domain], self.domain, self.conf, component)
        )

        if not component.is_platform_component:
            return

        # This is a platform component, proceed to reading platform entries
        # Remove this as an output path
        result.remove_output_path([self.domain], self.domain)

        for i, p_config in enumerate(self.conf):
            path = [self.domain, i]
            # Construct temporary unknown output path
            p_domain = f"{self.domain}.unknown"
            result.add_output_path(path, p_domain)
            result[self.domain][i] = p_config
            if not isinstance(p_config, dict):
                result.add_str_error("Platform schemas must be key-value pairs.", path)
                continue
            p_name = p_config.get("platform")
            if p_name is None:
                p_id = p_config.get(CONF_ID)
                if isinstance(p_id, Extend):
                    result.add_str_error(
                        f"Source for extension of ID '{p_id.value}' was not found.",
                        path + [CONF_ID],
                    )
                    continue
                if isinstance(p_id, Remove):
                    result.add_str_error(
                        f"Source for removal of ID '{p_id.value}' was not found.",
                        path + [CONF_ID],
                    )
                    continue
                result.add_str_error(
                    f"'{self.domain}' requires a 'platform' key but it was not specified.",
                    path,
                )
                continue
            # Remove temp output path
            result.remove_output_path(path, p_domain)

            # Process the platform configuration
            _process_platform_config(result, self.domain, p_name, p_config, path)


class AutoLoadValidationStep(ConfigValidationStep):
    """Auto load step. This step is used to automatically load components if
    a component requested that with AUTO_LOAD.
    """

    # Only load after all regular loads have taken place
    priority = -1.0

    def __init__(self, domain: str):
        self.domain = domain

    def run(self, result: Config) -> None:
        # Regular component auto-load (no platform)
        if "." not in self.domain:
            if self.domain in result:
                # already loaded
                return
            result.add_validation_step(LoadValidationStep(self.domain, core.AutoLoad()))
            return

        # Platform-specific auto-load (e.g., "ota.web_server")
        component_name, _, platform_name = self.domain.partition(".")

        # Check if component exists
        if component_name not in result:
            # Component doesn't exist, load it first
            result.add_validation_step(LoadValidationStep(component_name, []))
            # Re-run this step after the component is loaded
            result.add_validation_step(AutoLoadValidationStep(self.domain))
            return

        # Component exists, check if it's a platform component
        component = get_component(component_name)
        if component is None or not component.is_platform_component:
            result.add_str_error(
                f"Component {component_name} is not a platform component, "
                f"cannot auto-load platform {platform_name}",
                [component_name],
            )
            return

        # Ensure the component config is a list
        component_conf = result.get(component_name)
        if not isinstance(component_conf, list):
            component_conf = result[component_name] = []

        # Check if platform already exists
        if any(
            isinstance(conf, dict) and conf.get(CONF_PLATFORM) == platform_name
            for conf in component_conf
        ):
            return

        # Add and process the platform configuration
        platform_conf = core.AutoLoad()
        platform_conf[CONF_PLATFORM] = platform_name
        component_conf.append(platform_conf)

        path = [component_name, len(component_conf) - 1]
        _process_platform_config(
            result, component_name, platform_name, platform_conf, path
        )


class MetadataValidationStep(ConfigValidationStep):
    """Validate component metadata.

    Responsibilities:
     - Config transformation (nullable, multi conf)
     - Check dependencies
     - Check conflicts
     - Check supported target platforms
    """

    # All components need to be loaded first to ensure dependency check works
    priority = -2.0

    def __init__(
        self,
        path: ConfigPath,
        domain: str,
        conf: ConfigType,
        component: ComponentManifest,
    ) -> None:
        self.path = path
        self.domain = domain
        self.conf = conf
        self.comp = component

    def run(self, result: Config) -> None:
        if self.conf is None:
            if self.comp.multi_conf and self.comp.multi_conf_no_default:
                result[self.domain] = self.conf = []
            else:
                result[self.domain] = self.conf = {}

        success = True
        for dependency in self.comp.dependencies:
            dependency_parts = dependency.split(".")
            if len(dependency_parts) > 2:
                result.add_str_error(
                    "Dependencies must be specified as a single component or in component.platform format only",
                    self.path,
                )
                return
            component_dep = dependency_parts[0]
            platform_dep = dependency_parts[-1]
            if component_dep not in result:
                result.add_str_error(
                    f"Component {self.domain} requires component {component_dep}",
                    self.path,
                )
                success = False
            elif component_dep != platform_dep and (
                not isinstance(platform_list := result.get(component_dep), list)
                or not any(CONF_PLATFORM in p for p in platform_list)
                or not any(p[CONF_PLATFORM] == platform_dep for p in platform_list)
            ):
                result.add_str_error(
                    f"Component {self.domain} requires 'platform: {platform_dep}' in component '{component_dep}'",
                    self.path,
                )
                success = False
        if not success:
            return

        success = True
        for conflict in self.comp.conflicts_with:
            if conflict in result:
                result.add_str_error(
                    f"Component {self.domain} cannot be used together with component {conflict}",
                    self.path,
                )
                success = False
        if not success:
            return

        if (
            not self.comp.is_platform_component
            and self.comp.config_schema is None
            and not isinstance(self.conf, core.AutoLoad)
        ):
            result.add_str_error(
                f"Component {self.domain} cannot be loaded via YAML "
                "(no CONFIG_SCHEMA).",
                self.path,
            )
            return

        if self.comp.multi_conf:
            if not isinstance(self.conf, list):
                result[self.domain] = self.conf = [self.conf]
            if (
                not isinstance(self.comp.multi_conf, bool)
                and len(self.conf) > self.comp.multi_conf
            ):
                result.add_str_error(
                    f"Component {self.domain} supports a maximum of {self.comp.multi_conf} "
                    f"entries ({len(self.conf)} found).",
                    self.path,
                )
                return
            for i, part_conf in enumerate(self.conf):
                path = self.path + [i]
                result.add_validation_step(
                    SchemaValidationStep(self.domain, path, part_conf, self.comp)
                )
                result.add_validation_step(FinalValidateValidationStep(path, self.comp))

            return

        result.add_validation_step(
            SchemaValidationStep(self.domain, self.path, self.conf, self.comp)
        )
        result.add_validation_step(FinalValidateValidationStep(self.path, self.comp))


class AddDynamicAutoLoadsValidationStep(ConfigValidationStep):
    """Add dynamic auto loads step.

    This step is used to auto-load components where one component can alter its
    AUTO_LOAD based on its configuration.
    """

    # Has to happen after normal schema is validated and before final schema validation
    priority = -5.0

    def __init__(self, path: ConfigPath, comp: ComponentManifest) -> None:
        self.path = path
        self.comp = comp

    def run(self, result: Config) -> None:
        if result.errors:
            # If result already has errors, skip this step
            return

        conf = result.get_nested_item(self.path)
        with result.catch_error(self.path):
            auto_load = self.comp.auto_load
            if not callable(auto_load):
                return
            loads = auto_load(conf)
            _add_auto_load_steps(result, loads)


class SchemaValidationStep(ConfigValidationStep):
    """Schema validation step.

    During this step all CONFIG_SCHEMAs are checked against the configs.
    """

    def __init__(
        self, domain: str, path: ConfigPath, conf: ConfigType, comp: ComponentManifest
    ):
        self.domain = domain
        self.path = path
        self.conf = conf
        self.comp = comp

    def run(self, result: Config) -> None:
        token = path_context.set(self.path)
        # The domain already contains the full component path (e.g., "sensor.template", "sensor.uptime")
        with CORE.component_context(self.domain), result.catch_error(self.path):
            if self.comp.is_platform:
                # Remove 'platform' key for validation
                input_conf = OrderedDict(self.conf)
                platform_val = input_conf.pop("platform")
                schema = cv.Schema(self.comp.config_schema)
                validated = schema(input_conf)
                # Ensure result is OrderedDict so we can call move_to_end
                if not isinstance(validated, OrderedDict):
                    validated = OrderedDict(validated)
                validated["platform"] = platform_val
                validated.move_to_end("platform", last=False)
                result.set_by_path(self.path, validated)
            elif self.comp.config_schema is not None:
                schema = cv.Schema(self.comp.config_schema)
                validated = schema(self.conf)
                result.set_by_path(self.path, validated)

        path_context.reset(token)


class IDPassValidationStep(ConfigValidationStep):
    """ID Pass step.

    During this step all ID references are checked.

    If an automatic ID reference is used, a fitting declared ID is automatically searched.
    Also checks duplicate ID names, and that referenced IDs are declared.
    """

    # Has to happen after all schemas validated
    priority = -10.0

    def __init__(self) -> None:
        pass

    def run(self, result: Config) -> None:
        from esphome.cpp_generator import MockObjClass
        from esphome.cpp_types import Component

        if result.errors:
            # If result already has errors, skip this step
            # Otherwise the user will get a bunch of missing ID warnings
            # because the component that did not validate doesn't have any IDs set
            return

        searching_ids: list[tuple[core.ID, ConfigPath]] = []
        for id, path in iter_ids(result):
            if id.is_declaration:
                if id.id is not None:
                    # Look for duplicate definitions
                    match = next(
                        (v for v in result.declare_ids if v[0].id == id.id), None
                    )
                    if match is not None:
                        opath = "->".join(str(v) for v in match[1])
                        result.add_str_error(
                            f"ID {id.id} redefined! Check {opath}", path
                        )
                        continue
                result.declare_ids.append((id, path))
            else:
                searching_ids.append((id, path))

        # Resolve default ids after manual IDs
        for id, _ in result.declare_ids:
            id.resolve([v[0].id for v in result.declare_ids])
            if isinstance(id.type, MockObjClass) and id.type.inherits_from(Component):
                CORE.component_ids.add(id.id)

        # Check searched IDs
        for id, path in searching_ids:
            if id.id is not None:
                # manually declared
                match = next(
                    (v[0] for v in result.declare_ids if v[0].id == id.id), None
                )
                if match is None or not match.is_manual:
                    # No declared ID with this name
                    import difflib

                    error = (
                        f"Couldn't find ID '{id.id}'. Please check you have defined "
                        "an ID with that name in your configuration."
                    )
                    # Find candidates
                    matches = difflib.get_close_matches(
                        id.id, [v[0].id for v in result.declare_ids if v[0].is_manual]
                    )
                    if matches:
                        matches_s = ", ".join(f'"{x}"' for x in matches)
                        error += f" These IDs look similar: {matches_s}."
                    result.add_str_error(error, path)
                    continue
                if not isinstance(match.type, MockObjClass) or not isinstance(
                    id.type, MockObjClass
                ):
                    continue
                if not match.type.inherits_from(id.type):
                    result.add_str_error(
                        f"ID '{id.id}' of type {match.type} doesn't inherit from {id.type}. "
                        "Please double check your ID is pointing to the correct value",
                        path,
                    )

            if id.id is None and id.type is not None:
                matches = []
                for v in result.declare_ids:
                    if v[0] is None or not isinstance(v[0].type, MockObjClass):
                        continue
                    inherits = v[0].type.inherits_from(id.type)
                    if inherits:
                        matches.append(v[0])

                if len(matches) == 0:
                    result.add_str_error(
                        f"Couldn't find any component that can be used for '{id.type}'. Are you missing a hub declaration?",
                        path,
                    )
                elif len(matches) == 1:
                    id.id = matches[0].id
                elif len(matches) > 1:
                    if str(id.type) == "time::RealTimeClock":
                        id.id = matches[0].id
                    else:
                        manual_declared_count = sum(1 for m in matches if m.is_manual)
                        if manual_declared_count > 0:
                            ids = ", ".join(
                                [f"'{m.id}'" for m in matches if m.is_manual]
                            )
                            result.add_str_error(
                                f"Too many candidates found for '{path[-1]}' type '{id.type}' {'Some are' if manual_declared_count > 1 else 'One is'} {ids}",
                                path,
                            )
                        else:
                            result.add_str_error(
                                f"Too many candidates found for '{path[-1]}' type '{id.type}' You must assign an explicit ID to the parent component you want to use.",
                                path,
                            )


class RemoveReferenceValidationStep(ConfigValidationStep):
    """
    Make sure all !remove references have been removed from the config.
    Any leftovers mean the merge step couldn't find the corresponding previously existing id/key.
    """

    def run(self, result: Config) -> None:
        if result.errors:
            # If result already has errors, skip this step
            return

        def recursive_check_remove_tag(config: Config, path: ConfigPath = None):
            path = path or []

            if isinstance(config, Remove):
                result.add_str_error(
                    f"Source for removal at '{'->'.join([str(p) for p in path])}' was not found.",
                    path,
                )
            elif isinstance(config, list):
                for i, item in enumerate(config):
                    recursive_check_remove_tag(item, path + [i])
            elif isinstance(config, dict):
                for key, value in config.items():
                    recursive_check_remove_tag(value, path + [key])

        recursive_check_remove_tag(result)


class FinalValidateValidationStep(ConfigValidationStep):
    """Run final_validate_schema for all components."""

    # Has to happen after ID pass validated
    priority = -20.0

    def __init__(self, path: ConfigPath, comp: ComponentManifest) -> None:
        self.path = path
        self.comp = comp

    def run(self, result: Config) -> None:
        if result.errors:
            # If result already has errors, skip this step
            return

        token = fv.full_config.set(result)

        conf = result.get_nested_item(self.path)
        with result.catch_error(self.path):
            if self.comp.final_validate_schema is not None:
                self.comp.final_validate_schema(conf)

        fv.full_config.reset(token)


class PinUseValidationCheck(ConfigValidationStep):
    """Check for pin reuse"""

    priority = -30  # Should happen after component final validations

    def __init__(self) -> None:
        pass

    def run(self, result: Config) -> None:
        if result.errors:
            # If result already has errors, skip this step
            return
        pins.PIN_SCHEMA_REGISTRY.final_validate(result)


def validate_config(
    config: dict[str, Any],
    command_line_substitutions: dict[str, Any],
    skip_external_update: bool = False,
) -> Config:
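    """Run all validation steps on the loaded configuration and return the resulting Config."""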
    result = Config()

    loader.clear_component_meta_finders()
    loader.install_custom_components_meta_finder()

    # 0. Load packages
    if CONF_PACKAGES in config:
        from esphome.components.packages import do_packages_pass

        result.add_output_path([CONF_PACKAGES], CONF_PACKAGES)
        try:
            config = do_packages_pass(config, skip_update=skip_external_update)
        except vol.Invalid as err:
            result.update(config)
            result.add_error(err)
            return result

    CORE.raw_config = config

    # 1. Load substitutions
    if CONF_SUBSTITUTIONS in config or command_line_substitutions:
        from esphome.components import substitutions

        result[CONF_SUBSTITUTIONS] = merge_dicts_ordered(
            config.get(CONF_SUBSTITUTIONS) or {}, command_line_substitutions
        )
        result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
        try:
            substitutions.do_substitution_pass(config, command_line_substitutions)
        except vol.Invalid as err:
            result.add_error(err)
            return result

    CORE.raw_config = config

    # 1.1. Check for REPLACEME special value
    try:
        recursive_check_replaceme(config)
    except vol.Invalid as err:
        result.add_error(err)

    # 1.2. Load external_components
    if CONF_EXTERNAL_COMPONENTS in config:
        from esphome.components.external_components import do_external_components_pass

        result.add_output_path([CONF_EXTERNAL_COMPONENTS], CONF_EXTERNAL_COMPONENTS)
        try:
            do_external_components_pass(config, skip_update=skip_external_update)
        except vol.Invalid as err:
            result.update(config)
            result.add_error(err)
            return result

    if "esphomeyaml" in config:
        _LOGGER.warning(
            "The esphomeyaml section has been renamed to esphome in 1.11.0. "
            "Please replace 'esphomeyaml:' in your configuration with 'esphome:'."
        )
        config[CONF_ESPHOME] = config.pop("esphomeyaml")

    if CONF_ESPHOME not in config:
        result.add_str_error(
            "'esphome' section missing from configuration. Please make sure "
            "your configuration has an 'esphome:' line in it.",
            [],
        )
        return result

    # 2. Load partial core config
    result[CONF_ESPHOME] = config[CONF_ESPHOME]
    result.add_output_path([CONF_ESPHOME], CONF_ESPHOME)
    try:
        target_platform = core_config.preload_core_config(config, result)
    except vol.Invalid as err:
        result.add_error(err)
        return result
    # Remove temporary esphome config path again, it will be reloaded later
    result.remove_output_path([CONF_ESPHOME], CONF_ESPHOME)

    # Check version number now to avoid loading components that are not supported
    if min_version := config[CONF_ESPHOME].get(CONF_MIN_VERSION):
        cv.All(cv.version_number, cv.validate_esphome_version)(min_version)

    # First run platform validation steps
    result.add_validation_step(
        LoadTargetPlatformValidationStep(target_platform, config[target_platform])
    )
    result.run_validation_steps()

    if result.errors:
        # do not try to validate further as we don't know what the target is
        return result

    # Reset the pin registry so that any target platforms with pin validations do not get the duplicate pin warning.
    pins.PIN_SCHEMA_REGISTRY.reset()

    for domain, conf in config.items():
        result.add_validation_step(LoadValidationStep(domain, conf))
    result.add_validation_step(IDPassValidationStep())
    result.add_validation_step(PinUseValidationCheck())

    result.add_validation_step(RemoveReferenceValidationStep())

    result.run_validation_steps()

    return result


def humanize_error(config, validation_error):
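    """Reduce a voluptuous error to its human-readable message, stripping the '@ data[...]' suffix."""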
    validation_error = str(validation_error)
    m = re.match(
        r"^(.*?)\s*(?:for dictionary value )?@ data\[.*$", validation_error, re.DOTALL
    )
    if m is not None:
        validation_error = m.group(1)
    validation_error = validation_error.strip()
    if not validation_error.endswith("."):
        validation_error += "."
    return validation_error


def _get_parent_name(path, config):
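    """Return a printable name for the parent of the given config path."""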
    if not path:
        return "<root>"
    for domain_path, domain in config.output_paths:
        if _path_begins_with(path, domain_path):
            if len(path) > len(domain_path):
                # Sub-item
                break
            return domain
    # When processing a list, skip back over the index
    while len(path) > 1 and isinstance(path[-1], int):
        path = path[:-1]
    return path[-1]


def _format_vol_invalid(ex: vol.Invalid, config: Config) -> str:
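    """Format a voluptuous error into a user-friendly message for the failed-config output."""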
    message = ""

    paren = _get_parent_name(ex.path[:-1], config)

    if isinstance(ex, ExtraKeysInvalid):
        if ex.candidates:
            message += f"[{ex.path[-1]}] is an invalid option for [{paren}]. Did you mean {', '.join(f'[{x}]' for x in ex.candidates)}?"
        else:
            message += f"[{ex.path[-1]}] is an invalid option for [{paren}]. Please check the indentation."
    elif "extra keys not allowed" in str(ex):
        message += f"[{ex.path[-1]}] is an invalid option for [{paren}]."
    elif isinstance(ex, vol.RequiredFieldInvalid):
        if ex.msg == "required key not provided":
            message += f"'{ex.path[-1]}' is a required option for [{paren}]."
        else:
            # Required has set a custom error message
            message += ex.msg
    else:
        message += humanize_error(config, ex)

    return message


class InvalidYAMLError(EsphomeError):
    def __init__(self, base_exc):
        try:
            base = str(base_exc)
        except UnicodeDecodeError:
            base = repr(base_exc)
        message = f"Invalid YAML syntax:\n\n{base}"
        super().__init__(message)
        self.base_exc = base_exc


def _load_config(
    command_line_substitutions: dict[str, Any], skip_external_update: bool = False
) -> Config:
    """Load the configuration file."""
    try:
        config = yaml_util.load_yaml(CORE.config_path)
    except EsphomeError as e:
        raise InvalidYAMLError(e) from e

    try:
        return validate_config(config, command_line_substitutions, skip_external_update)
    except EsphomeError:
        raise
    except Exception:
        _LOGGER.error("Unexpected exception while reading configuration:")
        raise


def load_config(
    command_line_substitutions: dict[str, Any], skip_external_update: bool = False
) -> Config:
    try:
        return _load_config(command_line_substitutions, skip_external_update)
    except vol.Invalid as err:
        raise EsphomeError(f"Error while parsing config: {err}") from err


def line_info(config, path, highlight=True):
    """Display line config source."""
    if not highlight:
        return None
    obj = config.get_deepest_document_range_for_path(path)
    if obj:
        mark = obj.start_mark
        source = f"[source {mark.document}:{mark.line + 1}]"
        return color(AnsiFore.CYAN, source)
    return "None"


def _print_on_next_line(obj):
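    """Return True if the value should start on its own line when dumping the config."""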
    if isinstance(obj, (list, tuple, dict)):
        return True
    if isinstance(obj, str):
        return len(obj) > 80
    if isinstance(obj, core.Lambda):
        return len(obj.value) > 80
    return False


def dump_dict(
    config: Config, path: ConfigPath, at_root: bool = True
) -> tuple[str, bool]:
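    """Render the config fragment at path as YAML-like text; returns (text, is_multiline)."""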
    conf = config.get_nested_item(path)
    ret = ""
    multiline = False

    if at_root:
        error = config.get_error_for_path(path)
        if error is not None:
            ret += f"\n{color(AnsiFore.BOLD_RED, _format_vol_invalid(error, config))}\n"

    if isinstance(conf, (list, tuple)):
        multiline = True
        if not conf:
            ret += "[]"
            multiline = False

        for i in range(len(conf)):
            path_ = path + [i]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += f"\n{color(AnsiFore.BOLD_RED, _format_vol_invalid(error, config))}\n"

            sep = "- "
            if config.is_in_error_path(path_):
                sep = color(AnsiFore.RED, sep)
            msg, _ = dump_dict(config, path_, at_root=False)
            msg = indent(msg)
            inf = line_info(config, path_, highlight=config.is_in_error_path(path_))
            if inf is not None:
                msg = f"{inf}\n{msg}"
            elif msg:
                msg = msg[2:]
            ret += f"{sep + msg}\n"
    elif isinstance(conf, dict):
        multiline = True
        if not conf:
            ret += "{}"
            multiline = False

        for k in conf:
            path_ = path + [k]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += f"\n{color(AnsiFore.BOLD_RED, _format_vol_invalid(error, config))}\n"

            st = f"{k}: "
            if config.is_in_error_path(path_):
                st = color(AnsiFore.RED, st)
            msg, m = dump_dict(config, path_, at_root=False)

            inf = line_info(config, path_, highlight=config.is_in_error_path(path_))
            if m:
                msg = f"\n{indent(msg)}"

            if inf is not None:
                msg = f" {inf}{msg}" if m else f"{msg} {inf}"
            ret += f"{st + msg}\n"
    elif isinstance(conf, str):
        if is_secret(conf):
            conf = f"!secret {is_secret(conf)}"
        if not conf:
            conf += "''"

        if len(conf) > 80:
            conf = f"|-\n{indent(conf)}"
        error = config.get_error_for_path(path)
        col = AnsiFore.BOLD_RED if error else AnsiFore.KEEP
        ret += color(col, str(conf))
    elif isinstance(conf, core.Lambda):
        if is_secret(conf):
            conf = f"!secret {is_secret(conf)}"

        conf = f"!lambda |-\n{indent(str(conf.value))}"
        error = config.get_error_for_path(path)
        col = AnsiFore.BOLD_RED if error else AnsiFore.KEEP
        ret += color(col, conf)
    elif conf is None:
        pass
    else:
        error = config.get_error_for_path(path)
        col = AnsiFore.BOLD_RED if error else AnsiFore.KEEP
        ret += color(col, str(conf))
        multiline = "\n" in ret

    return ret, multiline


def strip_default_ids(config):
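    """Recursively remove auto-generated IDs and AutoLoad placeholders from the config."""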
    if isinstance(config, list):
        to_remove = []
        for i, x in enumerate(config):
            x = config[i] = strip_default_ids(x)
            if (isinstance(x, core.ID) and not x.is_manual) or isinstance(
                x, core.AutoLoad
            ):
                to_remove.append(x)
        for x in to_remove:
            config.remove(x)
    elif isinstance(config, dict):
        to_remove = []
        for k, v in config.items():
            v = config[k] = strip_default_ids(v)
            if (isinstance(v, core.ID) and not v.is_manual) or isinstance(
                v, core.AutoLoad
            ):
                to_remove.append(k)
        for k in to_remove:
            config.pop(k)
    return config


def read_config(command_line_substitutions, skip_external_update=False):
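    """Load and validate the configuration, printing a formatted error report and returning None on failure."""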
    _LOGGER.info("Reading configuration %s...", CORE.config_path)
    try:
        res = load_config(command_line_substitutions, skip_external_update)
    except EsphomeError as err:
        _LOGGER.error("Error while reading config: %s", err)
        return None
    if res.errors:
        if not CORE.verbose:
            res = strip_default_ids(res)

        safe_print(color(AnsiFore.BOLD_RED, "Failed config"))
        safe_print("")
        for path, domain in res.output_paths:
            if not res.is_in_error_path(path):
                continue

            errstr = color(AnsiFore.BOLD_RED, f"{domain}:")
            errline = line_info(res, path)
            if errline:
                errstr += f" {errline}"
            safe_print(errstr)
            split_dump = dump_dict(res, path)[0].splitlines()
            # find the last error message
            i = len(split_dump) - 1
            while i > 10 and "\033[" not in split_dump[i]:
                i = i - 1
            # discard lines more than 4 beyond the last error
            i = min(i + 4, len(split_dump))
            safe_print(indent("\n".join(split_dump[:i])))

        for err in res.errors:
            safe_print(color(AnsiFore.BOLD_RED, err.msg))
            safe_print("")

        return None
    return res