mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-25 05:03:52 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			444 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			444 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| """Merge multiple component test configurations into a single test file.
 | |
| 
 | |
| This script combines multiple component test files that use the same common bus
 | |
| configurations into a single merged test file. This allows testing multiple
 | |
| compatible components together, reducing CI build time.
 | |
| 
 | |
| The merger handles:
 | |
| - Component-specific substitutions (prefixing to avoid conflicts)
 | |
| - Multiple instances of component configurations
 | |
| - Shared common bus packages (included only once)
 | |
| - Platform-specific configurations
 | |
| - Uses ESPHome's built-in merge_config for proper YAML merging
 | |
| """
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import argparse
 | |
| from functools import lru_cache
 | |
| from pathlib import Path
 | |
| import re
 | |
| import sys
 | |
| from typing import Any
 | |
| 
 | |
| # Add esphome to path so we can import from it
 | |
| sys.path.insert(0, str(Path(__file__).parent.parent))
 | |
| 
 | |
| from esphome import yaml_util
 | |
| from esphome.config_helpers import merge_config
 | |
| from script.analyze_component_buses import PACKAGE_DEPENDENCIES, get_common_bus_packages
 | |
| 
 | |
# Prefix for dependency markers in package tracking.
# extract_packages_from_yaml() records a package that is pulled in
# transitively (e.g., uart via modbus) under the key
# f"{DEPENDENCY_MARKER_PREFIX}{dep}"; merge_component_configs() strips the
# prefix again to learn which packages must not be re-added on their own.
DEPENDENCY_MARKER_PREFIX = "_dep_"
 | |
| 
 | |
| 
 | |
def load_yaml_file(yaml_file: Path) -> dict:
    """Parse a YAML file with ESPHome's own YAML loader.

    Args:
        yaml_file: Location of the YAML document to read

    Returns:
        Whatever ``yaml_util.load_yaml`` produces for the file

    Raises:
        FileNotFoundError: If ``yaml_file`` does not exist
    """
    if yaml_file.exists():
        return yaml_util.load_yaml(yaml_file)

    raise FileNotFoundError(f"YAML file not found: {yaml_file}")
 | |
| 
 | |
| 
 | |
@lru_cache(maxsize=256)
def get_component_packages(
    component_name: str, platform: str, tests_dir_str: str
) -> dict:
    """Return the ``packages`` mapping of a component's test file (cached).

    Results are memoized per argument triple so the same test file is not
    loaded and parsed again when packages are looked up repeatedly during
    cross-bus merging. Because of the cache, repeated calls hand back the
    same dict object; callers should treat it as read-only.

    Args:
        component_name: Name of the component
        platform: Platform name (e.g., "esp32-idf")
        tests_dir_str: tests/components directory as a string (a plain string
            keeps the arguments hashable for the cache)

    Returns:
        The raw ``packages`` dict from ``test.<platform>.yaml``, or an empty
        dict when the file has no ``packages`` key or the value is not a dict

    Raises:
        FileNotFoundError: If the component's test file does not exist
    """
    yaml_path = Path(tests_dir_str) / component_name / f"test.{platform}.yaml"
    parsed = load_yaml_file(yaml_path)

    if "packages" in parsed:
        declared = parsed["packages"]
        if isinstance(declared, dict):
            return declared

    return {}
 | |
| 
 | |
| 
 | |
def extract_packages_from_yaml(data: dict) -> dict[str, str]:
    """Collect the common bus packages referenced by a parsed test config.

    Component-specific packages are ignored; only names reported by
    ``get_common_bus_packages()`` are kept. For every kept package that has
    entries in ``PACKAGE_DEPENDENCIES``, each dependency that is itself a
    common bus package is recorded under a marker key built from
    ``DEPENDENCY_MARKER_PREFIX``.

    Args:
        data: Parsed YAML dictionary

    Returns:
        Mapping of package name to ``str()`` of its value, plus
        ``<marker prefix><dep>`` -> ``"(included via <name>)"`` entries for
        transitive packages. Empty when there is no ``packages`` key or the
        packages are not in dict form (list-form packages never carry common
        bus packages).
    """
    declared = data["packages"] if "packages" in data else None
    if not isinstance(declared, dict):
        return {}

    # Names of the shared bus packages (cached by the helper)
    bus_names = get_common_bus_packages()

    found: dict[str, str] = {}
    for pkg_name, pkg_value in declared.items():
        if pkg_name not in bus_names:
            # Component-specific package - not tracked here
            continue

        found[pkg_name] = str(pkg_value)

        # Record packages this one drags in (e.g., modbus includes uart)
        if pkg_name in PACKAGE_DEPENDENCIES:
            for dependency in PACKAGE_DEPENDENCIES[pkg_name]:
                if dependency in bus_names:
                    marker_key = f"{DEPENDENCY_MARKER_PREFIX}{dependency}"
                    found[marker_key] = f"(included via {pkg_name})"

    return found
 | |
