1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-24 04:33:49 +01:00

[ci] Add automated memory impact analysis for pull requests (#11242)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
This commit is contained in:
J. Nick Koston
2025-10-19 08:43:38 -10:00
committed by GitHub
parent f25af18655
commit a0922bc8b0
24 changed files with 3772 additions and 49 deletions

View File

@@ -34,6 +34,8 @@ from typing import Any
# Add esphome to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from helpers import BASE_BUS_COMPONENTS
from esphome import yaml_util
from esphome.config_helpers import Extend, Remove
@@ -67,18 +69,6 @@ NO_BUSES_SIGNATURE = "no_buses"
# Isolated components have unique signatures and cannot be merged with others
ISOLATED_SIGNATURE_PREFIX = "isolated_"
# Base bus components - these ARE the bus implementations and should not
# be flagged as needing migration since they are the platform/base components
BASE_BUS_COMPONENTS = {
"i2c",
"spi",
"uart",
"modbus",
"canbus",
"remote_transmitter",
"remote_receiver",
}
# Components that must be tested in isolation (not grouped or batched with others)
# These have known build issues that prevent grouping
# NOTE: This should be kept in sync with both test_build_components and split_components_for_ci.py

23
script/ci_helpers.py Executable file
View File

@@ -0,0 +1,23 @@
"""Common helper functions for CI scripts."""
from __future__ import annotations
import os
def write_github_output(outputs: dict[str, str | int]) -> None:
"""Write multiple outputs to GITHUB_OUTPUT or stdout.
When running in GitHub Actions, writes to the GITHUB_OUTPUT file.
When running locally, writes to stdout for debugging.
Args:
outputs: Dictionary of key-value pairs to write
"""
github_output = os.environ.get("GITHUB_OUTPUT")
if github_output:
with open(github_output, "a", encoding="utf-8") as f:
f.writelines(f"{key}={value}\n" for key, value in outputs.items())
else:
for key, value in outputs.items():
print(f"{key}={value}")

View File

@@ -0,0 +1,570 @@
#!/usr/bin/env python3
"""Post or update a PR comment with memory impact analysis results.
This script creates or updates a GitHub PR comment with memory usage changes.
It uses the GitHub CLI (gh) to manage comments and maintains a single comment
that gets updated on subsequent runs.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import subprocess
import sys
from jinja2 import Environment, FileSystemLoader
# Add esphome to path for analyze_memory import
sys.path.insert(0, str(Path(__file__).parent.parent))
# pylint: disable=wrong-import-position
# Comment marker to identify our memory impact comments
COMMENT_MARKER = "<!-- esphome-memory-impact-analysis -->"
# Thresholds for emoji significance indicators (percentage)
OVERALL_CHANGE_THRESHOLD = 1.0 # Overall RAM/Flash changes
COMPONENT_CHANGE_THRESHOLD = 3.0 # Component breakdown changes
# Display limits for tables
MAX_COMPONENT_BREAKDOWN_ROWS = 20 # Maximum components to show in breakdown table
MAX_CHANGED_SYMBOLS_ROWS = 30 # Maximum changed symbols to show
MAX_NEW_SYMBOLS_ROWS = 15 # Maximum new symbols to show
MAX_REMOVED_SYMBOLS_ROWS = 15 # Maximum removed symbols to show
# Symbol display formatting
SYMBOL_DISPLAY_MAX_LENGTH = 100 # Max length before using <details> tag
SYMBOL_DISPLAY_TRUNCATE_LENGTH = 97 # Length to truncate in summary
# Component change noise threshold
COMPONENT_CHANGE_NOISE_THRESHOLD = 2 # Ignore component changes ≤ this many bytes
# Template directory
TEMPLATE_DIR = Path(__file__).parent / "templates"
def load_analysis_json(json_path: str) -> dict | None:
"""Load memory analysis results from JSON file.
Args:
json_path: Path to analysis JSON file
Returns:
Dictionary with analysis results or None if file doesn't exist/can't be loaded
"""
json_file = Path(json_path)
if not json_file.exists():
print(f"Analysis JSON not found: {json_path}", file=sys.stderr)
return None
try:
with open(json_file, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError) as e:
print(f"Failed to load analysis JSON: {e}", file=sys.stderr)
return None
def format_bytes(bytes_value: int) -> str:
"""Format bytes value with comma separators.
Args:
bytes_value: Number of bytes
Returns:
Formatted string with comma separators (e.g., "1,234 bytes")
"""
return f"{bytes_value:,} bytes"
def format_change(before: int, after: int, threshold: float | None = None) -> str:
"""Format memory change with delta and percentage.
Args:
before: Memory usage before change (in bytes)
after: Memory usage after change (in bytes)
threshold: Optional percentage threshold for "significant" change.
If provided, adds supplemental emoji (🎉/🚨/🔸/✅) to chart icons.
If None, only shows chart icons (📈/📉/➡️).
Returns:
Formatted string with delta and percentage
"""
delta = after - before
percentage = 0.0 if before == 0 else (delta / before) * 100
# Always use chart icons to show direction
if delta > 0:
delta_str = f"+{delta:,} bytes"
trend_icon = "📈"
# Add supplemental emoji based on threshold if provided
if threshold is not None:
significance = "🚨" if abs(percentage) > threshold else "🔸"
emoji = f"{trend_icon} {significance}"
else:
emoji = trend_icon
elif delta < 0:
delta_str = f"{delta:,} bytes"
trend_icon = "📉"
# Add supplemental emoji based on threshold if provided
if threshold is not None:
significance = "🎉" if abs(percentage) > threshold else ""
emoji = f"{trend_icon} {significance}"
else:
emoji = trend_icon
else:
delta_str = "+0 bytes"
emoji = "➡️"
# Format percentage with sign
if percentage > 0:
pct_str = f"+{percentage:.2f}%"
elif percentage < 0:
pct_str = f"{percentage:.2f}%"
else:
pct_str = "0.00%"
return f"{emoji} {delta_str} ({pct_str})"
def prepare_symbol_changes_data(
target_symbols: dict | None, pr_symbols: dict | None
) -> dict | None:
"""Prepare symbol changes data for template rendering.
Args:
target_symbols: Symbol name to size mapping for target branch
pr_symbols: Symbol name to size mapping for PR branch
Returns:
Dictionary with changed, new, and removed symbols, or None if no changes
"""
if not target_symbols or not pr_symbols:
return None
# Find all symbols that exist in both branches or only in one
all_symbols = set(target_symbols.keys()) | set(pr_symbols.keys())
# Track changes
changed_symbols: list[
tuple[str, int, int, int]
] = [] # (symbol, target_size, pr_size, delta)
new_symbols: list[tuple[str, int]] = [] # (symbol, size)
removed_symbols: list[tuple[str, int]] = [] # (symbol, size)
for symbol in all_symbols:
target_size = target_symbols.get(symbol, 0)
pr_size = pr_symbols.get(symbol, 0)
if target_size == 0 and pr_size > 0:
# New symbol
new_symbols.append((symbol, pr_size))
elif target_size > 0 and pr_size == 0:
# Removed symbol
removed_symbols.append((symbol, target_size))
elif target_size != pr_size:
# Changed symbol
delta = pr_size - target_size
changed_symbols.append((symbol, target_size, pr_size, delta))
if not changed_symbols and not new_symbols and not removed_symbols:
return None
# Sort by size/delta
changed_symbols.sort(key=lambda x: abs(x[3]), reverse=True)
new_symbols.sort(key=lambda x: x[1], reverse=True)
removed_symbols.sort(key=lambda x: x[1], reverse=True)
return {
"changed_symbols": changed_symbols,
"new_symbols": new_symbols,
"removed_symbols": removed_symbols,
}
def prepare_component_breakdown_data(
target_analysis: dict | None, pr_analysis: dict | None
) -> list[tuple[str, int, int, int]] | None:
"""Prepare component breakdown data for template rendering.
Args:
target_analysis: Component memory breakdown for target branch
pr_analysis: Component memory breakdown for PR branch
Returns:
List of tuples (component, target_flash, pr_flash, delta), or None if no changes
"""
if not target_analysis or not pr_analysis:
return None
# Combine all components from both analyses
all_components = set(target_analysis.keys()) | set(pr_analysis.keys())
# Filter to components that have changed (ignoring noise)
changed_components: list[
tuple[str, int, int, int]
] = [] # (comp, target_flash, pr_flash, delta)
for comp in all_components:
target_mem = target_analysis.get(comp, {})
pr_mem = pr_analysis.get(comp, {})
target_flash = target_mem.get("flash_total", 0)
pr_flash = pr_mem.get("flash_total", 0)
# Only include if component has meaningful change (above noise threshold)
delta = pr_flash - target_flash
if abs(delta) > COMPONENT_CHANGE_NOISE_THRESHOLD:
changed_components.append((comp, target_flash, pr_flash, delta))
if not changed_components:
return None
# Sort by absolute delta (largest changes first)
changed_components.sort(key=lambda x: abs(x[3]), reverse=True)
return changed_components
def create_comment_body(
components: list[str],
platform: str,
target_ram: int,
target_flash: int,
pr_ram: int,
pr_flash: int,
target_analysis: dict | None = None,
pr_analysis: dict | None = None,
target_symbols: dict | None = None,
pr_symbols: dict | None = None,
target_cache_hit: bool = False,
) -> str:
"""Create the comment body with memory impact analysis using Jinja2 templates.
Args:
components: List of component names (merged config)
platform: Platform name
target_ram: RAM usage in target branch
target_flash: Flash usage in target branch
pr_ram: RAM usage in PR branch
pr_flash: Flash usage in PR branch
target_analysis: Optional component breakdown for target branch
pr_analysis: Optional component breakdown for PR branch
target_symbols: Optional symbol map for target branch
pr_symbols: Optional symbol map for PR branch
target_cache_hit: Whether target branch analysis was loaded from cache
Returns:
Formatted comment body
"""
# Set up Jinja2 environment
env = Environment(
loader=FileSystemLoader(TEMPLATE_DIR),
trim_blocks=True,
lstrip_blocks=True,
)
# Register custom filters
env.filters["format_bytes"] = format_bytes
env.filters["format_change"] = format_change
# Prepare template context
context = {
"comment_marker": COMMENT_MARKER,
"platform": platform,
"target_ram": format_bytes(target_ram),
"pr_ram": format_bytes(pr_ram),
"target_flash": format_bytes(target_flash),
"pr_flash": format_bytes(pr_flash),
"ram_change": format_change(
target_ram, pr_ram, threshold=OVERALL_CHANGE_THRESHOLD
),
"flash_change": format_change(
target_flash, pr_flash, threshold=OVERALL_CHANGE_THRESHOLD
),
"target_cache_hit": target_cache_hit,
"component_change_threshold": COMPONENT_CHANGE_THRESHOLD,
}
# Format components list
if len(components) == 1:
context["components_str"] = f"`{components[0]}`"
context["config_note"] = "a representative test configuration"
else:
context["components_str"] = ", ".join(f"`{c}`" for c in sorted(components))
context["config_note"] = (
f"a merged configuration with {len(components)} components"
)
# Prepare component breakdown if available
component_breakdown = ""
if target_analysis and pr_analysis:
changed_components = prepare_component_breakdown_data(
target_analysis, pr_analysis
)
if changed_components:
template = env.get_template("ci_memory_impact_component_breakdown.j2")
component_breakdown = template.render(
changed_components=changed_components,
format_bytes=format_bytes,
format_change=format_change,
component_change_threshold=COMPONENT_CHANGE_THRESHOLD,
max_rows=MAX_COMPONENT_BREAKDOWN_ROWS,
)
# Prepare symbol changes if available
symbol_changes = ""
if target_symbols and pr_symbols:
symbol_data = prepare_symbol_changes_data(target_symbols, pr_symbols)
if symbol_data:
template = env.get_template("ci_memory_impact_symbol_changes.j2")
symbol_changes = template.render(
**symbol_data,
format_bytes=format_bytes,
format_change=format_change,
max_changed_rows=MAX_CHANGED_SYMBOLS_ROWS,
max_new_rows=MAX_NEW_SYMBOLS_ROWS,
max_removed_rows=MAX_REMOVED_SYMBOLS_ROWS,
symbol_max_length=SYMBOL_DISPLAY_MAX_LENGTH,
symbol_truncate_length=SYMBOL_DISPLAY_TRUNCATE_LENGTH,
)
if not target_analysis or not pr_analysis:
print("No ELF files provided, skipping detailed analysis", file=sys.stderr)
context["component_breakdown"] = component_breakdown
context["symbol_changes"] = symbol_changes
# Render main template
template = env.get_template("ci_memory_impact_comment_template.j2")
return template.render(**context)
def find_existing_comment(pr_number: str) -> str | None:
"""Find existing memory impact comment on the PR.
Args:
pr_number: PR number
Returns:
Comment numeric ID if found, None otherwise
Raises:
subprocess.CalledProcessError: If gh command fails
"""
print(f"DEBUG: Looking for existing comment on PR #{pr_number}", file=sys.stderr)
# Use gh api to get comments directly - this returns the numeric id field
result = subprocess.run(
[
"gh",
"api",
f"/repos/{{owner}}/{{repo}}/issues/{pr_number}/comments",
"--jq",
".[] | {id, body}",
],
capture_output=True,
text=True,
check=True,
)
print(
f"DEBUG: gh api comments output (first 500 chars):\n{result.stdout[:500]}",
file=sys.stderr,
)
# Parse comments and look for our marker
comment_count = 0
for line in result.stdout.strip().split("\n"):
if not line:
continue
try:
comment = json.loads(line)
comment_count += 1
comment_id = comment.get("id")
print(
f"DEBUG: Checking comment {comment_count}: id={comment_id}",
file=sys.stderr,
)
body = comment.get("body", "")
if COMMENT_MARKER in body:
print(
f"DEBUG: Found existing comment with id={comment_id}",
file=sys.stderr,
)
# Return the numeric id
return str(comment_id)
print("DEBUG: Comment does not contain marker", file=sys.stderr)
except json.JSONDecodeError as e:
print(f"DEBUG: JSON decode error: {e}", file=sys.stderr)
continue
print(
f"DEBUG: No existing comment found (checked {comment_count} comments)",
file=sys.stderr,
)
return None
def update_existing_comment(comment_id: str, comment_body: str) -> None:
"""Update an existing comment.
Args:
comment_id: Comment ID to update
comment_body: New comment body text
Raises:
subprocess.CalledProcessError: If gh command fails
"""
print(f"DEBUG: Updating existing comment {comment_id}", file=sys.stderr)
result = subprocess.run(
[
"gh",
"api",
f"/repos/{{owner}}/{{repo}}/issues/comments/{comment_id}",
"-X",
"PATCH",
"-f",
f"body={comment_body}",
],
check=True,
capture_output=True,
text=True,
)
print(f"DEBUG: Update response: {result.stdout}", file=sys.stderr)
def create_new_comment(pr_number: str, comment_body: str) -> None:
"""Create a new PR comment.
Args:
pr_number: PR number
comment_body: Comment body text
Raises:
subprocess.CalledProcessError: If gh command fails
"""
print(f"DEBUG: Posting new comment on PR #{pr_number}", file=sys.stderr)
result = subprocess.run(
["gh", "pr", "comment", pr_number, "--body", comment_body],
check=True,
capture_output=True,
text=True,
)
print(f"DEBUG: Post response: {result.stdout}", file=sys.stderr)
def post_or_update_comment(pr_number: str, comment_body: str) -> None:
"""Post a new comment or update existing one.
Args:
pr_number: PR number
comment_body: Comment body text
Raises:
subprocess.CalledProcessError: If gh command fails
"""
# Look for existing comment
existing_comment_id = find_existing_comment(pr_number)
if existing_comment_id and existing_comment_id != "None":
update_existing_comment(existing_comment_id, comment_body)
else:
create_new_comment(pr_number, comment_body)
print("Comment posted/updated successfully", file=sys.stderr)
def main() -> int:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Post or update PR comment with memory impact analysis"
)
parser.add_argument("--pr-number", required=True, help="PR number")
parser.add_argument(
"--components",
required=True,
help='JSON array of component names (e.g., \'["api", "wifi"]\')',
)
parser.add_argument("--platform", required=True, help="Platform name")
parser.add_argument(
"--target-ram", type=int, required=True, help="Target branch RAM usage"
)
parser.add_argument(
"--target-flash", type=int, required=True, help="Target branch flash usage"
)
parser.add_argument("--pr-ram", type=int, required=True, help="PR branch RAM usage")
parser.add_argument(
"--pr-flash", type=int, required=True, help="PR branch flash usage"
)
parser.add_argument(
"--target-json",
help="Optional path to target branch analysis JSON (for detailed analysis)",
)
parser.add_argument(
"--pr-json",
help="Optional path to PR branch analysis JSON (for detailed analysis)",
)
parser.add_argument(
"--target-cache-hit",
action="store_true",
help="Indicates that target branch analysis was loaded from cache",
)
args = parser.parse_args()
# Parse components from JSON
try:
components = json.loads(args.components)
if not isinstance(components, list):
print("Error: --components must be a JSON array", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing --components JSON: {e}", file=sys.stderr)
sys.exit(1)
# Load analysis JSON files
target_analysis = None
pr_analysis = None
target_symbols = None
pr_symbols = None
if args.target_json:
target_data = load_analysis_json(args.target_json)
if target_data and target_data.get("detailed_analysis"):
target_analysis = target_data["detailed_analysis"].get("components")
target_symbols = target_data["detailed_analysis"].get("symbols")
if args.pr_json:
pr_data = load_analysis_json(args.pr_json)
if pr_data and pr_data.get("detailed_analysis"):
pr_analysis = pr_data["detailed_analysis"].get("components")
pr_symbols = pr_data["detailed_analysis"].get("symbols")
# Create comment body
# Note: Memory totals (RAM/Flash) are summed across all builds if multiple were run.
comment_body = create_comment_body(
components=components,
platform=args.platform,
target_ram=args.target_ram,
target_flash=args.target_flash,
pr_ram=args.pr_ram,
pr_flash=args.pr_flash,
target_analysis=target_analysis,
pr_analysis=pr_analysis,
target_symbols=target_symbols,
pr_symbols=pr_symbols,
target_cache_hit=args.target_cache_hit,
)
# Post or update comment
post_or_update_comment(args.pr_number, comment_body)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,281 @@
#!/usr/bin/env python3
"""Extract memory usage statistics from ESPHome build output.
This script parses the PlatformIO build output to extract RAM and flash
usage statistics for a compiled component. It's used by the CI workflow to
compare memory usage between branches.
The script reads compile output from stdin and looks for the standard
PlatformIO output format:
RAM: [==== ] 36.1% (used 29548 bytes from 81920 bytes)
Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes)
Optionally performs detailed memory analysis if a build directory is provided.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import re
import sys
# Add esphome to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# pylint: disable=wrong-import-position
from esphome.analyze_memory import MemoryAnalyzer
from esphome.platformio_api import IDEData
from script.ci_helpers import write_github_output
# Regex patterns for extracting memory usage from PlatformIO output
_RAM_PATTERN = re.compile(r"RAM:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes")
_FLASH_PATTERN = re.compile(r"Flash:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes")
_BUILD_PATH_PATTERN = re.compile(r"Build path: (.+)")
def extract_from_compile_output(
output_text: str,
) -> tuple[int | None, int | None, str | None]:
"""Extract memory usage and build directory from PlatformIO compile output.
Supports multiple builds (for component groups or isolated components).
When test_build_components.py creates multiple builds, this sums the
memory usage across all builds.
Looks for lines like:
RAM: [==== ] 36.1% (used 29548 bytes from 81920 bytes)
Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes)
Also extracts build directory from lines like:
INFO Compiling app... Build path: /path/to/build
Args:
output_text: Compile output text (may contain multiple builds)
Returns:
Tuple of (total_ram_bytes, total_flash_bytes, build_dir) or (None, None, None) if not found
"""
# Find all RAM and Flash matches (may be multiple builds)
ram_matches = _RAM_PATTERN.findall(output_text)
flash_matches = _FLASH_PATTERN.findall(output_text)
if not ram_matches or not flash_matches:
return None, None, None
# Sum all builds (handles multiple component groups)
total_ram = sum(int(match) for match in ram_matches)
total_flash = sum(int(match) for match in flash_matches)
# Extract build directory from ESPHome's explicit build path output
# Look for: INFO Compiling app... Build path: /path/to/build
# Note: Multiple builds reuse the same build path (each overwrites the previous)
build_dir = None
if match := _BUILD_PATH_PATTERN.search(output_text):
build_dir = match.group(1).strip()
return total_ram, total_flash, build_dir
def run_detailed_analysis(build_dir: str) -> dict | None:
"""Run detailed memory analysis on build directory.
Args:
build_dir: Path to ESPHome build directory
Returns:
Dictionary with analysis results or None if analysis fails
"""
build_path = Path(build_dir)
if not build_path.exists():
print(f"Build directory not found: {build_dir}", file=sys.stderr)
return None
# Find firmware.elf
elf_path = None
for elf_candidate in [
build_path / "firmware.elf",
build_path / ".pioenvs" / build_path.name / "firmware.elf",
]:
if elf_candidate.exists():
elf_path = str(elf_candidate)
break
if not elf_path:
print(f"firmware.elf not found in {build_dir}", file=sys.stderr)
return None
# Find idedata.json - check multiple locations
device_name = build_path.name
idedata_candidates = [
# In .pioenvs for test builds
build_path / ".pioenvs" / device_name / "idedata.json",
# In .esphome/idedata for regular builds
Path.home() / ".esphome" / "idedata" / f"{device_name}.json",
# Check parent directories for .esphome/idedata (for test_build_components)
build_path.parent.parent.parent / "idedata" / f"{device_name}.json",
]
idedata = None
for idedata_path in idedata_candidates:
if not idedata_path.exists():
continue
try:
with open(idedata_path, encoding="utf-8") as f:
raw_data = json.load(f)
idedata = IDEData(raw_data)
print(f"Loaded idedata from: {idedata_path}", file=sys.stderr)
break
except (json.JSONDecodeError, OSError) as e:
print(
f"Warning: Failed to load idedata from {idedata_path}: {e}",
file=sys.stderr,
)
analyzer = MemoryAnalyzer(elf_path, idedata=idedata)
components = analyzer.analyze()
# Convert to JSON-serializable format
result = {
"components": {
name: {
"text": mem.text_size,
"rodata": mem.rodata_size,
"data": mem.data_size,
"bss": mem.bss_size,
"flash_total": mem.flash_total,
"ram_total": mem.ram_total,
"symbol_count": mem.symbol_count,
}
for name, mem in components.items()
},
"symbols": {},
}
# Build symbol map
for section in analyzer.sections.values():
for symbol_name, size, _ in section.symbols:
if size > 0:
demangled = analyzer._demangle_symbol(symbol_name)
result["symbols"][demangled] = size
return result
def main() -> int:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Extract memory usage from ESPHome build output"
)
parser.add_argument(
"--output-env",
action="store_true",
help="Output to GITHUB_OUTPUT environment file",
)
parser.add_argument(
"--build-dir",
help="Optional build directory for detailed memory analysis (overrides auto-detection)",
)
parser.add_argument(
"--output-json",
help="Optional path to save detailed analysis JSON",
)
parser.add_argument(
"--output-build-dir",
help="Optional path to write the detected build directory",
)
args = parser.parse_args()
# Read compile output from stdin
compile_output = sys.stdin.read()
# Extract memory usage and build directory
ram_bytes, flash_bytes, detected_build_dir = extract_from_compile_output(
compile_output
)
if ram_bytes is None or flash_bytes is None:
print("Failed to extract memory usage from compile output", file=sys.stderr)
print("Expected lines like:", file=sys.stderr)
print(
" RAM: [==== ] 36.1% (used 29548 bytes from 81920 bytes)",
file=sys.stderr,
)
print(
" Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes)",
file=sys.stderr,
)
return 1
# Count how many builds were found
num_builds = len(_RAM_PATTERN.findall(compile_output))
if num_builds > 1:
print(
f"Found {num_builds} builds - summing memory usage across all builds",
file=sys.stderr,
)
print(
"WARNING: Detailed analysis will only cover the last build",
file=sys.stderr,
)
print(f"Total RAM: {ram_bytes} bytes", file=sys.stderr)
print(f"Total Flash: {flash_bytes} bytes", file=sys.stderr)
# Determine which build directory to use (explicit arg overrides auto-detection)
build_dir = args.build_dir or detected_build_dir
if detected_build_dir:
print(f"Detected build directory: {detected_build_dir}", file=sys.stderr)
if num_builds > 1:
print(
f" (using last of {num_builds} builds for detailed analysis)",
file=sys.stderr,
)
# Write build directory to file if requested
if args.output_build_dir and build_dir:
build_dir_path = Path(args.output_build_dir)
build_dir_path.parent.mkdir(parents=True, exist_ok=True)
build_dir_path.write_text(build_dir)
print(f"Wrote build directory to {args.output_build_dir}", file=sys.stderr)
# Run detailed analysis if build directory available
detailed_analysis = None
if build_dir:
print(f"Running detailed analysis on {build_dir}", file=sys.stderr)
detailed_analysis = run_detailed_analysis(build_dir)
# Save JSON output if requested
if args.output_json:
output_data = {
"ram_bytes": ram_bytes,
"flash_bytes": flash_bytes,
"detailed_analysis": detailed_analysis,
}
output_path = Path(args.output_json)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2)
print(f"Saved analysis to {args.output_json}", file=sys.stderr)
if args.output_env:
# Output to GitHub Actions
write_github_output(
{
"ram_usage": ram_bytes,
"flash_usage": flash_bytes,
}
)
else:
print(f"{ram_bytes},{flash_bytes}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -10,7 +10,13 @@ what files have changed. It outputs JSON with the following structure:
"clang_format": true/false,
"python_linters": true/false,
"changed_components": ["component1", "component2", ...],
"component_test_count": 5
"component_test_count": 5,
"memory_impact": {
"should_run": "true/false",
"components": ["component1", "component2", ...],
"platform": "esp32-idf",
"use_merged_config": "true"
}
}
The CI workflow uses this information to:
@@ -20,6 +26,7 @@ The CI workflow uses this information to:
- Skip or run Python linters (ruff, flake8, pylint, pyupgrade)
- Determine which components to test individually
- Decide how to split component tests (if there are many)
- Run memory impact analysis whenever there are changed components (merged config), and also for core-only changes
Usage:
python script/determine-jobs.py [-b BRANCH]
@@ -31,6 +38,8 @@ Options:
from __future__ import annotations
import argparse
from collections import Counter
from enum import StrEnum
from functools import cache
import json
import os
@@ -40,16 +49,47 @@ import sys
from typing import Any
from helpers import (
BASE_BUS_COMPONENTS,
CPP_FILE_EXTENSIONS,
ESPHOME_COMPONENTS_PATH,
PYTHON_FILE_EXTENSIONS,
changed_files,
get_all_dependencies,
get_component_from_path,
get_component_test_files,
get_components_from_integration_fixtures,
parse_test_filename,
root_path,
)
class Platform(StrEnum):
"""Platform identifiers for memory impact analysis."""
ESP8266_ARD = "esp8266-ard"
ESP32_IDF = "esp32-idf"
ESP32_C3_IDF = "esp32-c3-idf"
ESP32_C6_IDF = "esp32-c6-idf"
ESP32_S2_IDF = "esp32-s2-idf"
ESP32_S3_IDF = "esp32-s3-idf"
# Memory impact analysis constants
MEMORY_IMPACT_FALLBACK_COMPONENT = "api" # Representative component for core changes
MEMORY_IMPACT_FALLBACK_PLATFORM = Platform.ESP32_IDF # Most representative platform
# Platform preference order for memory impact analysis
# Prefer newer platforms first as they represent the future of ESPHome
# ESP8266 is most constrained but many new features don't support it
MEMORY_IMPACT_PLATFORM_PREFERENCE = [
Platform.ESP32_C6_IDF, # ESP32-C6 IDF (newest, supports Thread/Zigbee)
Platform.ESP8266_ARD, # ESP8266 Arduino (most memory constrained - best for impact analysis)
Platform.ESP32_IDF, # ESP32 IDF platform (primary ESP32 platform, most representative)
Platform.ESP32_C3_IDF, # ESP32-C3 IDF
Platform.ESP32_S2_IDF, # ESP32-S2 IDF
Platform.ESP32_S3_IDF, # ESP32-S3 IDF
]
def should_run_integration_tests(branch: str | None = None) -> bool:
"""Determine if integration tests should run based on changed files.
@@ -105,12 +145,9 @@ def should_run_integration_tests(branch: str | None = None) -> bool:
# Check if any required components changed
for file in files:
if file.startswith(ESPHOME_COMPONENTS_PATH):
parts = file.split("/")
if len(parts) >= 3:
component = parts[2]
if component in all_required_components:
return True
component = get_component_from_path(file)
if component and component in all_required_components:
return True
return False
@@ -224,10 +261,136 @@ def _component_has_tests(component: str) -> bool:
Returns:
True if the component has test YAML files
"""
tests_dir = Path(root_path) / "tests" / "components" / component
if not tests_dir.exists():
return False
return any(tests_dir.glob("test.*.yaml"))
return bool(get_component_test_files(component))
def detect_memory_impact_config(
branch: str | None = None,
) -> dict[str, Any]:
"""Determine memory impact analysis configuration.
Always runs memory impact analysis when there are changed components,
building a merged configuration with all changed components (like
test_build_components.py does) to get comprehensive memory analysis.
Args:
branch: Branch to compare against
Returns:
Dictionary with memory impact analysis parameters:
- should_run: "true" or "false"
- components: list of component names to analyze
- platform: platform name for the merged build
- use_merged_config: "true" (always use merged config)
"""
# Get actually changed files (not dependencies)
files = changed_files(branch)
# Find all changed components (excluding core and base bus components)
changed_component_set: set[str] = set()
has_core_changes = False
for file in files:
component = get_component_from_path(file)
if component:
# Skip base bus components as they're used across many builds
if component not in BASE_BUS_COMPONENTS:
changed_component_set.add(component)
elif file.startswith("esphome/"):
# Core ESPHome files changed (not component-specific)
has_core_changes = True
# If no components changed but core changed, test representative component
force_fallback_platform = False
if not changed_component_set and has_core_changes:
print(
f"Memory impact: No components changed, but core files changed. "
f"Testing {MEMORY_IMPACT_FALLBACK_COMPONENT} component on {MEMORY_IMPACT_FALLBACK_PLATFORM}.",
file=sys.stderr,
)
changed_component_set.add(MEMORY_IMPACT_FALLBACK_COMPONENT)
force_fallback_platform = True # Use fallback platform (most representative)
elif not changed_component_set:
# No components and no core changes
return {"should_run": "false"}
# Find components that have tests and collect their supported platforms
components_with_tests: list[str] = []
component_platforms_map: dict[
str, set[Platform]
] = {} # Track which platforms each component supports
for component in sorted(changed_component_set):
# Look for test files on preferred platforms
test_files = get_component_test_files(component)
if not test_files:
continue
# Check if component has tests for any preferred platform
available_platforms = [
platform
for test_file in test_files
if (platform := parse_test_filename(test_file)[1]) != "all"
and platform in MEMORY_IMPACT_PLATFORM_PREFERENCE
]
if not available_platforms:
continue
component_platforms_map[component] = set(available_platforms)
components_with_tests.append(component)
# If no components have tests, don't run memory impact
if not components_with_tests:
return {"should_run": "false"}
# Find common platforms supported by ALL components
# This ensures we can build all components together in a merged config
common_platforms = set(MEMORY_IMPACT_PLATFORM_PREFERENCE)
for component, platforms in component_platforms_map.items():
common_platforms &= platforms
# Select the most preferred platform from the common set
# Exception: for core changes, use fallback platform (most representative of codebase)
if force_fallback_platform:
platform = MEMORY_IMPACT_FALLBACK_PLATFORM
elif common_platforms:
# Pick the most preferred platform that all components support
platform = min(common_platforms, key=MEMORY_IMPACT_PLATFORM_PREFERENCE.index)
else:
# No common platform - pick the most commonly supported platform
# This allows testing components individually even if they can't be merged
# Count how many components support each platform
platform_counts = Counter(
p for platforms in component_platforms_map.values() for p in platforms
)
# Pick the platform supported by most components, preferring earlier in MEMORY_IMPACT_PLATFORM_PREFERENCE
platform = max(
platform_counts.keys(),
key=lambda p: (
platform_counts[p],
-MEMORY_IMPACT_PLATFORM_PREFERENCE.index(p),
),
)
# Debug output
print("Memory impact analysis:", file=sys.stderr)
print(f" Changed components: {sorted(changed_component_set)}", file=sys.stderr)
print(f" Components with tests: {components_with_tests}", file=sys.stderr)
print(
f" Component platforms: {dict(sorted(component_platforms_map.items()))}",
file=sys.stderr,
)
print(f" Common platforms: {sorted(common_platforms)}", file=sys.stderr)
print(f" Selected platform: {platform}", file=sys.stderr)
return {
"should_run": "true",
"components": components_with_tests,
"platform": platform,
"use_merged_config": "true",
}
def main() -> None:
@@ -279,6 +442,9 @@ def main() -> None:
if component not in directly_changed_components
]
# Detect components for memory impact analysis (merged config)
memory_impact = detect_memory_impact_config(args.branch)
# Build output
output: dict[str, Any] = {
"integration_tests": run_integration,
@@ -292,6 +458,7 @@ def main() -> None:
"component_test_count": len(changed_components_with_tests),
"directly_changed_count": len(directly_changed_with_tests),
"dependency_only_count": len(dependency_only_components),
"memory_impact": memory_impact,
}
# Output as JSON

