mirror of
https://github.com/esphome/esphome.git
synced 2026-02-09 01:01:56 +00:00
Compare commits
19 Commits
http_reque
...
esphome_bu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a711e455a | ||
|
|
38b6746807 | ||
|
|
1b8153bd46 | ||
|
|
f660a62deb | ||
|
|
db92aca490 | ||
|
|
6a26136c34 | ||
|
|
ba07f39c05 | ||
|
|
d00af090eb | ||
|
|
49e7052562 | ||
|
|
805d335a5d | ||
|
|
001901631f | ||
|
|
0b2a8c9e27 | ||
|
|
ff783fd9fa | ||
|
|
b4c707b440 | ||
|
|
23d96bf196 | ||
|
|
51cbb3e6b2 | ||
|
|
475ece94ac | ||
|
|
136606a435 | ||
|
|
024c87a80b |
@@ -965,6 +965,38 @@ def command_clean(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||
return 0
|
||||
|
||||
|
||||
def command_bundle(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||
from esphome.bundle import BUNDLE_EXTENSION, ConfigBundleCreator
|
||||
|
||||
creator = ConfigBundleCreator(config)
|
||||
|
||||
if args.list_only:
|
||||
files = creator.discover_files()
|
||||
for bf in sorted(files, key=lambda f: f.path):
|
||||
safe_print(f" {bf.path}")
|
||||
_LOGGER.info("Found %d files", len(files))
|
||||
return 0
|
||||
|
||||
result = creator.create_bundle()
|
||||
|
||||
if args.output:
|
||||
output_path = Path(args.output)
|
||||
else:
|
||||
stem = CORE.config_path.stem
|
||||
output_path = CORE.config_dir / f"{stem}{BUNDLE_EXTENSION}"
|
||||
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
output_path.write_bytes(result.data)
|
||||
|
||||
_LOGGER.info(
|
||||
"Bundle created: %s (%d files, %.1f KB)",
|
||||
output_path,
|
||||
len(result.files),
|
||||
len(result.data) / 1024,
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
def command_dashboard(args: ArgsProtocol) -> int | None:
|
||||
from esphome.dashboard import dashboard
|
||||
|
||||
@@ -1242,6 +1274,7 @@ POST_CONFIG_ACTIONS = {
|
||||
"rename": command_rename,
|
||||
"discover": command_discover,
|
||||
"analyze-memory": command_analyze_memory,
|
||||
"bundle": command_bundle,
|
||||
}
|
||||
|
||||
SIMPLE_CONFIG_ACTIONS = [
|
||||
@@ -1545,6 +1578,24 @@ def parse_args(argv):
|
||||
"configuration", help="Your YAML configuration file(s).", nargs="+"
|
||||
)
|
||||
|
||||
parser_bundle = subparsers.add_parser(
|
||||
"bundle",
|
||||
help="Create a self-contained config bundle for remote compilation.",
|
||||
)
|
||||
parser_bundle.add_argument(
|
||||
"configuration", help="Your YAML configuration file(s).", nargs="+"
|
||||
)
|
||||
parser_bundle.add_argument(
|
||||
"-o",
|
||||
"--output",
|
||||
help="Output path for the bundle archive.",
|
||||
)
|
||||
parser_bundle.add_argument(
|
||||
"--list-only",
|
||||
help="List discovered files without creating the archive.",
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
# Keep backward compatibility with the old command line format of
|
||||
# esphome <config> <command>.
|
||||
#
|
||||
@@ -1623,6 +1674,16 @@ def run_esphome(argv):
|
||||
_LOGGER.warning("Skipping secrets file %s", conf_path)
|
||||
return 0
|
||||
|
||||
# Bundle support: if the configuration is a .esphomebundle, extract it
|
||||
# and rewrite conf_path to the extracted YAML config.
|
||||
from esphome.bundle import is_bundle_path, prepare_bundle_for_compile
|
||||
|
||||
if is_bundle_path(conf_path):
|
||||
_LOGGER.info("Extracting config bundle %s...", conf_path)
|
||||
conf_path = prepare_bundle_for_compile(conf_path)
|
||||
# Update the argument so downstream code sees the extracted path
|
||||
args.configuration[0] = str(conf_path)
|
||||
|
||||
CORE.config_path = conf_path
|
||||
CORE.dashboard = args.dashboard
|
||||
|
||||
|
||||
699
esphome/bundle.py
Normal file
699
esphome/bundle.py
Normal file
@@ -0,0 +1,699 @@
|
||||
"""Config bundle creator and extractor for ESPHome.
|
||||
|
||||
A bundle is a self-contained .tar.gz archive containing a YAML config
|
||||
and every local file it depends on. Bundles can be created from a config
|
||||
and compiled directly: ``esphome compile my_device.esphomebundle.tar.gz``
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import re
|
||||
import shutil
|
||||
import tarfile
|
||||
from typing import Any
|
||||
|
||||
from esphome import const, yaml_util
|
||||
from esphome.const import (
|
||||
CONF_ESPHOME,
|
||||
CONF_EXTERNAL_COMPONENTS,
|
||||
CONF_INCLUDES,
|
||||
CONF_INCLUDES_C,
|
||||
CONF_PATH,
|
||||
CONF_SOURCE,
|
||||
CONF_TYPE,
|
||||
)
|
||||
from esphome.core import CORE, EsphomeError
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
BUNDLE_EXTENSION = ".esphomebundle.tar.gz"
|
||||
MANIFEST_FILENAME = "manifest.json"
|
||||
CURRENT_MANIFEST_VERSION = 1
|
||||
MAX_DECOMPRESSED_SIZE = 500 * 1024 * 1024 # 500 MB
|
||||
MAX_MANIFEST_SIZE = 1024 * 1024 # 1 MB
|
||||
|
||||
# Directories preserved across bundle extractions (build caches)
|
||||
_PRESERVE_DIRS = (".esphome", ".pioenvs", ".pio")
|
||||
_BUNDLE_STAGING_DIR = ".bundle_staging"
|
||||
|
||||
|
||||
class ManifestKey(StrEnum):
|
||||
"""Keys used in bundle manifest.json."""
|
||||
|
||||
MANIFEST_VERSION = "manifest_version"
|
||||
ESPHOME_VERSION = "esphome_version"
|
||||
CONFIG_FILENAME = "config_filename"
|
||||
FILES = "files"
|
||||
HAS_SECRETS = "has_secrets"
|
||||
|
||||
|
||||
# String prefixes that are never local file paths
|
||||
_NON_PATH_PREFIXES = ("http://", "https://", "ftp://", "mdi:", "<")
|
||||
|
||||
# File extensions recognized when resolving relative path strings.
|
||||
# A relative string with one of these extensions is resolved against the
|
||||
# config directory and included if the file exists.
|
||||
_KNOWN_FILE_EXTENSIONS = frozenset(
|
||||
{
|
||||
# Fonts
|
||||
".ttf",
|
||||
".otf",
|
||||
".woff",
|
||||
".woff2",
|
||||
".pcf",
|
||||
".bdf",
|
||||
# Images
|
||||
".png",
|
||||
".jpg",
|
||||
".jpeg",
|
||||
".bmp",
|
||||
".gif",
|
||||
".svg",
|
||||
".ico",
|
||||
".webp",
|
||||
# Certificates
|
||||
".pem",
|
||||
".crt",
|
||||
".key",
|
||||
".der",
|
||||
".p12",
|
||||
".pfx",
|
||||
# C/C++ includes
|
||||
".h",
|
||||
".hpp",
|
||||
".c",
|
||||
".cpp",
|
||||
".ino",
|
||||
# Web assets
|
||||
".css",
|
||||
".js",
|
||||
".html",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# Matches !secret references in YAML text. This is intentionally a simple
|
||||
# regex scan rather than a YAML parse — it may match inside comments or
|
||||
# multi-line strings, which is the conservative direction (include more
|
||||
# secrets rather than fewer).
|
||||
_SECRET_RE = re.compile(r"!secret\s+(\S+)")
|
||||
|
||||
|
||||
def _find_used_secret_keys(yaml_files: list[Path]) -> set[str]:
|
||||
"""Scan YAML files for ``!secret <key>`` references."""
|
||||
keys: set[str] = set()
|
||||
for fpath in yaml_files:
|
||||
try:
|
||||
text = fpath.read_text(encoding="utf-8")
|
||||
except (OSError, UnicodeDecodeError):
|
||||
continue
|
||||
for match in _SECRET_RE.finditer(text):
|
||||
keys.add(match.group(1))
|
||||
return keys
|
||||
|
||||
|
||||
@dataclass
|
||||
class BundleFile:
|
||||
"""A file to include in the bundle."""
|
||||
|
||||
path: str # Relative path inside the archive
|
||||
source: Path # Absolute path on disk
|
||||
|
||||
|
||||
@dataclass
|
||||
class BundleResult:
|
||||
"""Result of creating a bundle."""
|
||||
|
||||
data: bytes
|
||||
manifest: dict[str, Any]
|
||||
files: list[BundleFile]
|
||||
|
||||
|
||||
@dataclass
|
||||
class BundleManifest:
|
||||
"""Parsed and validated bundle manifest."""
|
||||
|
||||
manifest_version: int
|
||||
esphome_version: str
|
||||
config_filename: str
|
||||
files: list[str]
|
||||
has_secrets: bool
|
||||
|
||||
|
||||
class ConfigBundleCreator:
|
||||
"""Creates a self-contained bundle from an ESPHome config."""
|
||||
|
||||
def __init__(self, config: dict[str, Any]) -> None:
|
||||
self._config = config
|
||||
self._config_dir = CORE.config_dir
|
||||
self._config_path = CORE.config_path
|
||||
self._files: list[BundleFile] = []
|
||||
self._seen_paths: set[Path] = set()
|
||||
self._secrets_paths: set[Path] = set()
|
||||
|
||||
def discover_files(self) -> list[BundleFile]:
|
||||
"""Discover all files needed for the bundle."""
|
||||
self._files = []
|
||||
self._seen_paths = set()
|
||||
self._secrets_paths = set()
|
||||
|
||||
# The main config file
|
||||
self._add_file(self._config_path)
|
||||
|
||||
# Phase 1: YAML includes (tracked during config loading)
|
||||
self._discover_yaml_includes()
|
||||
|
||||
# Phase 2: Component-referenced files from validated config
|
||||
self._discover_component_files()
|
||||
|
||||
return list(self._files)
|
||||
|
||||
def create_bundle(self) -> BundleResult:
|
||||
"""Create the bundle archive."""
|
||||
files = self.discover_files()
|
||||
|
||||
# Determine which secret keys are actually referenced by the
|
||||
# bundled YAML files so we only ship those, not the entire
|
||||
# secrets.yaml which may contain secrets for other devices.
|
||||
yaml_sources = [
|
||||
bf.source for bf in files if bf.source.suffix in (".yaml", ".yml")
|
||||
]
|
||||
used_secret_keys = _find_used_secret_keys(yaml_sources)
|
||||
filtered_secrets = self._build_filtered_secrets(used_secret_keys)
|
||||
|
||||
has_secrets = bool(filtered_secrets)
|
||||
if has_secrets:
|
||||
_LOGGER.warning(
|
||||
"Bundle contains secrets (e.g. Wi-Fi passwords). "
|
||||
"Do not share it with untrusted parties."
|
||||
)
|
||||
|
||||
manifest = self._build_manifest(files, has_secrets=has_secrets)
|
||||
|
||||
buf = io.BytesIO()
|
||||
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
|
||||
# Add manifest first
|
||||
manifest_data = json.dumps(manifest, indent=2).encode("utf-8")
|
||||
_add_bytes_to_tar(tar, MANIFEST_FILENAME, manifest_data)
|
||||
|
||||
# Add filtered secrets files
|
||||
for rel_path, data in sorted(filtered_secrets.items()):
|
||||
_add_bytes_to_tar(tar, rel_path, data)
|
||||
|
||||
# Add files in sorted order for determinism, skipping secrets
|
||||
# files which were already added above with filtered content
|
||||
for bf in sorted(files, key=lambda f: f.path):
|
||||
if bf.source in self._secrets_paths:
|
||||
continue
|
||||
self._add_to_tar(tar, bf)
|
||||
|
||||
return BundleResult(data=buf.getvalue(), manifest=manifest, files=files)
|
||||
|
||||
def _add_file(self, abs_path: Path) -> bool:
|
||||
"""Add a file to the bundle. Returns False if already added."""
|
||||
abs_path = abs_path.resolve()
|
||||
if abs_path in self._seen_paths:
|
||||
return False
|
||||
if not abs_path.is_file():
|
||||
_LOGGER.warning("Bundle: skipping missing file %s", abs_path)
|
||||
return False
|
||||
|
||||
rel_path = self._relative_to_config_dir(abs_path)
|
||||
if rel_path is None:
|
||||
_LOGGER.warning(
|
||||
"Bundle: skipping file outside config directory: %s", abs_path
|
||||
)
|
||||
return False
|
||||
|
||||
self._seen_paths.add(abs_path)
|
||||
self._files.append(BundleFile(path=rel_path, source=abs_path))
|
||||
return True
|
||||
|
||||
def _add_directory(self, abs_path: Path) -> None:
|
||||
"""Recursively add all files in a directory."""
|
||||
abs_path = abs_path.resolve()
|
||||
if not abs_path.is_dir():
|
||||
_LOGGER.warning("Bundle: skipping missing directory %s", abs_path)
|
||||
return
|
||||
for child in sorted(abs_path.rglob("*")):
|
||||
if child.is_file() and "__pycache__" not in child.parts:
|
||||
self._add_file(child)
|
||||
|
||||
def _relative_to_config_dir(self, abs_path: Path) -> str | None:
|
||||
"""Get a path relative to the config directory. Returns None if outside.
|
||||
|
||||
Always uses forward slashes for consistency in tar archives.
|
||||
"""
|
||||
try:
|
||||
return abs_path.relative_to(self._config_dir).as_posix()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _discover_yaml_includes(self) -> None:
|
||||
"""Discover YAML files loaded during config parsing.
|
||||
|
||||
We track files by wrapping _load_yaml_internal. The config has already
|
||||
been loaded at this point (bundle is a POST_CONFIG_ACTION), so we
|
||||
re-load just to discover the file list.
|
||||
|
||||
Secrets files are tracked separately so we can filter them to
|
||||
only include the keys this config actually references.
|
||||
"""
|
||||
with yaml_util.track_yaml_loads() as loaded_files:
|
||||
try:
|
||||
yaml_util.load_yaml(self._config_path)
|
||||
except EsphomeError:
|
||||
_LOGGER.debug(
|
||||
"Bundle: re-loading YAML for include discovery failed, "
|
||||
"proceeding with partial file list"
|
||||
)
|
||||
|
||||
for fpath in loaded_files:
|
||||
if fpath == self._config_path.resolve():
|
||||
continue # Already added as config
|
||||
if fpath.name in const.SECRETS_FILES:
|
||||
self._secrets_paths.add(fpath)
|
||||
self._add_file(fpath)
|
||||
|
||||
def _discover_component_files(self) -> None:
|
||||
"""Walk the validated config for file references.
|
||||
|
||||
Uses a generic recursive walk to find file paths instead of
|
||||
hardcoding per-component knowledge about config dict formats.
|
||||
After validation, components typically resolve paths to absolute
|
||||
using CORE.relative_config_path() or cv.file_(). Relative paths
|
||||
with known file extensions are also resolved and checked.
|
||||
|
||||
Core ESPHome concepts that use relative paths or directories
|
||||
are handled explicitly.
|
||||
"""
|
||||
config = self._config
|
||||
|
||||
# Generic walk: find all file paths in the validated config
|
||||
self._walk_config_for_files(config)
|
||||
|
||||
# --- Core ESPHome concepts needing explicit handling ---
|
||||
|
||||
# esphome.includes / includes_c - can be relative paths and directories
|
||||
esphome_conf = config.get(CONF_ESPHOME, {})
|
||||
for include_path in esphome_conf.get(CONF_INCLUDES, []):
|
||||
resolved = _resolve_include_path(include_path)
|
||||
if resolved is None:
|
||||
continue
|
||||
if resolved.is_dir():
|
||||
self._add_directory(resolved)
|
||||
else:
|
||||
self._add_file(resolved)
|
||||
for include_path in esphome_conf.get(CONF_INCLUDES_C, []):
|
||||
resolved = _resolve_include_path(include_path)
|
||||
if resolved is not None:
|
||||
self._add_file(resolved)
|
||||
|
||||
# external_components with source: local - directories
|
||||
for ext_conf in config.get(CONF_EXTERNAL_COMPONENTS, []):
|
||||
source = ext_conf.get(CONF_SOURCE, {})
|
||||
if not isinstance(source, dict):
|
||||
continue
|
||||
if source.get(CONF_TYPE) != "local":
|
||||
continue
|
||||
path = source.get(CONF_PATH)
|
||||
if not path:
|
||||
continue
|
||||
p = Path(path)
|
||||
if not p.is_absolute():
|
||||
p = CORE.relative_config_path(p)
|
||||
self._add_directory(p)
|
||||
|
||||
def _walk_config_for_files(self, obj: Any) -> None:
|
||||
"""Recursively walk the config dict looking for file path references."""
|
||||
if isinstance(obj, dict):
|
||||
for value in obj.values():
|
||||
self._walk_config_for_files(value)
|
||||
elif isinstance(obj, (list, tuple)):
|
||||
for item in obj:
|
||||
self._walk_config_for_files(item)
|
||||
elif isinstance(obj, Path):
|
||||
if obj.is_absolute() and obj.is_file():
|
||||
self._add_file(obj)
|
||||
elif isinstance(obj, str):
|
||||
self._check_string_path(obj)
|
||||
|
||||
def _check_string_path(self, value: str) -> None:
|
||||
"""Check if a string value is a local file reference."""
|
||||
# Fast exits for strings that cannot be file paths
|
||||
if len(value) < 2 or "\n" in value:
|
||||
return
|
||||
if value.startswith(_NON_PATH_PREFIXES):
|
||||
return
|
||||
# File paths must contain a path separator or a dot (for extension)
|
||||
if "/" not in value and "\\" not in value and "." not in value:
|
||||
return
|
||||
|
||||
p = Path(value)
|
||||
|
||||
# Absolute path - check if it points to an existing file
|
||||
if p.is_absolute():
|
||||
if p.is_file():
|
||||
self._add_file(p)
|
||||
return
|
||||
|
||||
# Relative path with a known file extension - likely a component
|
||||
# validator that forgot to resolve to absolute via cv.file_() or
|
||||
# CORE.relative_config_path(). Warn and try to resolve.
|
||||
if p.suffix.lower() in _KNOWN_FILE_EXTENSIONS:
|
||||
_LOGGER.warning(
|
||||
"Bundle: non-absolute path in validated config: %s "
|
||||
"(component validator should return absolute paths)",
|
||||
value,
|
||||
)
|
||||
resolved = CORE.relative_config_path(p)
|
||||
if resolved.is_file():
|
||||
self._add_file(resolved)
|
||||
|
||||
def _build_filtered_secrets(self, used_keys: set[str]) -> dict[str, bytes]:
|
||||
"""Build filtered secrets files containing only the referenced keys.
|
||||
|
||||
Returns a dict mapping relative archive path to YAML bytes.
|
||||
"""
|
||||
if not used_keys or not self._secrets_paths:
|
||||
return {}
|
||||
|
||||
result: dict[str, bytes] = {}
|
||||
for secrets_path in self._secrets_paths:
|
||||
rel_path = self._relative_to_config_dir(secrets_path)
|
||||
if rel_path is None:
|
||||
continue
|
||||
try:
|
||||
all_secrets = yaml_util.load_yaml(secrets_path, clear_secrets=False)
|
||||
except EsphomeError:
|
||||
_LOGGER.warning("Bundle: failed to load secrets file %s", secrets_path)
|
||||
continue
|
||||
if not isinstance(all_secrets, dict):
|
||||
continue
|
||||
filtered = {k: v for k, v in all_secrets.items() if k in used_keys}
|
||||
if filtered:
|
||||
data = yaml_util.dump(filtered, show_secrets=True).encode("utf-8")
|
||||
result[rel_path] = data
|
||||
return result
|
||||
|
||||
def _build_manifest(
|
||||
self, files: list[BundleFile], *, has_secrets: bool
|
||||
) -> dict[str, Any]:
|
||||
"""Build the manifest.json content."""
|
||||
return {
|
||||
ManifestKey.MANIFEST_VERSION: CURRENT_MANIFEST_VERSION,
|
||||
ManifestKey.ESPHOME_VERSION: const.__version__,
|
||||
ManifestKey.CONFIG_FILENAME: self._config_path.name,
|
||||
ManifestKey.FILES: [f.path for f in files],
|
||||
ManifestKey.HAS_SECRETS: has_secrets,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _add_to_tar(tar: tarfile.TarFile, bf: BundleFile) -> None:
|
||||
"""Add a BundleFile to the tar archive with deterministic metadata."""
|
||||
with open(bf.source, "rb") as f:
|
||||
_add_bytes_to_tar(tar, bf.path, f.read())
|
||||
|
||||
|
||||
def extract_bundle(
|
||||
bundle_path: Path,
|
||||
target_dir: Path | None = None,
|
||||
) -> Path:
|
||||
"""Extract a bundle archive and return the path to the config YAML.
|
||||
|
||||
Sanity checks reject path traversal, symlinks, absolute paths, and
|
||||
oversized archives to prevent accidental file overwrites or extraction
|
||||
outside the target directory. These are **not** a security boundary —
|
||||
bundles are assumed to come from the user's own machine or a trusted
|
||||
build pipeline.
|
||||
|
||||
Args:
|
||||
bundle_path: Path to the .tar.gz bundle file.
|
||||
target_dir: Directory to extract into. If None, extracts next to
|
||||
the bundle file in a directory named after it.
|
||||
|
||||
Returns:
|
||||
Absolute path to the extracted config YAML file.
|
||||
|
||||
Raises:
|
||||
EsphomeError: If the bundle is invalid or extraction fails.
|
||||
"""
|
||||
|
||||
bundle_path = bundle_path.resolve()
|
||||
if not bundle_path.is_file():
|
||||
raise EsphomeError(f"Bundle file not found: {bundle_path}")
|
||||
|
||||
if target_dir is None:
|
||||
target_dir = _default_target_dir(bundle_path)
|
||||
|
||||
target_dir = target_dir.resolve()
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Read and validate the archive
|
||||
try:
|
||||
with tarfile.open(bundle_path, "r:gz") as tar:
|
||||
manifest = _read_manifest_from_tar(tar)
|
||||
_validate_tar_members(tar, target_dir)
|
||||
tar.extractall(path=target_dir, filter="data")
|
||||
except tarfile.TarError as err:
|
||||
raise EsphomeError(f"Failed to extract bundle: {err}") from err
|
||||
|
||||
config_filename = manifest[ManifestKey.CONFIG_FILENAME]
|
||||
config_path = target_dir / config_filename
|
||||
if not config_path.is_file():
|
||||
raise EsphomeError(
|
||||
f"Bundle manifest references config '{config_filename}' "
|
||||
f"but it was not found in the archive"
|
||||
)
|
||||
|
||||
return config_path
|
||||
|
||||
|
||||
def read_bundle_manifest(bundle_path: Path) -> BundleManifest:
|
||||
"""Read and validate the manifest from a bundle without full extraction.
|
||||
|
||||
Args:
|
||||
bundle_path: Path to the .tar.gz bundle file.
|
||||
|
||||
Returns:
|
||||
Parsed BundleManifest.
|
||||
|
||||
Raises:
|
||||
EsphomeError: If the manifest is missing, invalid, or version unsupported.
|
||||
"""
|
||||
|
||||
try:
|
||||
with tarfile.open(bundle_path, "r:gz") as tar:
|
||||
manifest = _read_manifest_from_tar(tar)
|
||||
except tarfile.TarError as err:
|
||||
raise EsphomeError(f"Failed to read bundle: {err}") from err
|
||||
|
||||
return BundleManifest(
|
||||
manifest_version=manifest[ManifestKey.MANIFEST_VERSION],
|
||||
esphome_version=manifest.get(ManifestKey.ESPHOME_VERSION, "unknown"),
|
||||
config_filename=manifest[ManifestKey.CONFIG_FILENAME],
|
||||
files=manifest.get(ManifestKey.FILES, []),
|
||||
has_secrets=manifest.get(ManifestKey.HAS_SECRETS, False),
|
||||
)
|
||||
|
||||
|
||||
def _read_manifest_from_tar(tar: tarfile.TarFile) -> dict[str, Any]:
|
||||
"""Read and validate manifest.json from an open tar archive."""
|
||||
|
||||
try:
|
||||
member = tar.getmember(MANIFEST_FILENAME)
|
||||
except KeyError:
|
||||
raise EsphomeError("Invalid bundle: missing manifest.json") from None
|
||||
|
||||
f = tar.extractfile(member)
|
||||
if f is None:
|
||||
raise EsphomeError("Invalid bundle: manifest.json is not a regular file")
|
||||
|
||||
if member.size > MAX_MANIFEST_SIZE:
|
||||
raise EsphomeError(
|
||||
f"Invalid bundle: manifest.json too large "
|
||||
f"({member.size} bytes, max {MAX_MANIFEST_SIZE})"
|
||||
)
|
||||
|
||||
try:
|
||||
manifest = json.loads(f.read())
|
||||
except (json.JSONDecodeError, UnicodeDecodeError) as err:
|
||||
raise EsphomeError(f"Invalid bundle: malformed manifest.json: {err}") from err
|
||||
|
||||
# Version check
|
||||
version = manifest.get(ManifestKey.MANIFEST_VERSION)
|
||||
if version is None:
|
||||
raise EsphomeError("Invalid bundle: manifest.json missing 'manifest_version'")
|
||||
if not isinstance(version, int) or version < 1:
|
||||
raise EsphomeError(
|
||||
f"Invalid bundle: manifest_version must be a positive integer, got {version!r}"
|
||||
)
|
||||
if version > CURRENT_MANIFEST_VERSION:
|
||||
raise EsphomeError(
|
||||
f"Bundle manifest version {version} is newer than this ESPHome "
|
||||
f"version supports (max {CURRENT_MANIFEST_VERSION}). "
|
||||
f"Please upgrade ESPHome to compile this bundle."
|
||||
)
|
||||
|
||||
# Required fields
|
||||
if ManifestKey.CONFIG_FILENAME not in manifest:
|
||||
raise EsphomeError("Invalid bundle: manifest.json missing 'config_filename'")
|
||||
|
||||
return manifest
|
||||
|
||||
|
||||
def _validate_tar_members(tar: tarfile.TarFile, target_dir: Path) -> None:
|
||||
"""Sanity-check tar members to prevent mistakes and accidental overwrites.
|
||||
|
||||
This is not a security boundary — bundles are created locally or come
|
||||
from a trusted build pipeline. The checks catch malformed archives
|
||||
and common mistakes (stray absolute paths, ``..`` components) that
|
||||
could silently overwrite unrelated files.
|
||||
"""
|
||||
|
||||
total_size = 0
|
||||
for member in tar.getmembers():
|
||||
# Reject absolute paths (Unix and Windows)
|
||||
if member.name.startswith(("/", "\\")):
|
||||
raise EsphomeError(
|
||||
f"Invalid bundle: absolute path in archive: {member.name}"
|
||||
)
|
||||
|
||||
# Reject path traversal (split on both / and \ for cross-platform)
|
||||
parts = re.split(r"[/\\]", member.name)
|
||||
if ".." in parts:
|
||||
raise EsphomeError(
|
||||
f"Invalid bundle: path traversal in archive: {member.name}"
|
||||
)
|
||||
|
||||
# Reject symlinks
|
||||
if member.issym() or member.islnk():
|
||||
raise EsphomeError(f"Invalid bundle: symlink in archive: {member.name}")
|
||||
|
||||
# Ensure extraction stays within target_dir
|
||||
target_path = (target_dir / member.name).resolve()
|
||||
if not target_path.is_relative_to(target_dir):
|
||||
raise EsphomeError(
|
||||
f"Invalid bundle: file would extract outside target: {member.name}"
|
||||
)
|
||||
|
||||
# Track total decompressed size
|
||||
total_size += member.size
|
||||
if total_size > MAX_DECOMPRESSED_SIZE:
|
||||
raise EsphomeError(
|
||||
f"Invalid bundle: decompressed size exceeds "
|
||||
f"{MAX_DECOMPRESSED_SIZE // (1024 * 1024)}MB limit"
|
||||
)
|
||||
|
||||
|
||||
def is_bundle_path(path: Path) -> bool:
|
||||
"""Check if a path looks like a bundle file."""
|
||||
return path.name.lower().endswith(BUNDLE_EXTENSION)
|
||||
|
||||
|
||||
def _add_bytes_to_tar(tar: tarfile.TarFile, name: str, data: bytes) -> None:
|
||||
"""Add in-memory bytes to a tar archive with deterministic metadata."""
|
||||
info = tarfile.TarInfo(name=name)
|
||||
info.size = len(data)
|
||||
info.mtime = 0
|
||||
info.uid = 0
|
||||
info.gid = 0
|
||||
info.mode = 0o644
|
||||
tar.addfile(info, io.BytesIO(data))
|
||||
|
||||
|
||||
def _resolve_include_path(include_path: Any) -> Path | None:
|
||||
"""Resolve an include path to absolute, skipping system includes."""
|
||||
if isinstance(include_path, str) and include_path.startswith("<"):
|
||||
return None # System include, not a local file
|
||||
p = Path(include_path)
|
||||
if not p.is_absolute():
|
||||
p = CORE.relative_config_path(p)
|
||||
return p
|
||||
|
||||
|
||||
def _default_target_dir(bundle_path: Path) -> Path:
|
||||
"""Compute the default extraction directory for a bundle."""
|
||||
name = bundle_path.name
|
||||
if name.lower().endswith(BUNDLE_EXTENSION):
|
||||
name = name[: -len(BUNDLE_EXTENSION)]
|
||||
return bundle_path.parent / name
|
||||
|
||||
|
||||
def _restore_preserved_dirs(preserved: dict[str, Path], target_dir: Path) -> None:
|
||||
"""Move preserved build cache directories back into target_dir.
|
||||
|
||||
If the bundle contained entries under a preserved directory name,
|
||||
the extracted copy is removed so the original cache always wins.
|
||||
"""
|
||||
for dirname, src in preserved.items():
|
||||
dst = target_dir / dirname
|
||||
if dst.exists():
|
||||
shutil.rmtree(dst)
|
||||
shutil.move(str(src), str(dst))
|
||||
|
||||
|
||||
def prepare_bundle_for_compile(
|
||||
bundle_path: Path,
|
||||
target_dir: Path | None = None,
|
||||
) -> Path:
|
||||
"""Extract a bundle for compilation, preserving build caches.
|
||||
|
||||
Unlike extract_bundle(), this preserves .esphome/ and .pioenvs/
|
||||
directories in the target if they already exist (for incremental builds).
|
||||
|
||||
Args:
|
||||
bundle_path: Path to the .tar.gz bundle file.
|
||||
target_dir: Directory to extract into. Must be specified for
|
||||
build server use.
|
||||
|
||||
Returns:
|
||||
Absolute path to the extracted config YAML file.
|
||||
"""
|
||||
|
||||
bundle_path = bundle_path.resolve()
|
||||
if not bundle_path.is_file():
|
||||
raise EsphomeError(f"Bundle file not found: {bundle_path}")
|
||||
|
||||
if target_dir is None:
|
||||
target_dir = _default_target_dir(bundle_path)
|
||||
|
||||
target_dir = target_dir.resolve()
|
||||
target_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
preserved: dict[str, Path] = {}
|
||||
|
||||
# Temporarily move preserved dirs out of the way
|
||||
staging = target_dir / _BUNDLE_STAGING_DIR
|
||||
for dirname in _PRESERVE_DIRS:
|
||||
src = target_dir / dirname
|
||||
if src.is_dir():
|
||||
dst = staging / dirname
|
||||
dst.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.move(str(src), str(dst))
|
||||
preserved[dirname] = dst
|
||||
|
||||
try:
|
||||
# Clean non-preserved content and extract fresh
|
||||
for item in target_dir.iterdir():
|
||||
if item.name == _BUNDLE_STAGING_DIR:
|
||||
continue
|
||||
if item.is_dir():
|
||||
shutil.rmtree(item)
|
||||
else:
|
||||
item.unlink()
|
||||
|
||||
config_path = extract_bundle(bundle_path, target_dir)
|
||||
finally:
|
||||
# Restore preserved dirs (idempotent) and clean staging
|
||||
_restore_preserved_dirs(preserved, target_dir)
|
||||
if staging.is_dir():
|
||||
shutil.rmtree(staging)
|
||||
|
||||
return config_path
|
||||
@@ -71,9 +71,11 @@ def _validate_load_certificate(value):
|
||||
|
||||
|
||||
def validate_certificate(value):
|
||||
# _validate_load_certificate already calls cv.file_() internally,
|
||||
# but returns the parsed certificate object. We re-call cv.file_()
|
||||
# to get the resolved path string that the bundle walker can discover.
|
||||
_validate_load_certificate(value)
|
||||
# Validation result should be the path, not the loaded certificate
|
||||
return value
|
||||
return str(cv.file_(value))
|
||||
|
||||
|
||||
def _validate_load_private_key(key, cert_pw):
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from contextlib import suppress
|
||||
from collections.abc import Callable, Generator
|
||||
from contextlib import contextmanager, suppress
|
||||
import functools
|
||||
import inspect
|
||||
from io import BytesIO, TextIOBase, TextIOWrapper
|
||||
@@ -43,6 +43,27 @@ _LOGGER = logging.getLogger(__name__)
|
||||
SECRET_YAML = "secrets.yaml"
|
||||
_SECRET_CACHE = {}
|
||||
_SECRET_VALUES = {}
|
||||
# Not thread-safe — config processing is single-threaded today.
|
||||
_load_listeners: list[Callable[[Path], None]] = []
|
||||
|
||||
|
||||
@contextmanager
|
||||
def track_yaml_loads() -> Generator[list[Path]]:
|
||||
"""Context manager that records every file loaded by the YAML loader.
|
||||
|
||||
Yields a list that is populated with resolved Path objects for every
|
||||
file loaded through ``_load_yaml_internal`` while the context is active.
|
||||
"""
|
||||
loaded: list[Path] = []
|
||||
|
||||
def _on_load(fname: Path) -> None:
|
||||
loaded.append(Path(fname).resolve())
|
||||
|
||||
_load_listeners.append(_on_load)
|
||||
try:
|
||||
yield loaded
|
||||
finally:
|
||||
_load_listeners.remove(_on_load)
|
||||
|
||||
|
||||
class ESPHomeDataBase:
|
||||
@@ -428,6 +449,8 @@ def load_yaml(fname: Path, clear_secrets: bool = True) -> Any:
|
||||
|
||||
def _load_yaml_internal(fname: Path) -> Any:
|
||||
"""Load a YAML file."""
|
||||
for listener in _load_listeners:
|
||||
listener(fname)
|
||||
try:
|
||||
with fname.open(encoding="utf-8") as f_handle:
|
||||
return parse_yaml(fname, f_handle)
|
||||
@@ -435,10 +458,10 @@ def _load_yaml_internal(fname: Path) -> Any:
|
||||
raise EsphomeError(f"Error reading file {fname}: {err}") from err
|
||||
|
||||
|
||||
def parse_yaml(
|
||||
file_name: Path, file_handle: TextIOWrapper, yaml_loader=_load_yaml_internal
|
||||
) -> Any:
|
||||
def parse_yaml(file_name: Path, file_handle: TextIOWrapper, yaml_loader=None) -> Any:
|
||||
"""Parse a YAML file."""
|
||||
if yaml_loader is None:
|
||||
yaml_loader = _load_yaml_internal
|
||||
try:
|
||||
return _load_yaml_internal_with_type(
|
||||
ESPHomeLoader, file_name, file_handle, yaml_loader
|
||||
|
||||
18
tests/unit_tests/fixtures/bundle/assets/certs/ca_cert.pem
Normal file
18
tests/unit_tests/fixtures/bundle/assets/certs/ca_cert.pem
Normal file
@@ -0,0 +1,18 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICzjCCAbagAwIBAgIUW3BzjtekVgMj12/oeXawSswGyXMwDQYJKoZIhvcNAQEL
|
||||
BQAwITEfMB0GA1UEAwwWRVNQSG9tZSBCdW5kbGUgVGVzdCBDQTAeFw0yNjAyMDYx
|
||||
MzMxMTZaFw0yNzAyMDYxMzMxMTZaMCExHzAdBgNVBAMMFkVTUEhvbWUgQnVuZGxl
|
||||
IFRlc3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDG62vBFkGn
|
||||
hEu54gh2A7b1ZwesVadZ6u0iaVO7GSWiI0o4nb6xv7ULZbGrgsKNIO6qCV4VSR3p
|
||||
BfMhF5dFy8kkMzA8dKZMk16tygzocdNum2QQ8BHyIsATL7SGZ33si9Alp30gXv6h
|
||||
XSlEKYDKHFavkDhWPFNa5+oeHbMS/MxjpOUXIpq32VaFpJr427d9Y9wGjuK8B7Gp
|
||||
CI5Ub1g2dpC9xSHqQKD3JZokmtc70+mD74AcNWbyxWp0bkW9wOfNJJnAoiwhJxQ8
|
||||
yfE37UsUIVc8014NhdhU1K/S0iQuOKfGX1L/GAshv8syQIcDfzJuJdX+5E/leAYD
|
||||
UEKqRkcLT+D5AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAF1HpJ6d+W5WrzOQrGej
|
||||
41pxCDeJ9tSiSj/KtvJfjEVIpg0hMRTY7nSL7OAg9KGESfx4u1jMwVnyOv34br5B
|
||||
DTlRl+wF2k7Ip8CNnyZfCC+1SVQZpUt1mVNz8BhIZZ9/a830wCILNQQrVKkSeNBk
|
||||
SEc1qTt4mIhQZ+M422qAswluv4fz/FW1f4oB9KhCpzUCANjmyERnqTnImjnJu8h0
|
||||
jbPNnNsN+G+Roju8UD/7atWYfAUmDjHx72Ci/5G9SzoM5fhgxxu43XYd5RW5wBzt
|
||||
j4KdKdYlDtOL62mRPKWd40uGnJcieUjisU7noRn0ErMgbUlhLdbXT9X7aNborZcu
|
||||
x6I=
|
||||
-----END CERTIFICATE-----
|
||||
@@ -0,0 +1,18 @@
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICzjCCAbagAwIBAgIUW3BzjtekVgMj12/oeXawSswGyXMwDQYJKoZIhvcNAQEL
|
||||
BQAwITEfMB0GA1UEAwwWRVNQSG9tZSBCdW5kbGUgVGVzdCBDQTAeFw0yNjAyMDYx
|
||||
MzMxMTZaFw0yNzAyMDYxMzMxMTZaMCExHzAdBgNVBAMMFkVTUEhvbWUgQnVuZGxl
|
||||
IFRlc3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDG62vBFkGn
|
||||
hEu54gh2A7b1ZwesVadZ6u0iaVO7GSWiI0o4nb6xv7ULZbGrgsKNIO6qCV4VSR3p
|
||||
BfMhF5dFy8kkMzA8dKZMk16tygzocdNum2QQ8BHyIsATL7SGZ33si9Alp30gXv6h
|
||||
XSlEKYDKHFavkDhWPFNa5+oeHbMS/MxjpOUXIpq32VaFpJr427d9Y9wGjuK8B7Gp
|
||||
CI5Ub1g2dpC9xSHqQKD3JZokmtc70+mD74AcNWbyxWp0bkW9wOfNJJnAoiwhJxQ8
|
||||
yfE37UsUIVc8014NhdhU1K/S0iQuOKfGX1L/GAshv8syQIcDfzJuJdX+5E/leAYD
|
||||
UEKqRkcLT+D5AgMBAAEwDQYJKoZIhvcNAQELBQADggEBAF1HpJ6d+W5WrzOQrGej
|
||||
41pxCDeJ9tSiSj/KtvJfjEVIpg0hMRTY7nSL7OAg9KGESfx4u1jMwVnyOv34br5B
|
||||
DTlRl+wF2k7Ip8CNnyZfCC+1SVQZpUt1mVNz8BhIZZ9/a830wCILNQQrVKkSeNBk
|
||||
SEc1qTt4mIhQZ+M422qAswluv4fz/FW1f4oB9KhCpzUCANjmyERnqTnImjnJu8h0
|
||||
jbPNnNsN+G+Roju8UD/7atWYfAUmDjHx72Ci/5G9SzoM5fhgxxu43XYd5RW5wBzt
|
||||
j4KdKdYlDtOL62mRPKWd40uGnJcieUjisU7noRn0ErMgbUlhLdbXT9X7aNborZcu
|
||||
x6I=
|
||||
-----END CERTIFICATE-----
|
||||
27
tests/unit_tests/fixtures/bundle/assets/certs/client_key.pem
Normal file
27
tests/unit_tests/fixtures/bundle/assets/certs/client_key.pem
Normal file
@@ -0,0 +1,27 @@
|
||||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEowIBAAKCAQEAxutrwRZBp4RLueIIdgO29WcHrFWnWertImlTuxkloiNKOJ2+
|
||||
sb+1C2Wxq4LCjSDuqgleFUkd6QXzIReXRcvJJDMwPHSmTJNercoM6HHTbptkEPAR
|
||||
8iLAEy+0hmd97IvQJad9IF7+oV0pRCmAyhxWr5A4VjxTWufqHh2zEvzMY6TlFyKa
|
||||
t9lWhaSa+Nu3fWPcBo7ivAexqQiOVG9YNnaQvcUh6kCg9yWaJJrXO9Ppg++AHDVm
|
||||
8sVqdG5FvcDnzSSZwKIsIScUPMnxN+1LFCFXPNNeDYXYVNSv0tIkLjinxl9S/xgL
|
||||
Ib/LMkCHA38ybiXV/uRP5XgGA1BCqkZHC0/g+QIDAQABAoIBAEpsFwcJNCwf95MG
|
||||
qcK5lhCPaRQFgdTG68ylmoGUIXvddy3ies+W2X33oLb5958ElLaCRbRyBCJEKxgU
|
||||
8vBWk50bF69uty9MLa6YuyaWO5QUyCX8I8KzVKh4/zIP81F2Z7xGwy5CzEKED+Xk
|
||||
Hz6+xoHt094TuN34iaOV2gM/GJsok4Wp/lzsuT3X6i3Nad9YGrV2yL/wv5c542bw
|
||||
vrFDtYQ/+ADZZPW4+xK0ShiarSqV3iXB2cEjc4JX7yLX1hB4LY8VHRzl+Byjdl0/
|
||||
lheiIesl5htl82SFxquZDimDsbilTm7TLW2bbm3b3/oC7DchTx6COBjp90VJqk3R
|
||||
QrO5dicCgYEA80pyA7tCB0bGnJ7KWkteKddyOdakeYeM7Bpfv17qbCm9ciMw9nqt
|
||||
KJVZPtAuqZGTpfSJseOCIyz9zloB79hVJ3mdWpGJVvmNM5H+BJyCciXpwfqp64QG
|
||||
1gMqGlSy/MwsZHqNCsOIvrzH09GFN0LSPNKeXN7GNAtU1vI5s7Xf158CgYEA0U+Y
|
||||
Qe1qJY4m597spHNFfkGznoFXAjHOoWYHv95902cH6JD4GnYPfwFXxgFsrJhFaFMC
|
||||
jXlT0fRFAIe4NuUJhGD6TYSJqsFkH3xJkAepvKpfjM5qJ7+PQHRnED/E5OS2Nj0R
|
||||
+cxBhTEWTw9YiOFBRbj6hlphkj8izVGJZ2pL4GcCgYEApsjiYKx/F33tqnExR7Vj
|
||||
WEvagswi9S137mQmP4tSKdRzi0uUxWRUUP4RsH4HfzfNgHej7c+J55Nwa4ZIzaQA
|
||||
vI8i0HP1MyrhIflzqrWgt6BGIDU3R7268fw5YNOv4J4X0Moy5q4lkJzaYNvB96BX
|
||||
gFrjNceDGSqrfq+P3yNP0QECgYBNQfHTM8ygPA4EO/Zg5ONbrOidsuPovXWlgUGP
|
||||
ApKy+y6iGxBYxAcIO/in71KrijDkRu+ERKo5rs3hWjcWnAedQyZggnFGA8fvDzMf
|
||||
5JQ0PTazhGUOcthvVAfOqZsFWZ4f+v6tk0UD4pB3chSdwXcUQyjFeorVLlSsMFJl
|
||||
R4jmNQKBgG38YFR2bqIc7jJItr+34POXdJ4te8Dm1jJHbo8xXsnjVSaxjc5PGs3p
|
||||
OuJpwuMwzEuFEnE7XLkQxTJw54OBLMmDgK0XUOPDq6eLzrKkW5NlpejqaQV9Piyo
|
||||
q1kqbJan20jfJQUGTcX7FXHMUThzqJltHILR1GTW6I9z4k8xdsDY
|
||||
-----END RSA PRIVATE KEY-----
|
||||
BIN
tests/unit_tests/fixtures/bundle/assets/fonts/test_font.ttf
Normal file
BIN
tests/unit_tests/fixtures/bundle/assets/fonts/test_font.ttf
Normal file
Binary file not shown.
BIN
tests/unit_tests/fixtures/bundle/assets/images/animation.gif
Normal file
BIN
tests/unit_tests/fixtures/bundle/assets/images/animation.gif
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 9.5 KiB |
BIN
tests/unit_tests/fixtures/bundle/assets/images/logo.png
Normal file
BIN
tests/unit_tests/fixtures/bundle/assets/images/logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 685 B |
2
tests/unit_tests/fixtures/bundle/assets/web/custom.css
Normal file
2
tests/unit_tests/fixtures/bundle/assets/web/custom.css
Normal file
@@ -0,0 +1,2 @@
|
||||
/* Dummy CSS for bundle testing */
|
||||
body { color: red; }
|
||||
2
tests/unit_tests/fixtures/bundle/assets/web/custom.js
Normal file
2
tests/unit_tests/fixtures/bundle/assets/web/custom.js
Normal file
@@ -0,0 +1,2 @@
|
||||
// Dummy JS for bundle testing
|
||||
console.log("test");
|
||||
60
tests/unit_tests/fixtures/bundle/bundle_test.yaml
Normal file
60
tests/unit_tests/fixtures/bundle/bundle_test.yaml
Normal file
@@ -0,0 +1,60 @@
|
||||
esphome:
|
||||
name: bundle-test
|
||||
includes:
|
||||
- includes/custom_sensor.h
|
||||
|
||||
esp32:
|
||||
board: esp32dev
|
||||
framework:
|
||||
type: esp-idf
|
||||
|
||||
logger:
|
||||
<<: !include common/base.yaml
|
||||
|
||||
wifi:
|
||||
ssid: !secret wifi_ssid
|
||||
password: !secret wifi_password
|
||||
|
||||
api:
|
||||
|
||||
ota:
|
||||
- platform: esphome
|
||||
password: !secret ota_password
|
||||
|
||||
web_server:
|
||||
port: 80
|
||||
css_include: assets/web/custom.css
|
||||
js_include: assets/web/custom.js
|
||||
|
||||
i2c:
|
||||
sda: GPIO21
|
||||
scl: GPIO22
|
||||
|
||||
font:
|
||||
- id: test_font
|
||||
file: assets/fonts/test_font.ttf
|
||||
size: 16
|
||||
|
||||
image:
|
||||
- id: test_image
|
||||
file: assets/images/logo.png
|
||||
type: BINARY
|
||||
resize: 16x16
|
||||
|
||||
animation:
|
||||
- id: test_animation
|
||||
file: assets/images/animation.gif
|
||||
type: BINARY
|
||||
resize: 16x16
|
||||
|
||||
display:
|
||||
- platform: ssd1306_i2c
|
||||
model: SSD1306_128X64
|
||||
address: 0x3C
|
||||
lambda: |-
|
||||
it.image(0, 0, id(test_image));
|
||||
|
||||
external_components:
|
||||
- source:
|
||||
type: local
|
||||
path: local_components
|
||||
1
tests/unit_tests/fixtures/bundle/common/base.yaml
Normal file
1
tests/unit_tests/fixtures/bundle/common/base.yaml
Normal file
@@ -0,0 +1 @@
|
||||
level: DEBUG
|
||||
@@ -0,0 +1,3 @@
|
||||
// Dummy custom sensor header for bundle testing
|
||||
#pragma once
|
||||
#include "esphome/core/component.h"
|
||||
@@ -0,0 +1 @@
|
||||
# Dummy local external component for bundle testing
|
||||
@@ -0,0 +1,2 @@
|
||||
// Dummy component header for bundle testing
|
||||
#pragma once
|
||||
4
tests/unit_tests/fixtures/bundle/secrets.yaml
Normal file
4
tests/unit_tests/fixtures/bundle/secrets.yaml
Normal file
@@ -0,0 +1,4 @@
|
||||
wifi_ssid: "TestNetwork"
|
||||
wifi_password: "TestPassword123"
|
||||
api_key: "unused_secret_should_not_appear"
|
||||
ota_password: "ota_test_password"
|
||||
1210
tests/unit_tests/test_bundle.py
Normal file
1210
tests/unit_tests/test_bundle.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -20,6 +20,7 @@ from esphome.__main__ import (
|
||||
Purpose,
|
||||
choose_upload_log_host,
|
||||
command_analyze_memory,
|
||||
command_bundle,
|
||||
command_clean_all,
|
||||
command_rename,
|
||||
command_update_all,
|
||||
@@ -41,6 +42,7 @@ from esphome.__main__ import (
|
||||
upload_program,
|
||||
upload_using_esptool,
|
||||
)
|
||||
from esphome.bundle import BUNDLE_EXTENSION, BundleFile, BundleResult
|
||||
from esphome.components.esp32 import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32
|
||||
from esphome.const import (
|
||||
CONF_API,
|
||||
@@ -865,6 +867,8 @@ class MockArgs:
|
||||
name: str | None = None
|
||||
dashboard: bool = False
|
||||
reset: bool = False
|
||||
list_only: bool = False
|
||||
output: str | None = None
|
||||
|
||||
|
||||
def test_upload_program_serial_esp32(
|
||||
@@ -3291,3 +3295,195 @@ esp32:
|
||||
clean_output.split("SUMMARY")[1] if "SUMMARY" in clean_output else ""
|
||||
)
|
||||
assert "secrets.yaml" not in summary_section
|
||||
|
||||
|
||||
# --- command_bundle tests ---
|
||||
|
||||
|
||||
def test_command_bundle_list_only(
|
||||
tmp_path: Path,
|
||||
capsys: CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Test command_bundle with --list-only prints files and returns 0."""
|
||||
mock_files = [
|
||||
BundleFile(path="device.yaml", source=tmp_path / "device.yaml"),
|
||||
BundleFile(path="secrets.yaml", source=tmp_path / "secrets.yaml"),
|
||||
BundleFile(path="common/base.yaml", source=tmp_path / "common" / "base.yaml"),
|
||||
]
|
||||
|
||||
args = MockArgs(list_only=True)
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
mock_creator = MagicMock()
|
||||
mock_creator.discover_files.return_value = mock_files
|
||||
|
||||
with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator):
|
||||
result = command_bundle(args, config)
|
||||
|
||||
assert result == 0
|
||||
captured = capsys.readouterr()
|
||||
# Files should be printed in sorted order
|
||||
assert "common/base.yaml" in captured.out
|
||||
assert "device.yaml" in captured.out
|
||||
assert "secrets.yaml" in captured.out
|
||||
|
||||
|
||||
def test_command_bundle_list_only_empty(
|
||||
tmp_path: Path,
|
||||
capsys: CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Test command_bundle --list-only with no files discovered."""
|
||||
args = MockArgs(list_only=True)
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
mock_creator = MagicMock()
|
||||
mock_creator.discover_files.return_value = []
|
||||
|
||||
with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator):
|
||||
result = command_bundle(args, config)
|
||||
|
||||
assert result == 0
|
||||
|
||||
|
||||
def test_command_bundle_creates_archive(tmp_path: Path) -> None:
|
||||
"""Test command_bundle creates archive at default output path."""
|
||||
CORE.config_path = tmp_path / "mydevice.yaml"
|
||||
|
||||
mock_result = BundleResult(
|
||||
data=b"fake-tar-gz-data",
|
||||
manifest={"manifest_version": 1},
|
||||
files=[BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml")],
|
||||
)
|
||||
|
||||
args = MockArgs()
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
mock_creator = MagicMock()
|
||||
mock_creator.create_bundle.return_value = mock_result
|
||||
|
||||
with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator):
|
||||
result = command_bundle(args, config)
|
||||
|
||||
assert result == 0
|
||||
output_path = tmp_path / f"mydevice{BUNDLE_EXTENSION}"
|
||||
assert output_path.exists()
|
||||
assert output_path.read_bytes() == b"fake-tar-gz-data"
|
||||
|
||||
|
||||
def test_command_bundle_custom_output(tmp_path: Path) -> None:
|
||||
"""Test command_bundle with -o custom output path."""
|
||||
custom_output = tmp_path / "output" / "custom.esphomebundle.tar.gz"
|
||||
mock_result = BundleResult(
|
||||
data=b"custom-output-data",
|
||||
manifest={"manifest_version": 1},
|
||||
files=[BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml")],
|
||||
)
|
||||
|
||||
args = MockArgs(output=str(custom_output))
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
mock_creator = MagicMock()
|
||||
mock_creator.create_bundle.return_value = mock_result
|
||||
|
||||
with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator):
|
||||
result = command_bundle(args, config)
|
||||
|
||||
assert result == 0
|
||||
assert custom_output.exists()
|
||||
assert custom_output.read_bytes() == b"custom-output-data"
|
||||
|
||||
|
||||
def test_command_bundle_creates_parent_dirs(tmp_path: Path) -> None:
|
||||
"""Test command_bundle creates parent directories for output path."""
|
||||
nested_output = tmp_path / "deep" / "nested" / "dir" / "out.tar.gz"
|
||||
mock_result = BundleResult(
|
||||
data=b"data",
|
||||
manifest={"manifest_version": 1},
|
||||
files=[BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml")],
|
||||
)
|
||||
|
||||
args = MockArgs(output=str(nested_output))
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
mock_creator = MagicMock()
|
||||
mock_creator.create_bundle.return_value = mock_result
|
||||
|
||||
with patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator):
|
||||
result = command_bundle(args, config)
|
||||
|
||||
assert result == 0
|
||||
assert nested_output.exists()
|
||||
|
||||
|
||||
def test_command_bundle_logs_info(
|
||||
tmp_path: Path,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test command_bundle logs bundle creation info."""
|
||||
CORE.config_path = tmp_path / "mydevice.yaml"
|
||||
|
||||
mock_result = BundleResult(
|
||||
data=b"x" * 2048,
|
||||
manifest={"manifest_version": 1},
|
||||
files=[
|
||||
BundleFile(path="mydevice.yaml", source=tmp_path / "mydevice.yaml"),
|
||||
BundleFile(path="secrets.yaml", source=tmp_path / "secrets.yaml"),
|
||||
],
|
||||
)
|
||||
|
||||
args = MockArgs()
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
mock_creator = MagicMock()
|
||||
mock_creator.create_bundle.return_value = mock_result
|
||||
|
||||
with (
|
||||
patch("esphome.bundle.ConfigBundleCreator", return_value=mock_creator),
|
||||
caplog.at_level(logging.INFO),
|
||||
):
|
||||
result = command_bundle(args, config)
|
||||
|
||||
assert result == 0
|
||||
assert "Bundle created" in caplog.text
|
||||
assert "2 files" in caplog.text
|
||||
assert "2.0 KB" in caplog.text
|
||||
|
||||
|
||||
def test_run_esphome_bundle_detection(tmp_path: Path) -> None:
|
||||
"""Test run_esphome detects .esphomebundle.tar.gz and extracts it."""
|
||||
bundle_path = tmp_path / f"device{BUNDLE_EXTENSION}"
|
||||
bundle_path.write_bytes(b"fake-bundle")
|
||||
|
||||
extracted_yaml = tmp_path / "extracted" / "device.yaml"
|
||||
|
||||
with (
|
||||
patch("esphome.bundle.is_bundle_path", return_value=True) as mock_is_bundle,
|
||||
patch(
|
||||
"esphome.bundle.prepare_bundle_for_compile",
|
||||
return_value=extracted_yaml,
|
||||
) as mock_prepare,
|
||||
patch("esphome.__main__.read_config", return_value=None),
|
||||
):
|
||||
result = run_esphome(["esphome", "compile", str(bundle_path)])
|
||||
|
||||
mock_is_bundle.assert_called_once()
|
||||
mock_prepare.assert_called_once_with(bundle_path)
|
||||
# read_config returns None → exit code 2
|
||||
assert result == 2
|
||||
|
||||
|
||||
def test_run_esphome_non_bundle_skips_extraction(tmp_path: Path) -> None:
|
||||
"""Test run_esphome does not extract for regular .yaml files."""
|
||||
yaml_file = tmp_path / "device.yaml"
|
||||
yaml_file.write_text("esphome:\n name: test\n")
|
||||
|
||||
with (
|
||||
patch("esphome.bundle.is_bundle_path", return_value=False) as mock_is_bundle,
|
||||
patch("esphome.bundle.prepare_bundle_for_compile") as mock_prepare,
|
||||
patch("esphome.__main__.read_config", return_value=None),
|
||||
):
|
||||
result = run_esphome(["esphome", "compile", str(yaml_file)])
|
||||
|
||||
mock_is_bundle.assert_called_once()
|
||||
mock_prepare.assert_not_called()
|
||||
assert result == 2
|
||||
|
||||
@@ -306,3 +306,57 @@ def test_dump_sort_keys() -> None:
|
||||
# nested keys should also be sorted
|
||||
assert "a_key:" in sorted_dump
|
||||
assert sorted_dump.index("a_key:") < sorted_dump.index("z_key:")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# track_yaml_loads
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_track_yaml_loads_records_files(tmp_path: Path) -> None:
|
||||
"""track_yaml_loads records every file loaded inside the context."""
|
||||
yaml_file = tmp_path / "test.yaml"
|
||||
yaml_file.write_text("key: value\n")
|
||||
|
||||
with yaml_util.track_yaml_loads() as loaded:
|
||||
yaml_util.load_yaml(yaml_file)
|
||||
|
||||
assert len(loaded) == 1
|
||||
assert loaded[0] == yaml_file.resolve()
|
||||
|
||||
|
||||
def test_track_yaml_loads_records_includes(tmp_path: Path) -> None:
|
||||
"""track_yaml_loads records nested !include files."""
|
||||
inc = tmp_path / "included.yaml"
|
||||
inc.write_text("included_key: 42\n")
|
||||
main = tmp_path / "main.yaml"
|
||||
main.write_text("child: !include included.yaml\n")
|
||||
|
||||
with yaml_util.track_yaml_loads() as loaded:
|
||||
yaml_util.load_yaml(main)
|
||||
|
||||
resolved = [p.name for p in loaded]
|
||||
assert "main.yaml" in resolved
|
||||
assert "included.yaml" in resolved
|
||||
|
||||
|
||||
def test_track_yaml_loads_empty_outside_context(tmp_path: Path) -> None:
|
||||
"""Files loaded outside the context are not recorded."""
|
||||
yaml_file = tmp_path / "test.yaml"
|
||||
yaml_file.write_text("key: value\n")
|
||||
|
||||
with yaml_util.track_yaml_loads() as loaded:
|
||||
pass # load nothing inside
|
||||
|
||||
yaml_util.load_yaml(yaml_file)
|
||||
assert loaded == []
|
||||
|
||||
|
||||
def test_track_yaml_loads_cleanup_on_exception(tmp_path: Path) -> None:
|
||||
"""Listener is removed even if the body raises."""
|
||||
before = len(yaml_util._load_listeners)
|
||||
|
||||
with pytest.raises(RuntimeError), yaml_util.track_yaml_loads():
|
||||
raise RuntimeError("boom")
|
||||
|
||||
assert len(yaml_util._load_listeners) == before
|
||||
|
||||
Reference in New Issue
Block a user