| 
 | |
| 
 | |
| def prefix_substitutions_in_dict(
 | |
|     data: Any, prefix: str, exclude: set[str] | None = None
 | |
| ) -> Any:
 | |
|     """Recursively prefix all substitution references in a data structure.
 | |
| 
 | |
|     Args:
 | |
|         data: YAML data structure (dict, list, or scalar)
 | |
|         prefix: Prefix to add to substitution names
 | |
|         exclude: Set of substitution names to exclude from prefixing
 | |
| 
 | |
|     Returns:
 | |
|         Data structure with prefixed substitution references
 | |
|     """
 | |
|     if exclude is None:
 | |
|         exclude = set()
 | |
| 
 | |
|     def replace_sub(text: str) -> str:
 | |
|         """Replace substitution references in a string."""
 | |
| 
 | |
|         def replace_match(match):
 | |
|             sub_name = match.group(1)
 | |
|             if sub_name in exclude:
 | |
|                 return match.group(0)
 | |
|             # Always use braced format in output for consistency
 | |
|             return f"${{{prefix}_{sub_name}}}"
 | |
| 
 | |
|         # Match both ${substitution} and $substitution formats
 | |
|         return re.sub(r"\$\{?(\w+)\}?", replace_match, text)
 | |
| 
 | |
|     if isinstance(data, dict):
 | |
|         result = {}
 | |
|         for key, value in data.items():
 | |
|             result[key] = prefix_substitutions_in_dict(value, prefix, exclude)
 | |
|         return result
 | |
|     if isinstance(data, list):
 | |
|         return [prefix_substitutions_in_dict(item, prefix, exclude) for item in data]
 | |
|     if isinstance(data, str):
 | |
|         return replace_sub(data)
 | |
|     return data
 | |
| 
 | |
| 
 | |
def deduplicate_by_id(data: dict) -> dict:
    """Drop list entries whose ``id`` was already seen in the same list.

    For every list found as a dict value, only the first entry carrying a
    given ``id`` survives; later entries with that ``id`` are discarded
    whether or not their other fields match (ESPHome's validation will catch
    it if that causes issues). Entries without an ``id`` are always kept.
    Nested dicts are processed recursively; the contents of list entries are
    not descended into.

    Args:
        data: Parsed config dictionary

    Returns:
        A new dict with deduplicated lists, or ``data`` itself when it is not
        a dict
    """
    if not isinstance(data, dict):
        return data

    def first_of_each_id(entries: list) -> list:
        """Keep id-less entries and the first entry for every distinct id."""
        claimed = set()
        survivors = []
        for entry in entries:
            if isinstance(entry, dict) and "id" in entry:
                if entry["id"] in claimed:
                    # Duplicate ID - the first occurrence wins
                    continue
                claimed.add(entry["id"])
            survivors.append(entry)
        return survivors

    cleaned = {}
    for section, content in data.items():
        if isinstance(content, list):
            cleaned[section] = first_of_each_id(content)
        elif isinstance(content, dict):
            cleaned[section] = deduplicate_by_id(content)
        else:
            cleaned[section] = content

    return cleaned
 | |
| 
 | |
| 
 | |