View File

@@ -29,6 +29,18 @@ YAML_FILE_EXTENSIONS = (".yaml", ".yml")
# Component path prefix
ESPHOME_COMPONENTS_PATH = "esphome/components/"
# Base bus components - these ARE the bus implementations and should not
# be flagged as needing migration since they are the platform/base components
BASE_BUS_COMPONENTS = {
"i2c",
"spi",
"uart",
"modbus",
"canbus",
"remote_transmitter",
"remote_receiver",
}
def parse_list_components_output(output: str) -> list[str]:
"""Parse the output from list-components.py script.
@@ -46,6 +58,65 @@ def parse_list_components_output(output: str) -> list[str]:
return [c.strip() for c in output.strip().split("\n") if c.strip()]
def parse_test_filename(test_file: Path) -> tuple[str, str]:
"""Parse test filename to extract test name and platform.
Test files follow the naming pattern: test.<platform>.yaml or test-<variant>.<platform>.yaml
Args:
test_file: Path to test file
Returns:
Tuple of (test_name, platform)
"""
parts = test_file.stem.split(".")
if len(parts) == 2:
return parts[0], parts[1] # test, platform
return parts[0], "all"
def get_component_from_path(file_path: str) -> str | None:
"""Extract component name from a file path.
Args:
file_path: Path to a file (e.g., "esphome/components/wifi/wifi.cpp")
Returns:
Component name if path is in components directory, None otherwise
"""
if not file_path.startswith(ESPHOME_COMPONENTS_PATH):
return None
parts = file_path.split("/")
if len(parts) >= 3:
return parts[2]
return None
def get_component_test_files(
component: str, *, all_variants: bool = False
) -> list[Path]:
"""Get test files for a component.
Args:
component: Component name (e.g., "wifi")
all_variants: If True, returns all test files including variants (test-*.yaml).
If False, returns only base test files (test.*.yaml).
Default is False.
Returns:
List of test file paths for the component, or empty list if none exist
"""
tests_dir = Path(root_path) / "tests" / "components" / component
if not tests_dir.exists():
return []
if all_variants:
# Match both test.*.yaml and test-*.yaml patterns
return list(tests_dir.glob("test[.-]*.yaml"))
# Match only test.*.yaml (base tests)
return list(tests_dir.glob("test.*.yaml"))
def styled(color: str | tuple[str, ...], msg: str, reset: bool = True) -> str:
prefix = "".join(color) if isinstance(color, tuple) else color
suffix = colorama.Style.RESET_ALL if reset else ""
@@ -314,11 +385,9 @@ def _filter_changed_ci(files: list[str]) -> list[str]:
# because changes in one file can affect other files in the same component.
filtered_files = []
for f in files:
if f.startswith(ESPHOME_COMPONENTS_PATH):
# Check if file belongs to any of the changed components
parts = f.split("/")
if len(parts) >= 3 and parts[2] in component_set:
filtered_files.append(f)
component = get_component_from_path(f)
if component and component in component_set:
filtered_files.append(f)
return filtered_files

View File

@@ -4,7 +4,7 @@ from collections.abc import Callable
from pathlib import Path
import sys
from helpers import changed_files, git_ls_files
from helpers import changed_files, get_component_from_path, git_ls_files
from esphome.const import (
KEY_CORE,
@@ -30,11 +30,9 @@ def get_all_component_files() -> list[str]:
def extract_component_names_array_from_files_array(files):
components = []
for file in files:
file_parts = file.split("/")
if len(file_parts) >= 4:
component_name = file_parts[2]
if component_name not in components:
components.append(component_name)
component_name = get_component_from_path(file)
if component_name and component_name not in components:
components.append(component_name)
return components

View File

@@ -28,6 +28,7 @@ from script.analyze_component_buses import (
create_grouping_signature,
merge_compatible_bus_groups,
)
from script.helpers import get_component_test_files
# Weighting for batch creation
# Isolated components can't be grouped/merged, so they count as 10x
@@ -45,17 +46,12 @@ def has_test_files(component_name: str, tests_dir: Path) -> bool:
Args:
component_name: Name of the component
tests_dir: Path to tests/components directory
tests_dir: Path to tests/components directory (unused, kept for compatibility)
Returns:
True if the component has test.*.yaml files
"""
component_dir = tests_dir / component_name
if not component_dir.exists() or not component_dir.is_dir():
return False
# Check for test.*.yaml files
return any(component_dir.glob("test.*.yaml"))
return bool(get_component_test_files(component_name))
def create_intelligent_batches(

View File

@@ -0,0 +1,27 @@
{{ comment_marker }}
## Memory Impact Analysis
**Components:** {{ components_str }}
**Platform:** `{{ platform }}`
| Metric | Target Branch | This PR | Change |
|--------|--------------|---------|--------|
| **RAM** | {{ target_ram }} | {{ pr_ram }} | {{ ram_change }} |
| **Flash** | {{ target_flash }} | {{ pr_flash }} | {{ flash_change }} |
{% if component_breakdown %}
{{ component_breakdown }}
{% endif %}
{% if symbol_changes %}
{{ symbol_changes }}
{% endif %}
{%- if target_cache_hit %}
> ⚡ Target branch analysis was loaded from cache (build skipped for faster CI).
{%- endif %}
---
> **Note:** This analysis measures **static RAM and Flash usage** only (compile-time allocation).
> **Dynamic memory (heap)** cannot be measured automatically.
> **⚠️ You must test this PR on a real device** to measure free heap and ensure no runtime memory issues.
*This analysis runs automatically when components change. Memory usage is measured from {{ config_note }}.*

View File

@@ -0,0 +1,15 @@
<details open>
<summary>📊 Component Memory Breakdown</summary>
| Component | Target Flash | PR Flash | Change |
|-----------|--------------|----------|--------|
{% for comp, target_flash, pr_flash, delta in changed_components[:max_rows] -%}
{% set threshold = component_change_threshold if comp.startswith("[esphome]") else none -%}
| `{{ comp }}` | {{ target_flash|format_bytes }} | {{ pr_flash|format_bytes }} | {{ format_change(target_flash, pr_flash, threshold=threshold) }} |
{% endfor -%}
{% if changed_components|length > max_rows -%}
| ... | ... | ... | *({{ changed_components|length - max_rows }} more components not shown)* |
{% endif -%}
</details>

View File

@@ -0,0 +1,8 @@
{#- Macro for formatting symbol names in tables -#}
{%- macro format_symbol(symbol, max_length, truncate_length) -%}
{%- if symbol|length <= max_length -%}
`{{ symbol }}`
{%- else -%}
<details><summary><code>{{ symbol[:truncate_length] }}...</code></summary><code>{{ symbol }}</code></details>
{%- endif -%}
{%- endmacro -%}

View File

@@ -0,0 +1,51 @@
{%- from 'ci_memory_impact_macros.j2' import format_symbol -%}
<details>
<summary>🔍 Symbol-Level Changes (click to expand)</summary>
{% if changed_symbols %}
### Changed Symbols
| Symbol | Target Size | PR Size | Change |
|--------|-------------|---------|--------|
{% for symbol, target_size, pr_size, delta in changed_symbols[:max_changed_rows] -%}
| {{ format_symbol(symbol, symbol_max_length, symbol_truncate_length) }} | {{ target_size|format_bytes }} | {{ pr_size|format_bytes }} | {{ format_change(target_size, pr_size) }} |
{% endfor -%}
{% if changed_symbols|length > max_changed_rows -%}
| ... | ... | ... | *({{ changed_symbols|length - max_changed_rows }} more changed symbols not shown)* |
{% endif -%}
{% endif %}
{% if new_symbols %}
### New Symbols (top {{ max_new_rows }})
| Symbol | Size |
|--------|------|
{% for symbol, size in new_symbols[:max_new_rows] -%}
| {{ format_symbol(symbol, symbol_max_length, symbol_truncate_length) }} | {{ size|format_bytes }} |
{% endfor -%}
{% if new_symbols|length > max_new_rows -%}
{% set total_new_size = new_symbols|sum(attribute=1) -%}
| *{{ new_symbols|length - max_new_rows }} more new symbols...* | *Total: {{ total_new_size|format_bytes }}* |
{% endif -%}
{% endif %}
{% if removed_symbols %}
### Removed Symbols (top {{ max_removed_rows }})
| Symbol | Size |
|--------|------|
{% for symbol, size in removed_symbols[:max_removed_rows] -%}
| {{ format_symbol(symbol, symbol_max_length, symbol_truncate_length) }} | {{ size|format_bytes }} |
{% endfor -%}
{% if removed_symbols|length > max_removed_rows -%}
{% set total_removed_size = removed_symbols|sum(attribute=1) -%}
| *{{ removed_symbols|length - max_removed_rows }} more removed symbols...* | *Total: {{ total_removed_size|format_bytes }}* |
{% endif -%}
{% endif %}
</details>

View File

@@ -39,6 +39,7 @@ from script.analyze_component_buses import (
merge_compatible_bus_groups,
uses_local_file_references,
)
from script.helpers import get_component_test_files
from script.merge_component_configs import merge_component_configs
@@ -82,13 +83,14 @@ def show_disk_space_if_ci(esphome_command: str) -> None:
def find_component_tests(
components_dir: Path, component_pattern: str = "*"
components_dir: Path, component_pattern: str = "*", base_only: bool = False
) -> dict[str, list[Path]]:
"""Find all component test files.
Args:
components_dir: Path to tests/components directory
component_pattern: Glob pattern for component names
base_only: If True, only find base test files (test.*.yaml), not variant files (test-*.yaml)
Returns:
Dictionary mapping component name to list of test files
@@ -99,9 +101,10 @@ def find_component_tests(
if not comp_dir.is_dir():
continue
# Find test files matching test.*.yaml or test-*.yaml patterns
for test_file in comp_dir.glob("test[.-]*.yaml"):
component_tests[comp_dir.name].append(test_file)
# Get test files using helper function
test_files = get_component_test_files(comp_dir.name, all_variants=not base_only)
if test_files:
component_tests[comp_dir.name] = test_files
return dict(component_tests)
@@ -931,6 +934,7 @@ def test_components(
continue_on_fail: bool,
enable_grouping: bool = True,
isolated_components: set[str] | None = None,
base_only: bool = False,
) -> int:
"""Test components with optional intelligent grouping.
@@ -944,6 +948,7 @@ def test_components(
These are tested WITHOUT --testing-mode to enable full validation
(pin conflicts, etc). This is used in CI for directly changed components
to catch issues that would be missed with --testing-mode.
base_only: If True, only test base test files (test.*.yaml), not variant files (test-*.yaml)
Returns:
Exit code (0 for success, 1 for failure)
@@ -961,7 +966,7 @@ def test_components(
# Find all component tests
all_tests = {}
for pattern in component_patterns:
all_tests.update(find_component_tests(tests_dir, pattern))
all_tests.update(find_component_tests(tests_dir, pattern, base_only))
if not all_tests:
print(f"No components found matching: {component_patterns}")
@@ -1122,6 +1127,11 @@ def main() -> int:
"These are tested WITHOUT --testing-mode to enable full validation. "
"Used in CI for directly changed components to catch pin conflicts and other issues.",
)
parser.add_argument(
"--base-only",
action="store_true",
help="Only test base test files (test.*.yaml), not variant files (test-*.yaml)",
)
args = parser.parse_args()
@@ -1140,6 +1150,7 @@ def main() -> int:
continue_on_fail=args.continue_on_fail,
enable_grouping=not args.no_grouping,
isolated_components=isolated_components,
base_only=args.base_only,
)