From 024c87a80bd80a76bfae56e14ced9d1cc9f5a4bf Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 6 Feb 2026 14:16:23 +0100 Subject: [PATCH] bundles --- esphome/__main__.py | 61 +++ esphome/bundle.py | 660 ++++++++++++++++++++++++++++ esphome/components/wifi/wpa2_eap.py | 4 +- 3 files changed, 723 insertions(+), 2 deletions(-) create mode 100644 esphome/bundle.py diff --git a/esphome/__main__.py b/esphome/__main__.py index c86b5604e1..9e06e63d2d 100644 --- a/esphome/__main__.py +++ b/esphome/__main__.py @@ -965,6 +965,38 @@ def command_clean(args: ArgsProtocol, config: ConfigType) -> int | None: return 0 +def command_bundle(args: ArgsProtocol, config: ConfigType) -> int | None: + from esphome.bundle import ConfigBundleCreator + + creator = ConfigBundleCreator(config) + + if args.list_only: + files = creator.discover_files() + for bf in sorted(files, key=lambda f: f.path): + safe_print(f" {bf.path}") + _LOGGER.info("Found %d files", len(files)) + return 0 + + result = creator.create_bundle() + + if args.output: + output_path = Path(args.output) + else: + stem = CORE.config_path.stem + output_path = CORE.config_dir / f"{stem}.bundle.tar.gz" + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(result.data) + + _LOGGER.info( + "Bundle created: %s (%d files, %.1f KB)", + output_path, + len(result.files), + len(result.data) / 1024, + ) + return 0 + + def command_dashboard(args: ArgsProtocol) -> int | None: from esphome.dashboard import dashboard @@ -1242,6 +1274,7 @@ POST_CONFIG_ACTIONS = { "rename": command_rename, "discover": command_discover, "analyze-memory": command_analyze_memory, + "bundle": command_bundle, } SIMPLE_CONFIG_ACTIONS = [ @@ -1545,6 +1578,24 @@ def parse_args(argv): "configuration", help="Your YAML configuration file(s).", nargs="+" ) + parser_bundle = subparsers.add_parser( + "bundle", + help="Create a self-contained config bundle for remote compilation.", + ) + parser_bundle.add_argument( + "configuration", help="Your YAML configuration file(s).", nargs="+" + ) + parser_bundle.add_argument( + "-o", + "--output", + help="Output path for the bundle archive.", + ) + parser_bundle.add_argument( + "--list-only", + help="List discovered files without creating the archive.", + action="store_true", + ) + # Keep backward compatibility with the old command line format of # esphome . # @@ -1623,6 +1674,16 @@ def run_esphome(argv): _LOGGER.warning("Skipping secrets file %s", conf_path) return 0 + # Bundle support: if the configuration is a .tar.gz bundle, extract it + # and rewrite conf_path to the extracted YAML config. + from esphome.bundle import is_bundle_path, prepare_bundle_for_compile + + if is_bundle_path(conf_path): + _LOGGER.info("Extracting config bundle %s...", conf_path) + conf_path = prepare_bundle_for_compile(conf_path) + # Update the argument so downstream code sees the extracted path + args.configuration[0] = str(conf_path) + CORE.config_path = conf_path CORE.dashboard = args.dashboard diff --git a/esphome/bundle.py b/esphome/bundle.py new file mode 100644 index 0000000000..a74e13858e --- /dev/null +++ b/esphome/bundle.py @@ -0,0 +1,660 @@ +"""Config bundle creator and extractor for ESPHome. + +A bundle is a self-contained .tar.gz archive containing a YAML config +and every local file it depends on. 
Bundles can be created from a config +and compiled directly: ``esphome compile my_device.bundle.tar.gz`` +""" + +from __future__ import annotations + +import contextlib +from contextlib import contextmanager +from dataclasses import dataclass +import io +import json +import logging +from pathlib import Path +import re +import shutil +import tarfile +from typing import Any + +from esphome import const, yaml_util +from esphome.core import CORE, EsphomeError + +_LOGGER = logging.getLogger(__name__) + +MANIFEST_FILENAME = "manifest.json" +CURRENT_MANIFEST_VERSION = 1 +MAX_DECOMPRESSED_SIZE = 500 * 1024 * 1024 # 500 MB + +# String prefixes that are never local file paths +_NON_PATH_PREFIXES = ("http://", "https://", "ftp://", "mdi:", "<") + +# File extensions recognized when resolving relative path strings. +# A relative string with one of these extensions is resolved against the +# config directory and included if the file exists. +_KNOWN_FILE_EXTENSIONS = frozenset( + { + # Fonts + ".ttf", + ".otf", + ".woff", + ".woff2", + ".pcf", + ".bdf", + # Images + ".png", + ".jpg", + ".jpeg", + ".bmp", + ".gif", + ".svg", + ".ico", + ".webp", + # Certificates + ".pem", + ".crt", + ".key", + ".der", + ".p12", + ".pfx", + # C/C++ includes + ".h", + ".hpp", + ".c", + ".cpp", + ".ino", + # Web assets + ".css", + ".js", + ".html", + } +) + + +_SECRET_RE = re.compile(r"!secret\s+(\S+)") + + +def _find_used_secret_keys(yaml_files: list[Path]) -> set[str]: + """Scan YAML files for ``!secret `` references.""" + keys: set[str] = set() + for fpath in yaml_files: + try: + text = fpath.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + continue + for match in _SECRET_RE.finditer(text): + keys.add(match.group(1)) + return keys + + +@contextmanager +def _track_yaml_loads(): + """Context manager that tracks all files loaded by yaml_util. + + Yields a list that is populated with resolved Path objects + for every file loaded through _load_yaml_internal. 
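+
+    Example (an illustrative sketch; mirrors how _discover_yaml_includes
+    uses this below):
+
+        with _track_yaml_loads() as loaded_files:
+            yaml_util.load_yaml(config_path)
+        # loaded_files now holds a resolved Path for every YAML file
+        # (the main config, includes, secrets, ...) read by load_yaml.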
+ """ + loaded_files: list[Path] = [] + # pylint: disable=protected-access + original_loader = yaml_util._load_yaml_internal + + def tracking_loader(fname: Path) -> Any: + loaded_files.append(Path(fname).resolve()) + return original_loader(fname) + + yaml_util._load_yaml_internal = tracking_loader + try: + yield loaded_files + finally: + yaml_util._load_yaml_internal = original_loader + # pylint: enable=protected-access + + +@dataclass +class BundleFile: + """A file to include in the bundle.""" + + path: str # Relative path inside the archive + source: Path # Absolute path on disk + + +@dataclass +class BundleResult: + """Result of creating a bundle.""" + + data: bytes + manifest: dict[str, Any] + files: list[BundleFile] + + +@dataclass +class BundleManifest: + """Parsed and validated bundle manifest.""" + + manifest_version: int + esphome_version: str + config_filename: str + files: list[str] + has_secrets: bool + + +class ConfigBundleCreator: + """Creates a self-contained bundle from an ESPHome config.""" + + def __init__(self, config: dict[str, Any]) -> None: + self._config = config + self._config_dir = CORE.config_dir + self._config_path = CORE.config_path + self._files: list[BundleFile] = [] + self._seen_paths: set[Path] = set() + self._secrets_paths: list[Path] = [] + + def discover_files(self) -> list[BundleFile]: + """Discover all files needed for the bundle.""" + self._files = [] + self._seen_paths = set() + + # The main config file + self._add_file(self._config_path) + + # Phase 1: YAML includes (tracked during config loading) + self._discover_yaml_includes() + + # Phase 2: Component-referenced files from validated config + self._discover_component_files() + + return list(self._files) + + def create_bundle(self) -> BundleResult: + """Create the bundle archive.""" + files = self.discover_files() + + # Determine which secret keys are actually referenced by the + # bundled YAML files so we only ship those, not the entire + # secrets.yaml which may contain secrets for other devices. + yaml_sources = [ + bf.source for bf in files if bf.source.suffix in (".yaml", ".yml") + ] + used_secret_keys = _find_used_secret_keys(yaml_sources) + filtered_secrets = self._build_filtered_secrets(used_secret_keys) + + manifest = self._build_manifest(files, has_secrets=bool(filtered_secrets)) + + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + # Add manifest first + manifest_data = json.dumps(manifest, indent=2).encode("utf-8") + _add_bytes_to_tar(tar, MANIFEST_FILENAME, manifest_data) + + # Add filtered secrets files + for rel_path, data in sorted(filtered_secrets.items()): + _add_bytes_to_tar(tar, rel_path, data) + + # Add files in sorted order for determinism, skipping secrets + # files which were already added above with filtered content + secrets_resolved = {p.resolve() for p in self._secrets_paths} + for bf in sorted(files, key=lambda f: f.path): + if bf.source in secrets_resolved: + continue + self._add_to_tar(tar, bf) + + return BundleResult(data=buf.getvalue(), manifest=manifest, files=files) + + def _add_file(self, abs_path: Path) -> bool: + """Add a file to the bundle. 
Returns False if already added.""" + abs_path = abs_path.resolve() + if abs_path in self._seen_paths: + return False + if not abs_path.is_file(): + _LOGGER.warning("Bundle: skipping missing file %s", abs_path) + return False + + rel_path = self._relative_to_config_dir(abs_path) + if rel_path is None: + _LOGGER.warning( + "Bundle: skipping file outside config directory: %s", abs_path + ) + return False + + self._seen_paths.add(abs_path) + self._files.append(BundleFile(path=rel_path, source=abs_path)) + return True + + def _add_directory(self, abs_path: Path) -> None: + """Recursively add all files in a directory.""" + abs_path = abs_path.resolve() + if not abs_path.is_dir(): + _LOGGER.warning("Bundle: skipping missing directory %s", abs_path) + return + for child in sorted(abs_path.rglob("*")): + if child.is_file(): + self._add_file(child) + + def _relative_to_config_dir(self, abs_path: Path) -> str | None: + """Get a path relative to the config directory. Returns None if outside.""" + try: + return str(abs_path.relative_to(self._config_dir)) + except ValueError: + return None + + def _discover_yaml_includes(self) -> None: + """Discover YAML files loaded during config parsing. + + We track files by wrapping _load_yaml_internal. The config has already + been loaded at this point (bundle is a POST_CONFIG_ACTION), so we + re-load just to discover the file list. + + Secrets files are tracked separately so we can filter them to + only include the keys this config actually references. + """ + with _track_yaml_loads() as loaded_files, contextlib.suppress(Exception): + yaml_util.load_yaml(self._config_path) + + for fpath in loaded_files: + if fpath == self._config_path.resolve(): + continue # Already added as config + if fpath.name in const.SECRETS_FILES: + self._secrets_paths.append(fpath) + self._add_file(fpath) + + def _discover_component_files(self) -> None: + """Walk the validated config for file references. + + Uses a generic recursive walk to find file paths instead of + hardcoding per-component knowledge about config dict formats. + After validation, components typically resolve paths to absolute + using CORE.relative_config_path() or cv.file_(). Relative paths + with known file extensions are also resolved and checked. + + Core ESPHome concepts that use relative paths or directories + are handled explicitly. 
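+
+        Illustrative examples of validated-config values the generic
+        walk picks up (paths are hypothetical):
+
+            "/config/fonts/roboto.ttf"  # absolute str pointing at an existing file
+            Path("/config/logo.png")    # absolute Path object
+            "certs/ca.pem"              # relative str with a known extension;
+                                        # warned about, then resolved against
+                                        # the config directory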
+ """ + config = self._config + + # Generic walk: find all file paths in the validated config + self._walk_config_for_files(config) + + # --- Core ESPHome concepts needing explicit handling --- + + # esphome.includes / includes_c - can be relative paths and directories + esphome_conf = config.get(const.CONF_ESPHOME, {}) + for include_path in esphome_conf.get("includes", []): + resolved = _resolve_include_path(include_path) + if resolved is None: + continue + if resolved.is_dir(): + self._add_directory(resolved) + else: + self._add_file(resolved) + for include_path in esphome_conf.get("includes_c", []): + resolved = _resolve_include_path(include_path) + if resolved is not None: + self._add_file(resolved) + + # external_components with source: local - directories + for ext_conf in config.get("external_components", []): + source = ext_conf.get("source", {}) + if source.get("type") == "local": + path = source.get("path") + if path: + p = Path(path) + if not p.is_absolute(): + p = CORE.relative_config_path(p) + self._add_directory(p) + + # custom_components/ directory if present (deprecated but still supported) + custom_dir = self._config_dir / "custom_components" + if custom_dir.is_dir(): + self._add_directory(custom_dir) + + def _walk_config_for_files(self, obj: Any) -> None: + """Recursively walk the config dict looking for file path references.""" + if isinstance(obj, dict): + for value in obj.values(): + self._walk_config_for_files(value) + elif isinstance(obj, (list, tuple)): + for item in obj: + self._walk_config_for_files(item) + elif isinstance(obj, Path): + if obj.is_absolute() and obj.is_file(): + self._add_file(obj) + elif isinstance(obj, str): + self._check_string_path(obj) + + def _check_string_path(self, value: str) -> None: + """Check if a string value is a local file reference.""" + if value.startswith(_NON_PATH_PREFIXES): + return + # Skip multi-line strings (lambdas, templates) + if "\n" in value or len(value) < 2: + return + + p = Path(value) + + # Absolute path - check if it points to an existing file + if p.is_absolute(): + if p.is_file(): + self._add_file(p) + return + + # Relative path with a known file extension - likely a component + # validator that forgot to resolve to absolute via cv.file_() or + # CORE.relative_config_path(). Warn and try to resolve. + if p.suffix.lower() in _KNOWN_FILE_EXTENSIONS: + _LOGGER.warning( + "Bundle: non-absolute path in validated config: %s " + "(component validator should return absolute paths)", + value, + ) + resolved = CORE.relative_config_path(p) + if resolved.is_file(): + self._add_file(resolved) + + def _build_filtered_secrets(self, used_keys: set[str]) -> dict[str, bytes]: + """Build filtered secrets files containing only the referenced keys. + + Returns a dict mapping relative archive path to YAML bytes. 
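+
+        Example return value (illustrative; key names are hypothetical):
+
+            {"secrets.yaml": b"wifi_ssid: MyNet\nwifi_password: hunter2\n"}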
+ """ + if not used_keys or not self._secrets_paths: + return {} + + result: dict[str, bytes] = {} + for secrets_path in self._secrets_paths: + rel_path = self._relative_to_config_dir(secrets_path) + if rel_path is None: + continue + try: + all_secrets = yaml_util.load_yaml(secrets_path, clear_secrets=False) + except EsphomeError: + _LOGGER.warning("Bundle: failed to load secrets file %s", secrets_path) + continue + if not isinstance(all_secrets, dict): + continue + filtered = {k: v for k, v in all_secrets.items() if k in used_keys} + if filtered: + data = yaml_util.dump(filtered).encode("utf-8") + result[rel_path] = data + return result + + def _build_manifest( + self, files: list[BundleFile], *, has_secrets: bool + ) -> dict[str, Any]: + """Build the manifest.json content.""" + return { + "manifest_version": CURRENT_MANIFEST_VERSION, + "esphome_version": const.__version__, + "config_filename": self._config_path.name, + "files": [f.path for f in files], + "has_secrets": has_secrets, + } + + @staticmethod + def _add_to_tar(tar: tarfile.TarFile, bf: BundleFile) -> None: + """Add a BundleFile to the tar archive with deterministic metadata.""" + with open(bf.source, "rb") as f: + _add_bytes_to_tar(tar, bf.path, f.read()) + + +def extract_bundle( + bundle_path: Path, + target_dir: Path | None = None, +) -> Path: + """Extract a bundle archive and return the path to the config YAML. + + Security: validates no path traversal, no symlinks, no absolute paths, + and enforces size limits. + + Args: + bundle_path: Path to the .tar.gz bundle file. + target_dir: Directory to extract into. If None, extracts next to + the bundle file in a directory named after it. + + Returns: + Absolute path to the extracted config YAML file. + + Raises: + EsphomeError: If the bundle is invalid or extraction fails. + """ + + bundle_path = bundle_path.resolve() + if not bundle_path.is_file(): + raise EsphomeError(f"Bundle file not found: {bundle_path}") + + if target_dir is None: + target_dir = _default_target_dir(bundle_path) + + target_dir = target_dir.resolve() + target_dir.mkdir(parents=True, exist_ok=True) + + # Read and validate the archive + try: + with tarfile.open(bundle_path, "r:gz") as tar: + manifest = _read_manifest_from_tar(tar) + _validate_tar_members(tar, target_dir) + tar.extractall(path=target_dir, filter="data") + except tarfile.TarError as err: + raise EsphomeError(f"Failed to extract bundle: {err}") from err + + config_filename = manifest["config_filename"] + config_path = target_dir / config_filename + if not config_path.is_file(): + raise EsphomeError( + f"Bundle manifest references config '{config_filename}' " + f"but it was not found in the archive" + ) + + return config_path + + +def read_bundle_manifest(bundle_path: Path) -> BundleManifest: + """Read and validate the manifest from a bundle without full extraction. + + Args: + bundle_path: Path to the .tar.gz bundle file. + + Returns: + Parsed BundleManifest. + + Raises: + EsphomeError: If the manifest is missing, invalid, or version unsupported. 
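+
+    Example (illustrative; the file name follows the default used by the
+    ``esphome bundle`` command):
+
+        manifest = read_bundle_manifest(Path("my_device.bundle.tar.gz"))
+        print(manifest.esphome_version, manifest.config_filename)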
+ """ + + try: + with tarfile.open(bundle_path, "r:gz") as tar: + manifest = _read_manifest_from_tar(tar) + except tarfile.TarError as err: + raise EsphomeError(f"Failed to read bundle: {err}") from err + + return BundleManifest( + manifest_version=manifest["manifest_version"], + esphome_version=manifest.get("esphome_version", "unknown"), + config_filename=manifest["config_filename"], + files=manifest.get("files", []), + has_secrets=manifest.get("has_secrets", False), + ) + + +def _read_manifest_from_tar(tar: tarfile.TarFile) -> dict[str, Any]: + """Read and validate manifest.json from an open tar archive.""" + + try: + member = tar.getmember(MANIFEST_FILENAME) + except KeyError: + raise EsphomeError("Invalid bundle: missing manifest.json") from None + + f = tar.extractfile(member) + if f is None: + raise EsphomeError("Invalid bundle: manifest.json is not a regular file") + + try: + manifest = json.loads(f.read()) + except (json.JSONDecodeError, UnicodeDecodeError) as err: + raise EsphomeError(f"Invalid bundle: malformed manifest.json: {err}") from err + + # Version check + version = manifest.get("manifest_version") + if version is None: + raise EsphomeError("Invalid bundle: manifest.json missing 'manifest_version'") + if not isinstance(version, int) or version < 1: + raise EsphomeError( + f"Invalid bundle: manifest_version must be a positive integer, got {version!r}" + ) + if version > CURRENT_MANIFEST_VERSION: + raise EsphomeError( + f"Bundle manifest version {version} is newer than this ESPHome " + f"version supports (max {CURRENT_MANIFEST_VERSION}). " + f"Please upgrade ESPHome to compile this bundle." + ) + + # Required fields + if "config_filename" not in manifest: + raise EsphomeError("Invalid bundle: manifest.json missing 'config_filename'") + + return manifest + + +def _validate_tar_members(tar: tarfile.TarFile, target_dir: Path) -> None: + """Validate tar members for security issues.""" + + total_size = 0 + for member in tar.getmembers(): + # Reject absolute paths + if member.name.startswith("/") or member.name.startswith("\\"): + raise EsphomeError( + f"Invalid bundle: absolute path in archive: {member.name}" + ) + + # Reject path traversal + if ".." 
in member.name.split("/"): + raise EsphomeError( + f"Invalid bundle: path traversal in archive: {member.name}" + ) + + # Reject symlinks + if member.issym() or member.islnk(): + raise EsphomeError(f"Invalid bundle: symlink in archive: {member.name}") + + # Ensure extraction stays within target_dir + target_path = (target_dir / member.name).resolve() + if not str(target_path).startswith(str(target_dir)): + raise EsphomeError( + f"Invalid bundle: file would extract outside target: {member.name}" + ) + + # Track total decompressed size + total_size += member.size + if total_size > MAX_DECOMPRESSED_SIZE: + raise EsphomeError( + f"Invalid bundle: decompressed size exceeds " + f"{MAX_DECOMPRESSED_SIZE // (1024 * 1024)}MB limit" + ) + + +def is_bundle_path(path: Path) -> bool: + """Check if a path looks like a bundle file.""" + name = path.name.lower() + return name.endswith(".tar.gz") or name.endswith(".tgz") + + +def _add_bytes_to_tar(tar: tarfile.TarFile, name: str, data: bytes) -> None: + """Add in-memory bytes to a tar archive with deterministic metadata.""" + info = tarfile.TarInfo(name=name) + info.size = len(data) + info.mtime = 0 + info.uid = 0 + info.gid = 0 + info.mode = 0o644 + tar.addfile(info, io.BytesIO(data)) + + +def _resolve_include_path(include_path: Any) -> Path | None: + """Resolve an include path to absolute, skipping system includes.""" + if isinstance(include_path, str) and include_path.startswith("<"): + return None # System include, not a local file + p = Path(include_path) + if not p.is_absolute(): + p = CORE.relative_config_path(p) + return p + + +def _default_target_dir(bundle_path: Path) -> Path: + """Compute the default extraction directory for a bundle.""" + return bundle_path.parent / bundle_path.name.removesuffix(".tar.gz").removesuffix( + ".gz" + ) + + +def _restore_preserved_dirs(preserved: dict[str, Path], target_dir: Path) -> None: + """Move preserved build cache directories back into target_dir.""" + for dirname, src in preserved.items(): + dst = target_dir / dirname + if not dst.exists(): + shutil.move(str(src), str(dst)) + + +def prepare_bundle_for_compile( + bundle_path: Path, + target_dir: Path | None = None, +) -> Path: + """Extract a bundle for compilation, preserving build caches. + + Unlike extract_bundle(), this preserves .esphome/ and .pioenvs/ + directories in the target if they already exist (for incremental builds). + + Args: + bundle_path: Path to the .tar.gz bundle file. + target_dir: Directory to extract into. Must be specified for + build server use. + + Returns: + Absolute path to the extracted config YAML file. 
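+
+    Example (illustrative; mirrors the call made in esphome/__main__.py
+    when a bundle path is passed to compile):
+
+        config_path = prepare_bundle_for_compile(Path("my_device.bundle.tar.gz"))
+        # With target_dir left as None, the bundle is extracted into a
+        # sibling directory named after the archive (_default_target_dir).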
+ """ + + bundle_path = bundle_path.resolve() + if not bundle_path.is_file(): + raise EsphomeError(f"Bundle file not found: {bundle_path}") + + if target_dir is None: + target_dir = _default_target_dir(bundle_path) + + target_dir = target_dir.resolve() + target_dir.mkdir(parents=True, exist_ok=True) + + # Identify directories to preserve (build caches) + preserve_dirs = [".esphome", ".pioenvs", ".pio"] + preserved: dict[str, Path] = {} + + # Temporarily move preserved dirs out of the way + staging = target_dir / ".bundle_staging" + for dirname in preserve_dirs: + src = target_dir / dirname + if src.is_dir(): + dst = staging / dirname + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(src), str(dst)) + preserved[dirname] = dst + + try: + # Clean non-preserved content and extract fresh + for item in target_dir.iterdir(): + if item.name == ".bundle_staging": + continue + if item.is_dir(): + shutil.rmtree(item) + else: + item.unlink() + + config_path = extract_bundle(bundle_path, target_dir) + except Exception: + # On extraction failure, restore preserved dirs + _restore_preserved_dirs(preserved, target_dir) + raise + finally: + # Restore preserved dirs and clean staging + _restore_preserved_dirs(preserved, target_dir) + if staging.is_dir(): + shutil.rmtree(staging) + + return config_path diff --git a/esphome/components/wifi/wpa2_eap.py b/esphome/components/wifi/wpa2_eap.py index 5d5bd8dca3..986c88d0c6 100644 --- a/esphome/components/wifi/wpa2_eap.py +++ b/esphome/components/wifi/wpa2_eap.py @@ -72,8 +72,8 @@ def _validate_load_certificate(value): def validate_certificate(value): _validate_load_certificate(value) - # Validation result should be the path, not the loaded certificate - return value + # Return the resolved absolute path, not the loaded certificate + return str(cv.file_(value)) def _validate_load_private_key(key, cert_pw):