def merge_component_configs(
    component_names: list[str],
    platform: str,
    tests_dir: Path,
    output_file: Path,
) -> None:
    """Merge multiple component test configs into a single file.

    For each component, ``<tests_dir>/<component>/test.<platform>.yaml`` is
    loaded, its non-common packages are expanded into the top level, its
    substitutions (and every reference to them) are prefixed with the
    component name, and the result is folded into one config with ESPHome's
    ``merge_config``. The common bus packages used by the components are
    collected along the way and re-attached once under ``packages``. Finally
    duplicate IDs are dropped, the ``esphome`` section is removed, and the
    YAML is written to ``output_file`` (parent directories are created).

    Args:
        component_names: List of component names to merge
        platform: Platform to merge for (e.g., "esp32-ard")
        tests_dir: Path to tests/components directory
        output_file: Path to output merged config file

    Raises:
        ValueError: If ``component_names`` is empty, or if two components
            declare the same tracked package with differing values
        FileNotFoundError: If a component has no test file for ``platform``
    """
    if not component_names:
        raise ValueError("No components specified")

    # Union of tracked packages across components (name -> str(value));
    # None until the first component has been processed
    all_packages = None

    # Start with empty config
    merged_config_data = {}

    # Convert tests_dir to string for caching
    # (get_component_packages is lru_cached and takes the directory as a str)
    tests_dir_str = str(tests_dir)

    # Process each component
    for comp_name in component_names:
        comp_dir = tests_dir / comp_name
        test_file = comp_dir / f"test.{platform}.yaml"

        if not test_file.exists():
            raise FileNotFoundError(f"Test file not found: {test_file}")

        # Load the component's test file
        comp_data = load_yaml_file(test_file)

        # Merge packages from all components (cross-bus merging)
        # Components can have different packages (e.g., one with ble, another with uart)
        # as long as they don't conflict (checked by are_buses_compatible before calling this)
        comp_packages = extract_packages_from_yaml(comp_data)

        if all_packages is None:
            # First component - initialize package dict
            all_packages = comp_packages if comp_packages else {}
        elif comp_packages:
            # Merge packages - combine all unique package types
            # If both have the same package type, verify they're identical
            # NOTE(review): dependency-marker keys (DEPENDENCY_MARKER_PREFIX + name)
            # pass through this same equality check, and their value embeds the
            # name of the including package. Two different packages that pull in
            # the same dependency would presumably be reported as a conflict
            # here - confirm against PACKAGE_DEPENDENCIES.
            for pkg_name, pkg_config in comp_packages.items():
                if pkg_name in all_packages:
                    # Same package type - verify config matches
                    # (compares the str() form stored by extract_packages_from_yaml)
                    if all_packages[pkg_name] != pkg_config:
                        raise ValueError(
                            f"Component {comp_name} has conflicting config for package '{pkg_name}'. "
                            f"Expected: {all_packages[pkg_name]}, Got: {pkg_config}. "
                            f"Components with conflicting bus configs cannot be merged."
                        )
                else:
                    # New package type - add it
                    all_packages[pkg_name] = pkg_config

        # Handle $component_dir by replacing with absolute path
        # This allows components that use local file references to be grouped
        # (the value is stored below as the "<comp_name>_component_dir" substitution)
        comp_abs_dir = str(comp_dir.absolute())

        # Save top-level substitutions BEFORE expanding packages
        # In ESPHome, top-level substitutions override package substitutions
        top_level_subs = (
            comp_data["substitutions"].copy()
            if "substitutions" in comp_data and comp_data["substitutions"] is not None
            else {}
        )

        # Expand packages - but we'll restore substitution priority after
        if "packages" in comp_data:
            packages_value = comp_data["packages"]

            if isinstance(packages_value, dict):
                # Dict format - check each package
                common_bus_packages = get_common_bus_packages()
                # Iterate over a snapshot of the items; comp_data is rebound
                # by merge_config inside the loop
                for pkg_name, pkg_value in list(packages_value.items()):
                    # Common bus packages stay as packages; they are re-added
                    # once for the whole merged file at the end
                    if pkg_name in common_bus_packages:
                        continue
                    # Only packages already loaded as dicts can be expanded
                    if not isinstance(pkg_value, dict):
                        continue
                    # Component-specific package - expand its content into top level
                    comp_data = merge_config(comp_data, pkg_value)
            elif isinstance(packages_value, list):
                # List format - expand all package includes
                for pkg_value in packages_value:
                    if not isinstance(pkg_value, dict):
                        continue
                    comp_data = merge_config(comp_data, pkg_value)

            # Remove all packages (common will be re-added at the end)
            del comp_data["packages"]

        # Restore top-level substitution priority
        # Top-level substitutions override any from packages
        if "substitutions" not in comp_data or comp_data["substitutions"] is None:
            comp_data["substitutions"] = {}

        # Merge: package subs as base, top-level subs override
        comp_data["substitutions"].update(top_level_subs)

        # Now prefix the final merged substitutions
        # (names become "<comp_name>_<name>" so components cannot collide)
        comp_data["substitutions"] = {
            f"{comp_name}_{sub_name}": sub_value
            for sub_name, sub_value in comp_data["substitutions"].items()
        }

        # Add component_dir substitution with absolute path for this component
        comp_data["substitutions"][f"{comp_name}_component_dir"] = comp_abs_dir

        # Prefix substitution references throughout the config
        # (this also rewrites references inside the substitution values themselves)
        comp_data = prefix_substitutions_in_dict(comp_data, comp_name)

        # Use ESPHome's merge_config to merge this component into the result
        # merge_config handles list merging with ID-based deduplication automatically
        merged_config_data = merge_config(merged_config_data, comp_data)

    # Add merged packages back (union of all component packages)
    # IMPORTANT: Only include common bus packages (spi, i2c, uart, etc.)
    # Do NOT re-add component-specific packages as they contain unprefixed $component_dir refs
    if all_packages:
        # Build packages dict from merged all_packages
        # all_packages is a dict mapping package_name -> str(package_value)
        # We need to reconstruct the actual package values by loading them from any component
        # Since packages with the same name must have identical configs (verified above),
        # we can load the package value from the first component that has each package
        common_bus_packages = get_common_bus_packages()
        merged_packages: dict[str, Any] = {}

        # Collect packages that are included as dependencies
        # If modbus is present, uart is included via modbus.packages.uart
        packages_to_skip: set[str] = set()
        for pkg_name in all_packages:
            if pkg_name.startswith(DEPENDENCY_MARKER_PREFIX):
                # Extract the actual package name (remove _dep_ prefix)
                dep_name = pkg_name[len(DEPENDENCY_MARKER_PREFIX) :]
                packages_to_skip.add(dep_name)

        for pkg_name in all_packages:
            # Skip dependency markers
            if pkg_name.startswith(DEPENDENCY_MARKER_PREFIX):
                continue
            # Skip non-common-bus packages
            if pkg_name not in common_bus_packages:
                continue
            # Skip packages that are included as dependencies of other packages
            # This prevents duplicate definitions (e.g., uart via modbus + uart separately)
            if pkg_name in packages_to_skip:
                continue

            # Find a component that has this package and extract its value
            # Uses cached lookup to avoid re-loading the same files
            for comp_name in component_names:
                comp_packages = get_component_packages(
                    comp_name, platform, tests_dir_str
                )
                if pkg_name in comp_packages:
                    merged_packages[pkg_name] = comp_packages[pkg_name]
                    break

        if merged_packages:
            merged_config_data["packages"] = merged_packages

    # Deduplicate items with same ID (keeps first occurrence)
    merged_config_data = deduplicate_by_id(merged_config_data)

    # Remove esphome section since it will be provided by the wrapper file
    # The wrapper file includes this merged config via packages and provides
    # the proper esphome: section with name, platform, etc.
    if "esphome" in merged_config_data:
        del merged_config_data["esphome"]

    # Write merged config
    # NOTE(review): write_text() is called without an explicit encoding, so the
    # platform default applies - confirm that is intended for non-ASCII content
    output_file.parent.mkdir(parents=True, exist_ok=True)
    yaml_content = yaml_util.dump(merged_config_data)
    output_file.write_text(yaml_content)

    print(f"Successfully merged {len(component_names)} components into {output_file}")
 | |
| 
 | |
| 
 | |
def main() -> None:
    """Command-line entry point: parse arguments and run the merge.

    Reads ``--components`` (comma-separated names), ``--platform``,
    ``--output`` and the optional ``--tests-dir``, then calls
    ``merge_component_configs``. On any failure the error and its traceback
    are printed to stderr and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description="Merge multiple component test configs into a single file"
    )
    parser.add_argument(
        "--components",
        "-c",
        required=True,
        help="Comma-separated list of component names to merge",
    )
    parser.add_argument(
        "--platform",
        "-p",
        required=True,
        help="Platform to merge for (e.g., esp32-ard)",
    )
    parser.add_argument(
        "--output",
        "-o",
        required=True,
        type=Path,
        help="Output file path for merged config",
    )
    parser.add_argument(
        "--tests-dir",
        type=Path,
        default=Path("tests/components"),
        help="Path to tests/components directory",
    )

    args = parser.parse_args()

    # "".split(",") yields [""] and a trailing or doubled comma yields "" as
    # well. Drop blank entries so they are not treated as component names
    # (which would point at a non-existent test file) and so an effectively
    # empty list reaches the "No components specified" check.
    stripped_names = (part.strip() for part in args.components.split(","))
    component_names = [name for name in stripped_names if name]

    try:
        merge_component_configs(
            component_names=component_names,
            platform=args.platform,
            tests_dir=args.tests_dir,
            output_file=args.output,
        )
    except Exception as e:
        # Top-level boundary: report the failure in full and signal it to the
        # caller through the exit status
        print(f"Error merging configs: {e}", file=sys.stderr)
        import traceback

        traceback.print_exc()
        sys.exit(1)
 | |
| 
 | |
| 
 | |
# Run the command-line interface only when executed as a script, not when
# this module is imported.
if __name__ == "__main__":
    main()
 |