1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-20 10:43:48 +01:00

Merge branch 'ci_impact_analysis' into integration

Resolved conflict in esphome/platformio_api.py:
- Kept refactored objdump_path and readelf_path from ci_impact_analysis
- Preserved analyze_memory_usage function from integration branch
This commit is contained in:
J. Nick Koston
2025-10-17 21:44:21 -10:00
24 changed files with 3764 additions and 61 deletions

View File

@@ -1,4 +1,5 @@
[run]
omit =
esphome/components/*
esphome/analyze_memory/*
tests/integration/*

View File

@@ -175,6 +175,7 @@ jobs:
changed-components-with-tests: ${{ steps.determine.outputs.changed-components-with-tests }}
directly-changed-components-with-tests: ${{ steps.determine.outputs.directly-changed-components-with-tests }}
component-test-count: ${{ steps.determine.outputs.component-test-count }}
memory_impact: ${{ steps.determine.outputs.memory-impact }}
steps:
- name: Check out code from GitHub
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
@@ -204,6 +205,7 @@ jobs:
echo "changed-components-with-tests=$(echo "$output" | jq -c '.changed_components_with_tests')" >> $GITHUB_OUTPUT
echo "directly-changed-components-with-tests=$(echo "$output" | jq -c '.directly_changed_components_with_tests')" >> $GITHUB_OUTPUT
echo "component-test-count=$(echo "$output" | jq -r '.component_test_count')" >> $GITHUB_OUTPUT
echo "memory-impact=$(echo "$output" | jq -c '.memory_impact')" >> $GITHUB_OUTPUT
integration-tests:
name: Run integration tests
@@ -521,6 +523,292 @@ jobs:
- uses: pre-commit-ci/lite-action@5d6cc0eb514c891a40562a58a8e71576c5c7fb43 # v1.1.0
if: always()
memory-impact-target-branch:
name: Build target branch for memory impact
runs-on: ubuntu-24.04
needs:
- common
- determine-jobs
if: github.event_name == 'pull_request' && fromJSON(needs.determine-jobs.outputs.memory_impact).should_run == 'true'
outputs:
ram_usage: ${{ steps.extract.outputs.ram_usage }}
flash_usage: ${{ steps.extract.outputs.flash_usage }}
cache_hit: ${{ steps.cache-memory-analysis.outputs.cache-hit }}
skip: ${{ steps.check-script.outputs.skip }}
steps:
- name: Check out target branch
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
with:
ref: ${{ github.base_ref }}
# Check if memory impact extraction script exists on target branch
# If not, skip the analysis (this handles older branches that don't have the feature)
- name: Check for memory impact script
id: check-script
run: |
if [ -f "script/ci_memory_impact_extract.py" ]; then
echo "skip=false" >> $GITHUB_OUTPUT
else
echo "skip=true" >> $GITHUB_OUTPUT
echo "::warning::ci_memory_impact_extract.py not found on target branch, skipping memory impact analysis"
fi
# All remaining steps only run if script exists
- name: Generate cache key
id: cache-key
if: steps.check-script.outputs.skip != 'true'
run: |
# Get the commit SHA of the target branch
target_sha=$(git rev-parse HEAD)
# Hash the build infrastructure files (all files that affect build/analysis)
infra_hash=$(cat \
script/test_build_components.py \
script/ci_memory_impact_extract.py \
script/analyze_component_buses.py \
script/merge_component_configs.py \
script/ci_helpers.py \
.github/workflows/ci.yml \
| sha256sum | cut -d' ' -f1)
# Get platform and components from job inputs
platform="${{ fromJSON(needs.determine-jobs.outputs.memory_impact).platform }}"
components='${{ toJSON(fromJSON(needs.determine-jobs.outputs.memory_impact).components) }}'
components_hash=$(echo "$components" | sha256sum | cut -d' ' -f1)
# Combine into cache key
cache_key="memory-analysis-target-${target_sha}-${infra_hash}-${platform}-${components_hash}"
echo "cache-key=${cache_key}" >> $GITHUB_OUTPUT
echo "Cache key: ${cache_key}"
- name: Restore cached memory analysis
id: cache-memory-analysis
if: steps.check-script.outputs.skip != 'true'
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: memory-analysis-target.json
key: ${{ steps.cache-key.outputs.cache-key }}
- name: Cache status
if: steps.check-script.outputs.skip != 'true'
run: |
if [ "${{ steps.cache-memory-analysis.outputs.cache-hit }}" == "true" ]; then
echo "✓ Cache hit! Using cached memory analysis results."
echo " Skipping build step to save time."
else
echo "✗ Cache miss. Will build and analyze memory usage."
fi
- name: Restore Python
if: steps.check-script.outputs.skip != 'true' && steps.cache-memory-analysis.outputs.cache-hit != 'true'
uses: ./.github/actions/restore-python
with:
python-version: ${{ env.DEFAULT_PYTHON }}
cache-key: ${{ needs.common.outputs.cache-key }}
- name: Cache platformio
if: steps.check-script.outputs.skip != 'true' && steps.cache-memory-analysis.outputs.cache-hit != 'true'
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: ~/.platformio
key: platformio-memory-${{ fromJSON(needs.determine-jobs.outputs.memory_impact).platform }}-${{ hashFiles('platformio.ini') }}
- name: Build, compile, and analyze memory
if: steps.check-script.outputs.skip != 'true' && steps.cache-memory-analysis.outputs.cache-hit != 'true'
id: build
run: |
. venv/bin/activate
components='${{ toJSON(fromJSON(needs.determine-jobs.outputs.memory_impact).components) }}'
platform="${{ fromJSON(needs.determine-jobs.outputs.memory_impact).platform }}"
echo "Building with test_build_components.py for $platform with components:"
echo "$components" | jq -r '.[]' | sed 's/^/ - /'
# Use test_build_components.py which handles grouping automatically
# Pass components as comma-separated list
component_list=$(echo "$components" | jq -r 'join(",")')
echo "Compiling with test_build_components.py..."
# Run build and extract memory with auto-detection of build directory for detailed analysis
# Use tee to show output in CI while also piping to extraction script
python script/test_build_components.py \
-e compile \
-c "$component_list" \
-t "$platform" 2>&1 | \
tee /dev/stderr | \
python script/ci_memory_impact_extract.py \
--output-env \
--output-json memory-analysis-target.json
- name: Save memory analysis to cache
if: steps.check-script.outputs.skip != 'true' && steps.cache-memory-analysis.outputs.cache-hit != 'true' && steps.build.outcome == 'success'
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: memory-analysis-target.json
key: ${{ steps.cache-key.outputs.cache-key }}
- name: Extract memory usage for outputs
id: extract
if: steps.check-script.outputs.skip != 'true'
run: |
if [ -f memory-analysis-target.json ]; then
ram=$(jq -r '.ram_bytes' memory-analysis-target.json)
flash=$(jq -r '.flash_bytes' memory-analysis-target.json)
echo "ram_usage=${ram}" >> $GITHUB_OUTPUT
echo "flash_usage=${flash}" >> $GITHUB_OUTPUT
echo "RAM: ${ram} bytes, Flash: ${flash} bytes"
else
echo "Error: memory-analysis-target.json not found"
exit 1
fi
- name: Upload memory analysis JSON
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: memory-analysis-target
path: memory-analysis-target.json
if-no-files-found: warn
retention-days: 1
memory-impact-pr-branch:
name: Build PR branch for memory impact
runs-on: ubuntu-24.04
needs:
- common
- determine-jobs
if: github.event_name == 'pull_request' && fromJSON(needs.determine-jobs.outputs.memory_impact).should_run == 'true'
outputs:
ram_usage: ${{ steps.extract.outputs.ram_usage }}
flash_usage: ${{ steps.extract.outputs.flash_usage }}
steps:
- name: Check out PR branch
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Restore Python
uses: ./.github/actions/restore-python
with:
python-version: ${{ env.DEFAULT_PYTHON }}
cache-key: ${{ needs.common.outputs.cache-key }}
- name: Cache platformio
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
with:
path: ~/.platformio
key: platformio-memory-${{ fromJSON(needs.determine-jobs.outputs.memory_impact).platform }}-${{ hashFiles('platformio.ini') }}
- name: Build, compile, and analyze memory
id: extract
run: |
. venv/bin/activate
components='${{ toJSON(fromJSON(needs.determine-jobs.outputs.memory_impact).components) }}'
platform="${{ fromJSON(needs.determine-jobs.outputs.memory_impact).platform }}"
echo "Building with test_build_components.py for $platform with components:"
echo "$components" | jq -r '.[]' | sed 's/^/ - /'
# Use test_build_components.py which handles grouping automatically
# Pass components as comma-separated list
component_list=$(echo "$components" | jq -r 'join(",")')
echo "Compiling with test_build_components.py..."
# Run build and extract memory with auto-detection of build directory for detailed analysis
# Use tee to show output in CI while also piping to extraction script
python script/test_build_components.py \
-e compile \
-c "$component_list" \
-t "$platform" 2>&1 | \
tee /dev/stderr | \
python script/ci_memory_impact_extract.py \
--output-env \
--output-json memory-analysis-pr.json
- name: Upload memory analysis JSON
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: memory-analysis-pr
path: memory-analysis-pr.json
if-no-files-found: warn
retention-days: 1
memory-impact-comment:
name: Comment memory impact
runs-on: ubuntu-24.04
needs:
- common
- determine-jobs
- memory-impact-target-branch
- memory-impact-pr-branch
if: github.event_name == 'pull_request' && fromJSON(needs.determine-jobs.outputs.memory_impact).should_run == 'true' && needs.memory-impact-target-branch.outputs.skip != 'true'
permissions:
contents: read
pull-requests: write
steps:
- name: Check out code
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Restore Python
uses: ./.github/actions/restore-python
with:
python-version: ${{ env.DEFAULT_PYTHON }}
cache-key: ${{ needs.common.outputs.cache-key }}
- name: Download target analysis JSON
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: memory-analysis-target
path: ./memory-analysis
continue-on-error: true
- name: Download PR analysis JSON
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: memory-analysis-pr
path: ./memory-analysis
continue-on-error: true
- name: Post or update PR comment
env:
GH_TOKEN: ${{ github.token }}
COMPONENTS: ${{ toJSON(fromJSON(needs.determine-jobs.outputs.memory_impact).components) }}
PLATFORM: ${{ fromJSON(needs.determine-jobs.outputs.memory_impact).platform }}
TARGET_RAM: ${{ needs.memory-impact-target-branch.outputs.ram_usage }}
TARGET_FLASH: ${{ needs.memory-impact-target-branch.outputs.flash_usage }}
PR_RAM: ${{ needs.memory-impact-pr-branch.outputs.ram_usage }}
PR_FLASH: ${{ needs.memory-impact-pr-branch.outputs.flash_usage }}
TARGET_CACHE_HIT: ${{ needs.memory-impact-target-branch.outputs.cache_hit }}
run: |
. venv/bin/activate
# Check if analysis JSON files exist
target_json_arg=""
pr_json_arg=""
if [ -f ./memory-analysis/memory-analysis-target.json ]; then
echo "Found target analysis JSON"
target_json_arg="--target-json ./memory-analysis/memory-analysis-target.json"
else
echo "No target analysis JSON found"
fi
if [ -f ./memory-analysis/memory-analysis-pr.json ]; then
echo "Found PR analysis JSON"
pr_json_arg="--pr-json ./memory-analysis/memory-analysis-pr.json"
else
echo "No PR analysis JSON found"
fi
# Add cache flag if target was cached
cache_flag=""
if [ "$TARGET_CACHE_HIT" == "true" ]; then
cache_flag="--target-cache-hit"
fi
python script/ci_memory_impact_comment.py \
--pr-number "${{ github.event.pull_request.number }}" \
--components "$COMPONENTS" \
--platform "$PLATFORM" \
--target-ram "$TARGET_RAM" \
--target-flash "$TARGET_FLASH" \
--pr-ram "$PR_RAM" \
--pr-flash "$PR_FLASH" \
$target_json_arg \
$pr_json_arg \
$cache_flag
ci-status:
name: CI Status
runs-on: ubuntu-24.04
@@ -535,6 +823,9 @@ jobs:
- test-build-components-splitter
- test-build-components-split
- pre-commit-ci-lite
- memory-impact-target-branch
- memory-impact-pr-branch
- memory-impact-comment
if: always()
steps:
- name: Success

View File

@@ -468,7 +468,9 @@ def write_cpp_file() -> int:
def compile_program(args: ArgsProtocol, config: ConfigType) -> int:
from esphome import platformio_api
_LOGGER.info("Compiling app...")
# NOTE: "Build path:" format is parsed by script/ci_memory_impact_extract.py
# If you change this format, update the regex in that script as well
_LOGGER.info("Compiling app... Build path: %s", CORE.build_path)
rc = platformio_api.run_compile(config, CORE.verbose)
if rc != 0:
return rc

View File

@@ -0,0 +1,502 @@
"""Memory usage analyzer for ESPHome compiled binaries."""
from collections import defaultdict
from dataclasses import dataclass, field
import logging
from pathlib import Path
import re
import subprocess
from typing import TYPE_CHECKING
from .const import (
CORE_SUBCATEGORY_PATTERNS,
DEMANGLED_PATTERNS,
ESPHOME_COMPONENT_PATTERN,
SECTION_TO_ATTR,
SYMBOL_PATTERNS,
)
from .helpers import (
get_component_class_patterns,
get_esphome_components,
map_section_name,
parse_symbol_line,
)
if TYPE_CHECKING:
from esphome.platformio_api import IDEData
_LOGGER = logging.getLogger(__name__)
# GCC global constructor/destructor prefix annotations
_GCC_PREFIX_ANNOTATIONS = {
"_GLOBAL__sub_I_": "global constructor for",
"_GLOBAL__sub_D_": "global destructor for",
}
# GCC optimization suffix pattern (e.g., $isra$0, $part$1, $constprop$2)
_GCC_OPTIMIZATION_SUFFIX_PATTERN = re.compile(r"(\$(?:isra|part|constprop)\$\d+)")
# C++ runtime patterns for categorization
_CPP_RUNTIME_PATTERNS = frozenset(["vtable", "typeinfo", "thunk"])
# libc printf/scanf family base names (used to detect variants like _printf_r, vfprintf, etc.)
_LIBC_PRINTF_SCANF_FAMILY = frozenset(["printf", "fprintf", "sprintf", "scanf"])
# Regex pattern for parsing readelf section headers
# Format: [ #] name type addr off size
_READELF_SECTION_PATTERN = re.compile(
r"\s*\[\s*\d+\]\s+([\.\w]+)\s+\w+\s+[\da-fA-F]+\s+[\da-fA-F]+\s+([\da-fA-F]+)"
)
# Component category prefixes
_COMPONENT_PREFIX_ESPHOME = "[esphome]"
_COMPONENT_PREFIX_EXTERNAL = "[external]"
_COMPONENT_CORE = f"{_COMPONENT_PREFIX_ESPHOME}core"
_COMPONENT_API = f"{_COMPONENT_PREFIX_ESPHOME}api"
# C++ namespace prefixes
_NAMESPACE_ESPHOME = "esphome::"
_NAMESPACE_STD = "std::"
# Type alias for symbol information: (symbol_name, size, component)
SymbolInfoType = tuple[str, int, str]
@dataclass
class MemorySection:
"""Represents a memory section with its symbols."""
name: str
symbols: list[SymbolInfoType] = field(default_factory=list)
total_size: int = 0
@dataclass
class ComponentMemory:
"""Tracks memory usage for a component."""
name: str
text_size: int = 0 # Code in flash
rodata_size: int = 0 # Read-only data in flash
data_size: int = 0 # Initialized data (flash + ram)
bss_size: int = 0 # Uninitialized data (ram only)
symbol_count: int = 0
@property
def flash_total(self) -> int:
"""Total flash usage (text + rodata + data)."""
return self.text_size + self.rodata_size + self.data_size
@property
def ram_total(self) -> int:
"""Total RAM usage (data + bss)."""
return self.data_size + self.bss_size
class MemoryAnalyzer:
"""Analyzes memory usage from ELF files."""
def __init__(
self,
elf_path: str,
objdump_path: str | None = None,
readelf_path: str | None = None,
external_components: set[str] | None = None,
idedata: "IDEData | None" = None,
) -> None:
"""Initialize memory analyzer.
Args:
elf_path: Path to ELF file to analyze
objdump_path: Path to objdump binary (auto-detected from idedata if not provided)
readelf_path: Path to readelf binary (auto-detected from idedata if not provided)
external_components: Set of external component names
idedata: Optional PlatformIO IDEData object to auto-detect toolchain paths
"""
self.elf_path = Path(elf_path)
if not self.elf_path.exists():
raise FileNotFoundError(f"ELF file not found: {elf_path}")
# Auto-detect toolchain paths from idedata if not provided
if idedata is not None and (objdump_path is None or readelf_path is None):
objdump_path = objdump_path or idedata.objdump_path
readelf_path = readelf_path or idedata.readelf_path
_LOGGER.debug("Using toolchain paths from PlatformIO idedata")
self.objdump_path = objdump_path or "objdump"
self.readelf_path = readelf_path or "readelf"
self.external_components = external_components or set()
self.sections: dict[str, MemorySection] = {}
self.components: dict[str, ComponentMemory] = defaultdict(
lambda: ComponentMemory("")
)
self._demangle_cache: dict[str, str] = {}
self._uncategorized_symbols: list[tuple[str, str, int]] = []
self._esphome_core_symbols: list[
tuple[str, str, int]
] = [] # Track core symbols
self._component_symbols: dict[str, list[tuple[str, str, int]]] = defaultdict(
list
) # Track symbols for all components
def analyze(self) -> dict[str, ComponentMemory]:
"""Analyze the ELF file and return component memory usage."""
self._parse_sections()
self._parse_symbols()
self._categorize_symbols()
return dict(self.components)
def _parse_sections(self) -> None:
"""Parse section headers from ELF file."""
result = subprocess.run(
[self.readelf_path, "-S", str(self.elf_path)],
capture_output=True,
text=True,
check=True,
)
# Parse section headers
for line in result.stdout.splitlines():
# Look for section entries
if not (match := _READELF_SECTION_PATTERN.match(line)):
continue
section_name = match.group(1)
size_hex = match.group(2)
size = int(size_hex, 16)
# Map to standard section name
mapped_section = map_section_name(section_name)
if not mapped_section:
continue
if mapped_section not in self.sections:
self.sections[mapped_section] = MemorySection(mapped_section)
self.sections[mapped_section].total_size += size
def _parse_symbols(self) -> None:
"""Parse symbols from ELF file."""
result = subprocess.run(
[self.objdump_path, "-t", str(self.elf_path)],
capture_output=True,
text=True,
check=True,
)
# Track seen addresses to avoid duplicates
seen_addresses: set[str] = set()
for line in result.stdout.splitlines():
if not (symbol_info := parse_symbol_line(line)):
continue
section, name, size, address = symbol_info
# Skip duplicate symbols at the same address (e.g., C1/C2 constructors)
if address in seen_addresses or section not in self.sections:
continue
self.sections[section].symbols.append((name, size, ""))
seen_addresses.add(address)
def _categorize_symbols(self) -> None:
"""Categorize symbols by component."""
# First, collect all unique symbol names for batch demangling
all_symbols = {
symbol_name
for section in self.sections.values()
for symbol_name, _, _ in section.symbols
}
# Batch demangle all symbols at once
self._batch_demangle_symbols(list(all_symbols))
# Now categorize with cached demangled names
for section_name, section in self.sections.items():
for symbol_name, size, _ in section.symbols:
component = self._identify_component(symbol_name)
if component not in self.components:
self.components[component] = ComponentMemory(component)
comp_mem = self.components[component]
comp_mem.symbol_count += 1
# Update the appropriate size attribute based on section
if attr_name := SECTION_TO_ATTR.get(section_name):
setattr(comp_mem, attr_name, getattr(comp_mem, attr_name) + size)
# Track uncategorized symbols
if component == "other" and size > 0:
demangled = self._demangle_symbol(symbol_name)
self._uncategorized_symbols.append((symbol_name, demangled, size))
# Track ESPHome core symbols for detailed analysis
if component == _COMPONENT_CORE and size > 0:
demangled = self._demangle_symbol(symbol_name)
self._esphome_core_symbols.append((symbol_name, demangled, size))
# Track all component symbols for detailed analysis
if size > 0:
demangled = self._demangle_symbol(symbol_name)
self._component_symbols[component].append(
(symbol_name, demangled, size)
)
def _identify_component(self, symbol_name: str) -> str:
"""Identify which component a symbol belongs to."""
# Demangle C++ names if needed
demangled = self._demangle_symbol(symbol_name)
# Check for special component classes first (before namespace pattern)
# This handles cases like esphome::ESPHomeOTAComponent which should map to ota
if _NAMESPACE_ESPHOME in demangled:
# Check for special component classes that include component name in the class
# For example: esphome::ESPHomeOTAComponent -> ota component
for component_name in get_esphome_components():
patterns = get_component_class_patterns(component_name)
if any(pattern in demangled for pattern in patterns):
return f"{_COMPONENT_PREFIX_ESPHOME}{component_name}"
# Check for ESPHome component namespaces
match = ESPHOME_COMPONENT_PATTERN.search(demangled)
if match:
component_name = match.group(1)
# Strip trailing underscore if present (e.g., switch_ -> switch)
component_name = component_name.rstrip("_")
# Check if this is an actual component in the components directory
if component_name in get_esphome_components():
return f"{_COMPONENT_PREFIX_ESPHOME}{component_name}"
# Check if this is a known external component from the config
if component_name in self.external_components:
return f"{_COMPONENT_PREFIX_EXTERNAL}{component_name}"
# Everything else in esphome:: namespace is core
return _COMPONENT_CORE
# Check for esphome core namespace (no component namespace)
if _NAMESPACE_ESPHOME in demangled:
# If no component match found, it's core
return _COMPONENT_CORE
# Check against symbol patterns
for component, patterns in SYMBOL_PATTERNS.items():
if any(pattern in symbol_name for pattern in patterns):
return component
# Check against demangled patterns
for component, patterns in DEMANGLED_PATTERNS.items():
if any(pattern in demangled for pattern in patterns):
return component
# Special cases that need more complex logic
# Check if spi_flash vs spi_driver
if "spi_" in symbol_name or "SPI" in symbol_name:
return "spi_flash" if "spi_flash" in symbol_name else "spi_driver"
# libc special printf variants
if (
symbol_name.startswith("_")
and symbol_name[1:].replace("_r", "").replace("v", "").replace("s", "")
in _LIBC_PRINTF_SCANF_FAMILY
):
return "libc"
# Track uncategorized symbols for analysis
return "other"
def _batch_demangle_symbols(self, symbols: list[str]) -> None:
"""Batch demangle C++ symbol names for efficiency."""
if not symbols:
return
# Try to find the appropriate c++filt for the platform
cppfilt_cmd = "c++filt"
_LOGGER.info("Demangling %d symbols", len(symbols))
_LOGGER.debug("objdump_path = %s", self.objdump_path)
# Check if we have a toolchain-specific c++filt
if self.objdump_path and self.objdump_path != "objdump":
# Replace objdump with c++filt in the path
potential_cppfilt = self.objdump_path.replace("objdump", "c++filt")
_LOGGER.info("Checking for toolchain c++filt at: %s", potential_cppfilt)
if Path(potential_cppfilt).exists():
cppfilt_cmd = potential_cppfilt
_LOGGER.info("✓ Using toolchain c++filt: %s", cppfilt_cmd)
else:
_LOGGER.info(
"✗ Toolchain c++filt not found at %s, using system c++filt",
potential_cppfilt,
)
else:
_LOGGER.info("✗ Using system c++filt (objdump_path=%s)", self.objdump_path)
# Strip GCC optimization suffixes and prefixes before demangling
# Suffixes like $isra$0, $part$0, $constprop$0 confuse c++filt
# Prefixes like _GLOBAL__sub_I_ need to be removed and tracked
symbols_stripped: list[str] = []
symbols_prefixes: list[str] = [] # Track removed prefixes
for symbol in symbols:
# Remove GCC optimization markers
stripped = _GCC_OPTIMIZATION_SUFFIX_PATTERN.sub("", symbol)
# Handle GCC global constructor/initializer prefixes
# _GLOBAL__sub_I_<mangled> -> extract <mangled> for demangling
prefix = ""
for gcc_prefix in _GCC_PREFIX_ANNOTATIONS:
if stripped.startswith(gcc_prefix):
prefix = gcc_prefix
stripped = stripped[len(prefix) :]
break
symbols_stripped.append(stripped)
symbols_prefixes.append(prefix)
try:
# Send all symbols to c++filt at once
result = subprocess.run(
[cppfilt_cmd],
input="\n".join(symbols_stripped),
capture_output=True,
text=True,
check=False,
)
except (subprocess.SubprocessError, OSError, UnicodeDecodeError) as e:
# On error, cache originals
_LOGGER.warning("Failed to batch demangle symbols: %s", e)
for symbol in symbols:
self._demangle_cache[symbol] = symbol
return
if result.returncode != 0:
_LOGGER.warning(
"c++filt exited with code %d: %s",
result.returncode,
result.stderr[:200] if result.stderr else "(no error output)",
)
# Cache originals on failure
for symbol in symbols:
self._demangle_cache[symbol] = symbol
return
# Process demangled output
self._process_demangled_output(
symbols, symbols_stripped, symbols_prefixes, result.stdout, cppfilt_cmd
)
def _process_demangled_output(
self,
symbols: list[str],
symbols_stripped: list[str],
symbols_prefixes: list[str],
demangled_output: str,
cppfilt_cmd: str,
) -> None:
"""Process demangled symbol output and populate cache.
Args:
symbols: Original symbol names
symbols_stripped: Stripped symbol names sent to c++filt
symbols_prefixes: Removed prefixes to restore
demangled_output: Output from c++filt
cppfilt_cmd: Path to c++filt command (for logging)
"""
demangled_lines = demangled_output.strip().split("\n")
failed_count = 0
for original, stripped, prefix, demangled in zip(
symbols, symbols_stripped, symbols_prefixes, demangled_lines
):
# Add back any prefix that was removed
demangled = self._restore_symbol_prefix(prefix, stripped, demangled)
# If we stripped a suffix, add it back to the demangled name for clarity
if original != stripped and not prefix:
demangled = self._restore_symbol_suffix(original, demangled)
self._demangle_cache[original] = demangled
# Log symbols that failed to demangle (stayed the same as stripped version)
if stripped == demangled and stripped.startswith("_Z"):
failed_count += 1
if failed_count <= 5: # Only log first 5 failures
_LOGGER.warning("Failed to demangle: %s", original)
if failed_count == 0:
_LOGGER.info("Successfully demangled all %d symbols", len(symbols))
return
_LOGGER.warning(
"Failed to demangle %d/%d symbols using %s",
failed_count,
len(symbols),
cppfilt_cmd,
)
@staticmethod
def _restore_symbol_prefix(prefix: str, stripped: str, demangled: str) -> str:
"""Restore prefix that was removed before demangling.
Args:
prefix: Prefix that was removed (e.g., "_GLOBAL__sub_I_")
stripped: Stripped symbol name
demangled: Demangled symbol name
Returns:
Demangled name with prefix restored/annotated
"""
if not prefix:
return demangled
# Successfully demangled - add descriptive prefix
if demangled != stripped and (
annotation := _GCC_PREFIX_ANNOTATIONS.get(prefix)
):
return f"[{annotation}: {demangled}]"
# Failed to demangle - restore original prefix
return prefix + demangled
@staticmethod
def _restore_symbol_suffix(original: str, demangled: str) -> str:
"""Restore GCC optimization suffix that was removed before demangling.
Args:
original: Original symbol name with suffix
demangled: Demangled symbol name without suffix
Returns:
Demangled name with suffix annotation
"""
if suffix_match := _GCC_OPTIMIZATION_SUFFIX_PATTERN.search(original):
return f"{demangled} [{suffix_match.group(1)}]"
return demangled
def _demangle_symbol(self, symbol: str) -> str:
"""Get demangled C++ symbol name from cache."""
return self._demangle_cache.get(symbol, symbol)
def _categorize_esphome_core_symbol(self, demangled: str) -> str:
"""Categorize ESPHome core symbols into subcategories."""
# Special patterns that need to be checked separately
if any(pattern in demangled for pattern in _CPP_RUNTIME_PATTERNS):
return "C++ Runtime (vtables/RTTI)"
if demangled.startswith(_NAMESPACE_STD):
return "C++ STL"
# Check against patterns from const.py
for category, patterns in CORE_SUBCATEGORY_PATTERNS.items():
if any(pattern in demangled for pattern in patterns):
return category
return "Other Core"
if __name__ == "__main__":
from .cli import main
main()

View File

@@ -0,0 +1,6 @@
"""Main entry point for running the memory analyzer as a module."""
from .cli import main
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,408 @@
"""CLI interface for memory analysis with report generation."""
from collections import defaultdict
import sys
from . import (
_COMPONENT_API,
_COMPONENT_CORE,
_COMPONENT_PREFIX_ESPHOME,
_COMPONENT_PREFIX_EXTERNAL,
MemoryAnalyzer,
)
class MemoryAnalyzerCLI(MemoryAnalyzer):
"""Memory analyzer with CLI-specific report generation."""
# Column width constants
COL_COMPONENT: int = 29
COL_FLASH_TEXT: int = 14
COL_FLASH_DATA: int = 14
COL_RAM_DATA: int = 12
COL_RAM_BSS: int = 12
COL_TOTAL_FLASH: int = 15
COL_TOTAL_RAM: int = 12
COL_SEPARATOR: int = 3 # " | "
# Core analysis column widths
COL_CORE_SUBCATEGORY: int = 30
COL_CORE_SIZE: int = 12
COL_CORE_COUNT: int = 6
COL_CORE_PERCENT: int = 10
# Calculate table width once at class level
TABLE_WIDTH: int = (
COL_COMPONENT
+ COL_SEPARATOR
+ COL_FLASH_TEXT
+ COL_SEPARATOR
+ COL_FLASH_DATA
+ COL_SEPARATOR
+ COL_RAM_DATA
+ COL_SEPARATOR
+ COL_RAM_BSS
+ COL_SEPARATOR
+ COL_TOTAL_FLASH
+ COL_SEPARATOR
+ COL_TOTAL_RAM
)
@staticmethod
def _make_separator_line(*widths: int) -> str:
"""Create a separator line with given column widths.
Args:
widths: Column widths to create separators for
Returns:
Separator line like "----+---------+-----"
"""
return "-+-".join("-" * width for width in widths)
# Pre-computed separator lines
MAIN_TABLE_SEPARATOR: str = _make_separator_line(
COL_COMPONENT,
COL_FLASH_TEXT,
COL_FLASH_DATA,
COL_RAM_DATA,
COL_RAM_BSS,
COL_TOTAL_FLASH,
COL_TOTAL_RAM,
)
CORE_TABLE_SEPARATOR: str = _make_separator_line(
COL_CORE_SUBCATEGORY,
COL_CORE_SIZE,
COL_CORE_COUNT,
COL_CORE_PERCENT,
)
def generate_report(self, detailed: bool = False) -> str:
"""Generate a formatted memory report."""
components = sorted(
self.components.items(), key=lambda x: x[1].flash_total, reverse=True
)
# Calculate totals
total_flash = sum(c.flash_total for _, c in components)
total_ram = sum(c.ram_total for _, c in components)
# Build report
lines: list[str] = []
lines.append("=" * self.TABLE_WIDTH)
lines.append("Component Memory Analysis".center(self.TABLE_WIDTH))
lines.append("=" * self.TABLE_WIDTH)
lines.append("")
# Main table - fixed column widths
lines.append(
f"{'Component':<{self.COL_COMPONENT}} | {'Flash (text)':>{self.COL_FLASH_TEXT}} | {'Flash (data)':>{self.COL_FLASH_DATA}} | {'RAM (data)':>{self.COL_RAM_DATA}} | {'RAM (bss)':>{self.COL_RAM_BSS}} | {'Total Flash':>{self.COL_TOTAL_FLASH}} | {'Total RAM':>{self.COL_TOTAL_RAM}}"
)
lines.append(self.MAIN_TABLE_SEPARATOR)
for name, mem in components:
if mem.flash_total > 0 or mem.ram_total > 0:
flash_rodata = mem.rodata_size + mem.data_size
lines.append(
f"{name:<{self.COL_COMPONENT}} | {mem.text_size:>{self.COL_FLASH_TEXT - 2},} B | {flash_rodata:>{self.COL_FLASH_DATA - 2},} B | "
f"{mem.data_size:>{self.COL_RAM_DATA - 2},} B | {mem.bss_size:>{self.COL_RAM_BSS - 2},} B | "
f"{mem.flash_total:>{self.COL_TOTAL_FLASH - 2},} B | {mem.ram_total:>{self.COL_TOTAL_RAM - 2},} B"
)
lines.append(self.MAIN_TABLE_SEPARATOR)
lines.append(
f"{'TOTAL':<{self.COL_COMPONENT}} | {' ':>{self.COL_FLASH_TEXT}} | {' ':>{self.COL_FLASH_DATA}} | "
f"{' ':>{self.COL_RAM_DATA}} | {' ':>{self.COL_RAM_BSS}} | "
f"{total_flash:>{self.COL_TOTAL_FLASH - 2},} B | {total_ram:>{self.COL_TOTAL_RAM - 2},} B"
)
# Top consumers
lines.append("")
lines.append("Top Flash Consumers:")
for i, (name, mem) in enumerate(components[:25]):
if mem.flash_total > 0:
percentage = (
(mem.flash_total / total_flash * 100) if total_flash > 0 else 0
)
lines.append(
f"{i + 1}. {name} ({mem.flash_total:,} B) - {percentage:.1f}% of analyzed flash"
)
lines.append("")
lines.append("Top RAM Consumers:")
ram_components = sorted(components, key=lambda x: x[1].ram_total, reverse=True)
for i, (name, mem) in enumerate(ram_components[:25]):
if mem.ram_total > 0:
percentage = (mem.ram_total / total_ram * 100) if total_ram > 0 else 0
lines.append(
f"{i + 1}. {name} ({mem.ram_total:,} B) - {percentage:.1f}% of analyzed RAM"
)
lines.append("")
lines.append(
"Note: This analysis covers symbols in the ELF file. Some runtime allocations may not be included."
)
lines.append("=" * self.TABLE_WIDTH)
# Add ESPHome core detailed analysis if there are core symbols
if self._esphome_core_symbols:
lines.append("")
lines.append("=" * self.TABLE_WIDTH)
lines.append(
f"{_COMPONENT_CORE} Detailed Analysis".center(self.TABLE_WIDTH)
)
lines.append("=" * self.TABLE_WIDTH)
lines.append("")
# Group core symbols by subcategory
core_subcategories: dict[str, list[tuple[str, str, int]]] = defaultdict(
list
)
for symbol, demangled, size in self._esphome_core_symbols:
# Categorize based on demangled name patterns
subcategory = self._categorize_esphome_core_symbol(demangled)
core_subcategories[subcategory].append((symbol, demangled, size))
# Sort subcategories by total size
sorted_subcategories = sorted(
[
(name, symbols, sum(s[2] for s in symbols))
for name, symbols in core_subcategories.items()
],
key=lambda x: x[2],
reverse=True,
)
lines.append(
f"{'Subcategory':<{self.COL_CORE_SUBCATEGORY}} | {'Size':>{self.COL_CORE_SIZE}} | "
f"{'Count':>{self.COL_CORE_COUNT}} | {'% of Core':>{self.COL_CORE_PERCENT}}"
)
lines.append(self.CORE_TABLE_SEPARATOR)
core_total = sum(size for _, _, size in self._esphome_core_symbols)
for subcategory, symbols, total_size in sorted_subcategories:
percentage = (total_size / core_total * 100) if core_total > 0 else 0
lines.append(
f"{subcategory:<{self.COL_CORE_SUBCATEGORY}} | {total_size:>{self.COL_CORE_SIZE - 2},} B | "
f"{len(symbols):>{self.COL_CORE_COUNT}} | {percentage:>{self.COL_CORE_PERCENT - 1}.1f}%"
)
# Top 15 largest core symbols
lines.append("")
lines.append(f"Top 15 Largest {_COMPONENT_CORE} Symbols:")
sorted_core_symbols = sorted(
self._esphome_core_symbols, key=lambda x: x[2], reverse=True
)
for i, (symbol, demangled, size) in enumerate(sorted_core_symbols[:15]):
lines.append(f"{i + 1}. {demangled} ({size:,} B)")
lines.append("=" * self.TABLE_WIDTH)
# Add detailed analysis for top ESPHome and external components
esphome_components = [
(name, mem)
for name, mem in components
if name.startswith(_COMPONENT_PREFIX_ESPHOME) and name != _COMPONENT_CORE
]
external_components = [
(name, mem)
for name, mem in components
if name.startswith(_COMPONENT_PREFIX_EXTERNAL)
]
top_esphome_components = sorted(
esphome_components, key=lambda x: x[1].flash_total, reverse=True
)[:30]
# Include all external components (they're usually important)
top_external_components = sorted(
external_components, key=lambda x: x[1].flash_total, reverse=True
)
# Check if API component exists and ensure it's included
api_component = None
for name, mem in components:
if name == _COMPONENT_API:
api_component = (name, mem)
break
# Combine all components to analyze: top ESPHome + all external + API if not already included
components_to_analyze = list(top_esphome_components) + list(
top_external_components
)
if api_component and api_component not in components_to_analyze:
components_to_analyze.append(api_component)
if components_to_analyze:
for comp_name, comp_mem in components_to_analyze:
if not (comp_symbols := self._component_symbols.get(comp_name, [])):
continue
lines.append("")
lines.append("=" * self.TABLE_WIDTH)
lines.append(f"{comp_name} Detailed Analysis".center(self.TABLE_WIDTH))
lines.append("=" * self.TABLE_WIDTH)
lines.append("")
# Sort symbols by size
sorted_symbols = sorted(comp_symbols, key=lambda x: x[2], reverse=True)
lines.append(f"Total symbols: {len(sorted_symbols)}")
lines.append(f"Total size: {comp_mem.flash_total:,} B")
lines.append("")
# Show all symbols > 100 bytes for better visibility
large_symbols = [
(sym, dem, size) for sym, dem, size in sorted_symbols if size > 100
]
lines.append(
f"{comp_name} Symbols > 100 B ({len(large_symbols)} symbols):"
)
for i, (symbol, demangled, size) in enumerate(large_symbols):
lines.append(f"{i + 1}. {demangled} ({size:,} B)")
lines.append("=" * self.TABLE_WIDTH)
return "\n".join(lines)
def dump_uncategorized_symbols(self, output_file: str | None = None) -> None:
"""Dump uncategorized symbols for analysis."""
# Sort by size descending
sorted_symbols = sorted(
self._uncategorized_symbols, key=lambda x: x[2], reverse=True
)
lines = ["Uncategorized Symbols Analysis", "=" * 80]
lines.append(f"Total uncategorized symbols: {len(sorted_symbols)}")
lines.append(
f"Total uncategorized size: {sum(s[2] for s in sorted_symbols):,} bytes"
)
lines.append("")
lines.append(f"{'Size':>10} | {'Symbol':<60} | Demangled")
lines.append("-" * 10 + "-+-" + "-" * 60 + "-+-" + "-" * 40)
for symbol, demangled, size in sorted_symbols[:100]: # Top 100
demangled_display = (
demangled[:100] if symbol != demangled else "[not demangled]"
)
lines.append(f"{size:>10,} | {symbol[:60]:<60} | {demangled_display}")
if len(sorted_symbols) > 100:
lines.append(f"\n... and {len(sorted_symbols) - 100} more symbols")
content = "\n".join(lines)
if output_file:
with open(output_file, "w", encoding="utf-8") as f:
f.write(content)
else:
print(content)
def analyze_elf(
elf_path: str,
objdump_path: str | None = None,
readelf_path: str | None = None,
detailed: bool = False,
external_components: set[str] | None = None,
) -> str:
"""Analyze an ELF file and return a memory report."""
analyzer = MemoryAnalyzerCLI(
elf_path, objdump_path, readelf_path, external_components
)
analyzer.analyze()
return analyzer.generate_report(detailed)
def main():
"""CLI entrypoint for memory analysis."""
if len(sys.argv) < 2:
print("Usage: python -m esphome.analyze_memory <build_directory>")
print("\nAnalyze memory usage from an ESPHome build directory.")
print("The build directory should contain firmware.elf and idedata will be")
print("loaded from ~/.esphome/.internal/idedata/<device>.json")
print("\nExamples:")
print(" python -m esphome.analyze_memory ~/.esphome/build/my-device")
print(" python -m esphome.analyze_memory .esphome/build/my-device")
print(" python -m esphome.analyze_memory my-device # Short form")
sys.exit(1)
build_dir = sys.argv[1]
# Load build directory
import json
from pathlib import Path
from esphome.platformio_api import IDEData
build_path = Path(build_dir)
# If no path separator in name, assume it's a device name
if "/" not in build_dir and not build_path.is_dir():
# Try current directory first
cwd_path = Path.cwd() / ".esphome" / "build" / build_dir
if cwd_path.is_dir():
build_path = cwd_path
print(f"Using build directory: {build_path}", file=sys.stderr)
else:
# Fall back to home directory
build_path = Path.home() / ".esphome" / "build" / build_dir
print(f"Using build directory: {build_path}", file=sys.stderr)
if not build_path.is_dir():
print(f"Error: {build_path} is not a directory", file=sys.stderr)
sys.exit(1)
# Find firmware.elf
elf_file = None
for elf_candidate in [
build_path / "firmware.elf",
build_path / ".pioenvs" / build_path.name / "firmware.elf",
]:
if elf_candidate.exists():
elf_file = str(elf_candidate)
break
if not elf_file:
print(f"Error: firmware.elf not found in {build_dir}", file=sys.stderr)
sys.exit(1)
# Find idedata.json - check current directory first, then home
device_name = build_path.name
idedata_candidates = [
Path.cwd() / ".esphome" / "idedata" / f"{device_name}.json",
Path.home() / ".esphome" / "idedata" / f"{device_name}.json",
]
idedata = None
for idedata_path in idedata_candidates:
if not idedata_path.exists():
continue
try:
with open(idedata_path, encoding="utf-8") as f:
raw_data = json.load(f)
idedata = IDEData(raw_data)
print(f"Loaded idedata from: {idedata_path}", file=sys.stderr)
break
except (json.JSONDecodeError, OSError) as e:
print(f"Warning: Failed to load idedata: {e}", file=sys.stderr)
if not idedata:
print(
f"Warning: idedata not found (searched {idedata_candidates[0]} and {idedata_candidates[1]})",
file=sys.stderr,
)
analyzer = MemoryAnalyzerCLI(elf_file, idedata=idedata)
analyzer.analyze()
report = analyzer.generate_report()
print(report)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,903 @@
"""Constants for memory analysis symbol pattern matching."""
import re
# Pattern to extract ESPHome component namespaces dynamically
ESPHOME_COMPONENT_PATTERN = re.compile(r"esphome::([a-zA-Z0-9_]+)::")
# Section mapping for ELF file sections
# Maps standard section names to their various platform-specific variants
SECTION_MAPPING = {
".text": frozenset([".text", ".iram"]),
".rodata": frozenset([".rodata"]),
".data": frozenset([".data", ".dram"]),
".bss": frozenset([".bss"]),
}
# Section to ComponentMemory attribute mapping
# Maps section names to the attribute name in ComponentMemory dataclass
SECTION_TO_ATTR = {
".text": "text_size",
".rodata": "rodata_size",
".data": "data_size",
".bss": "bss_size",
}
# Component identification rules
# Symbol patterns: patterns found in raw symbol names
SYMBOL_PATTERNS = {
"freertos": [
"vTask",
"xTask",
"xQueue",
"pvPort",
"vPort",
"uxTask",
"pcTask",
"prvTimerTask",
"prvAddNewTaskToReadyList",
"pxReadyTasksLists",
"prvAddCurrentTaskToDelayedList",
"xEventGroupWaitBits",
"xRingbufferSendFromISR",
"prvSendItemDoneNoSplit",
"prvReceiveGeneric",
"prvSendAcquireGeneric",
"prvCopyItemAllowSplit",
"xEventGroup",
"xRingbuffer",
"prvSend",
"prvReceive",
"prvCopy",
"xPort",
"ulTaskGenericNotifyTake",
"prvIdleTask",
"prvInitialiseNewTask",
"prvIsYieldRequiredSMP",
"prvGetItemByteBuf",
"prvInitializeNewRingbuffer",
"prvAcquireItemNoSplit",
"prvNotifyQueueSetContainer",
"ucStaticTimerQueueStorage",
"eTaskGetState",
"main_task",
"do_system_init_fn",
"xSemaphoreCreateGenericWithCaps",
"vListInsert",
"uxListRemove",
"vRingbufferReturnItem",
"vRingbufferReturnItemFromISR",
"prvCheckItemFitsByteBuffer",
"prvGetCurMaxSizeAllowSplit",
"tick_hook",
"sys_sem_new",
"sys_arch_mbox_fetch",
"sys_arch_sem_wait",
"prvDeleteTCB",
"vQueueDeleteWithCaps",
"vRingbufferDeleteWithCaps",
"vSemaphoreDeleteWithCaps",
"prvCheckItemAvail",
"prvCheckTaskCanBeScheduledSMP",
"prvGetCurMaxSizeNoSplit",
"prvResetNextTaskUnblockTime",
"prvReturnItemByteBuf",
"vApplicationStackOverflowHook",
"vApplicationGetIdleTaskMemory",
"sys_init",
"sys_mbox_new",
"sys_arch_mbox_tryfetch",
],
"xtensa": ["xt_", "_xt_", "xPortEnterCriticalTimeout"],
"heap": ["heap_", "multi_heap"],
"spi_flash": ["spi_flash"],
"rtc": ["rtc_", "rtcio_ll_"],
"gpio_driver": ["gpio_", "pins"],
"uart_driver": ["uart", "_uart", "UART"],
"timer": ["timer_", "esp_timer"],
"peripherals": ["periph_", "periman"],
"network_stack": [
"vj_compress",
"raw_sendto",
"raw_input",
"etharp_",
"icmp_input",
"socket_ipv6",
"ip_napt",
"socket_ipv4_multicast",
"socket_ipv6_multicast",
"netconn_",
"recv_raw",
"accept_function",
"netconn_recv_data",
"netconn_accept",
"netconn_write_vectors_partly",
"netconn_drain",
"raw_connect",
"raw_bind",
"icmp_send_response",
"sockets",
"icmp_dest_unreach",
"inet_chksum_pseudo",
"alloc_socket",
"done_socket",
"set_global_fd_sets",
"inet_chksum_pbuf",
"tryget_socket_unconn_locked",
"tryget_socket_unconn",
"cs_create_ctrl_sock",
"netbuf_alloc",
],
"ipv6_stack": ["nd6_", "ip6_", "mld6_", "icmp6_", "icmp6_input"],
"wifi_stack": [
"ieee80211",
"hostap",
"sta_",
"ap_",
"scan_",
"wifi_",
"wpa_",
"wps_",
"esp_wifi",
"cnx_",
"wpa3_",
"sae_",
"wDev_",
"ic_",
"mac_",
"esf_buf",
"gWpaSm",
"sm_WPA",
"eapol_",
"owe_",
"wifiLowLevelInit",
"s_do_mapping",
"gScanStruct",
"ppSearchTxframe",
"ppMapWaitTxq",
"ppFillAMPDUBar",
"ppCheckTxConnTrafficIdle",
"ppCalTkipMic",
],
"bluetooth": ["bt_", "ble_", "l2c_", "gatt_", "gap_", "hci_", "BT_init"],
"wifi_bt_coex": ["coex"],
"bluetooth_rom": ["r_ble", "r_lld", "r_llc", "r_llm"],
"bluedroid_bt": [
"bluedroid",
"btc_",
"bta_",
"btm_",
"btu_",
"BTM_",
"GATT",
"L2CA_",
"smp_",
"gatts_",
"attp_",
"l2cu_",
"l2cb",
"smp_cb",
"BTA_GATTC_",
"SMP_",
"BTU_",
"BTA_Dm",
"GAP_Ble",
"BT_tx_if",
"host_recv_pkt_cb",
"saved_local_oob_data",
"string_to_bdaddr",
"string_is_bdaddr",
"CalConnectParamTimeout",
"transmit_fragment",
"transmit_data",
"event_command_ready",
"read_command_complete_header",
"parse_read_local_extended_features_response",
"parse_read_local_version_info_response",
"should_request_high",
"btdm_wakeup_request",
"BTA_SetAttributeValue",
"BTA_EnableBluetooth",
"transmit_command_futured",
"transmit_command",
"get_waiting_command",
"make_command",
"transmit_downward",
"host_recv_adv_packet",
"copy_extra_byte_in_db",
"parse_read_local_supported_commands_response",
],
"crypto_math": [
"ecp_",
"bignum_",
"mpi_",
"sswu",
"modp",
"dragonfly_",
"gcm_mult",
"__multiply",
"quorem",
"__mdiff",
"__lshift",
"__mprec_tens",
"ECC_",
"multiprecision_",
"mix_sub_columns",
"sbox",
"gfm2_sbox",
"gfm3_sbox",
"curve_p256",
"curve",
"p_256_init_curve",
"shift_sub_rows",
"rshift",
],
"hw_crypto": ["esp_aes", "esp_sha", "esp_rsa", "esp_bignum", "esp_mpi"],
"libc": [
"printf",
"scanf",
"malloc",
"free",
"memcpy",
"memset",
"strcpy",
"strlen",
"_dtoa",
"_fopen",
"__sfvwrite_r",
"qsort",
"__sf",
"__sflush_r",
"__srefill_r",
"_impure_data",
"_reclaim_reent",
"_open_r",
"strncpy",
"_strtod_l",
"__gethex",
"__hexnan",
"_setenv_r",
"_tzset_unlocked_r",
"__tzcalc_limits",
"select",
"scalbnf",
"strtof",
"strtof_l",
"__d2b",
"__b2d",
"__s2b",
"_Balloc",
"__multadd",
"__lo0bits",
"__atexit0",
"__smakebuf_r",
"__swhatbuf_r",
"_sungetc_r",
"_close_r",
"_link_r",
"_unsetenv_r",
"_rename_r",
"__month_lengths",
"tzinfo",
"__ratio",
"__hi0bits",
"__ulp",
"__any_on",
"__copybits",
"L_shift",
"_fcntl_r",
"_lseek_r",
"_read_r",
"_write_r",
"_unlink_r",
"_fstat_r",
"access",
"fsync",
"tcsetattr",
"tcgetattr",
"tcflush",
"tcdrain",
"__ssrefill_r",
"_stat_r",
"__hexdig_fun",
"__mcmp",
"_fwalk_sglue",
"__fpclassifyf",
"_setlocale_r",
"_mbrtowc_r",
"fcntl",
"__match",
"_lock_close",
"__c$",
"__func__$",
"__FUNCTION__$",
"DAYS_IN_MONTH",
"_DAYS_BEFORE_MONTH",
"CSWTCH$",
"dst$",
"sulp",
],
"string_ops": ["strcmp", "strncmp", "strchr", "strstr", "strtok", "strdup"],
"memory_alloc": ["malloc", "calloc", "realloc", "free", "_sbrk"],
"file_io": [
"fread",
"fwrite",
"fopen",
"fclose",
"fseek",
"ftell",
"fflush",
"s_fd_table",
],
"string_formatting": [
"snprintf",
"vsnprintf",
"sprintf",
"vsprintf",
"sscanf",
"vsscanf",
],
"cpp_anonymous": ["_GLOBAL__N_", "n$"],
"cpp_runtime": ["__cxx", "_ZN", "_ZL", "_ZSt", "__gxx_personality", "_Z16"],
"exception_handling": ["__cxa_", "_Unwind_", "__gcc_personality", "uw_frame_state"],
"static_init": ["_GLOBAL__sub_I_"],
"mdns_lib": ["mdns"],
"phy_radio": [
"phy_",
"rf_",
"chip_",
"register_chipv7",
"pbus_",
"bb_",
"fe_",
"rfcal_",
"ram_rfcal",
"tx_pwctrl",
"rx_chan",
"set_rx_gain",
"set_chan",
"agc_reg",
"ram_txiq",
"ram_txdc",
"ram_gen_rx_gain",
"rx_11b_opt",
"set_rx_sense",
"set_rx_gain_cal",
"set_chan_dig_gain",
"tx_pwctrl_init_cal",
"rfcal_txiq",
"set_tx_gain_table",
"correct_rfpll_offset",
"pll_correct_dcap",
"txiq_cal_init",
"pwdet_sar",
"pwdet_sar2_init",
"ram_iq_est_enable",
"ram_rfpll_set_freq",
"ant_wifirx_cfg",
"ant_btrx_cfg",
"force_txrxoff",
"force_txrx_off",
"tx_paon_set",
"opt_11b_resart",
"rfpll_1p2_opt",
"ram_dc_iq_est",
"ram_start_tx_tone",
"ram_en_pwdet",
"ram_cbw2040_cfg",
"rxdc_est_min",
"i2cmst_reg_init",
"temprature_sens_read",
"ram_restart_cal",
"ram_write_gain_mem",
"ram_wait_rfpll_cal_end",
"txcal_debuge_mode",
"ant_wifitx_cfg",
"reg_init_begin",
],
"wifi_phy_pp": ["pp_", "ppT", "ppR", "ppP", "ppInstall", "ppCalTxAMPDULength"],
"wifi_lmac": ["lmac"],
"wifi_device": ["wdev", "wDev_"],
"power_mgmt": [
"pm_",
"sleep",
"rtc_sleep",
"light_sleep",
"deep_sleep",
"power_down",
"g_pm",
],
"memory_mgmt": [
"mem_",
"memory_",
"tlsf_",
"memp_",
"pbuf_",
"pbuf_alloc",
"pbuf_copy_partial_pbuf",
],
"hal_layer": ["hal_"],
"clock_mgmt": [
"clk_",
"clock_",
"rtc_clk",
"apb_",
"cpu_freq",
"setCpuFrequencyMhz",
],
"cache_mgmt": ["cache"],
"flash_ops": ["flash", "image_load"],
"interrupt_handlers": [
"isr",
"interrupt",
"intr_",
"exc_",
"exception",
"port_IntStack",
],
"wrapper_functions": ["_wrapper"],
"error_handling": ["panic", "abort", "assert", "error_", "fault"],
"authentication": ["auth"],
"ppp_protocol": ["ppp", "ipcp_", "lcp_", "chap_", "LcpEchoCheck"],
"dhcp": ["dhcp", "handle_dhcp"],
"ethernet_phy": [
"emac_",
"eth_phy_",
"phy_tlk110",
"phy_lan87",
"phy_ip101",
"phy_rtl",
"phy_dp83",
"phy_ksz",
"lan87xx_",
"rtl8201_",
"ip101_",
"ksz80xx_",
"jl1101_",
"dp83848_",
"eth_on_state_changed",
],
"threading": ["pthread_", "thread_", "_task_"],
"pthread": ["pthread"],
"synchronization": ["mutex", "semaphore", "spinlock", "portMUX"],
"math_lib": [
"sin",
"cos",
"tan",
"sqrt",
"pow",
"exp",
"log",
"atan",
"asin",
"acos",
"floor",
"ceil",
"fabs",
"round",
],
"random": ["rand", "random", "rng_", "prng"],
"time_lib": [
"time",
"clock",
"gettimeofday",
"settimeofday",
"localtime",
"gmtime",
"mktime",
"strftime",
],
"console_io": ["console_", "uart_tx", "uart_rx", "puts", "putchar", "getchar"],
"rom_functions": ["r_", "rom_"],
"compiler_runtime": [
"__divdi3",
"__udivdi3",
"__moddi3",
"__muldi3",
"__ashldi3",
"__ashrdi3",
"__lshrdi3",
"__cmpdi2",
"__fixdfdi",
"__floatdidf",
],
"libgcc": ["libgcc", "_divdi3", "_udivdi3"],
"boot_startup": ["boot", "start_cpu", "call_start", "startup", "bootloader"],
"bootloader": ["bootloader_", "esp_bootloader"],
"app_framework": ["app_", "initArduino", "setup", "loop", "Update"],
"weak_symbols": ["__weak_"],
"compiler_builtins": ["__builtin_"],
"vfs": ["vfs_", "VFS"],
"esp32_sdk": ["esp32_", "esp32c", "esp32s"],
"usb": ["usb_", "USB", "cdc_", "CDC"],
"i2c_driver": ["i2c_", "I2C"],
"i2s_driver": ["i2s_", "I2S"],
"spi_driver": ["spi_", "SPI"],
"adc_driver": ["adc_", "ADC"],
"dac_driver": ["dac_", "DAC"],
"touch_driver": ["touch_", "TOUCH"],
"pwm_driver": ["pwm_", "PWM", "ledc_", "LEDC"],
"rmt_driver": ["rmt_", "RMT"],
"pcnt_driver": ["pcnt_", "PCNT"],
"can_driver": ["can_", "CAN", "twai_", "TWAI"],
"sdmmc_driver": ["sdmmc_", "SDMMC", "sdcard", "sd_card"],
"temp_sensor": ["temp_sensor", "tsens_"],
"watchdog": ["wdt_", "WDT", "watchdog"],
"brownout": ["brownout", "bod_"],
"ulp": ["ulp_", "ULP"],
"psram": ["psram", "PSRAM", "spiram", "SPIRAM"],
"efuse": ["efuse", "EFUSE"],
"partition": ["partition", "esp_partition"],
"esp_event": ["esp_event", "event_loop", "event_callback"],
"esp_console": ["esp_console", "console_"],
"chip_specific": ["chip_", "esp_chip"],
"esp_system_utils": ["esp_system", "esp_hw", "esp_clk", "esp_sleep"],
"ipc": ["esp_ipc", "ipc_"],
"wifi_config": [
"g_cnxMgr",
"gChmCxt",
"g_ic",
"TxRxCxt",
"s_dp",
"s_ni",
"s_reg_dump",
"packet$",
"d_mult_table",
"K",
"fcstab",
],
"smartconfig": ["sc_ack_send"],
"rc_calibration": ["rc_cal", "rcUpdate"],
"noise_floor": ["noise_check"],
"rf_calibration": [
"set_rx_sense",
"set_rx_gain_cal",
"set_chan_dig_gain",
"tx_pwctrl_init_cal",
"rfcal_txiq",
"set_tx_gain_table",
"correct_rfpll_offset",
"pll_correct_dcap",
"txiq_cal_init",
"pwdet_sar",
"rx_11b_opt",
],
"wifi_crypto": [
"pk_use_ecparams",
"process_segments",
"ccmp_",
"rc4_",
"aria_",
"mgf_mask",
"dh_group",
"ccmp_aad_nonce",
"ccmp_encrypt",
"rc4_skip",
"aria_sb1",
"aria_sb2",
"aria_is1",
"aria_is2",
"aria_sl",
"aria_a",
],
"radio_control": ["fsm_input", "fsm_sconfreq"],
"pbuf": [
"pbuf_",
],
"event_group": ["xEventGroup"],
"ringbuffer": ["xRingbuffer", "prvSend", "prvReceive", "prvCopy"],
"provisioning": ["prov_", "prov_stop_and_notify"],
"scan": ["gScanStruct"],
"port": ["xPort"],
"elf_loader": [
"elf_add",
"elf_add_note",
"elf_add_segment",
"process_image",
"read_encoded",
"read_encoded_value",
"read_encoded_value_with_base",
"process_image_header",
],
"socket_api": [
"sockets",
"netconn_",
"accept_function",
"recv_raw",
"socket_ipv4_multicast",
"socket_ipv6_multicast",
],
"igmp": ["igmp_", "igmp_send", "igmp_input"],
"icmp6": ["icmp6_"],
"arp": ["arp_table"],
"ampdu": [
"ampdu_",
"rcAmpdu",
"trc_onAmpduOp",
"rcAmpduLowerRate",
"ampdu_dispatch_upto",
],
"ieee802_11": ["ieee802_11_", "ieee802_11_parse_elems"],
"rate_control": ["rssi_margin", "rcGetSched", "get_rate_fcc_index"],
"nan": ["nan_dp_", "nan_dp_post_tx", "nan_dp_delete_peer"],
"channel_mgmt": ["chm_init", "chm_set_current_channel"],
"trace": ["trc_init", "trc_onAmpduOp"],
"country_code": ["country_info", "country_info_24ghz"],
"multicore": ["do_multicore_settings"],
"Update_lib": ["Update"],
"stdio": [
"__sf",
"__sflush_r",
"__srefill_r",
"_impure_data",
"_reclaim_reent",
"_open_r",
],
"strncpy_ops": ["strncpy"],
"math_internal": ["__mdiff", "__lshift", "__mprec_tens", "quorem"],
"character_class": ["__chclass"],
"camellia": ["camellia_", "camellia_feistel"],
"crypto_tables": ["FSb", "FSb2", "FSb3", "FSb4"],
"event_buffer": ["g_eb_list_desc", "eb_space"],
"base_node": ["base_node_", "base_node_add_handler"],
"file_descriptor": ["s_fd_table"],
"tx_delay": ["tx_delay_cfg"],
"deinit": ["deinit_functions"],
"lcp_echo": ["LcpEchoCheck"],
"raw_api": ["raw_bind", "raw_connect"],
"checksum": ["process_checksum"],
"entry_management": ["add_entry"],
"esp_ota": ["esp_ota", "ota_", "read_otadata"],
"http_server": [
"httpd_",
"parse_url_char",
"cb_headers_complete",
"delete_entry",
"validate_structure",
"config_save",
"config_new",
"verify_url",
"cb_url",
],
"misc_system": [
"alarm_cbs",
"start_up",
"tokens",
"unhex",
"osi_funcs_ro",
"enum_function",
"fragment_and_dispatch",
"alarm_set",
"osi_alarm_new",
"config_set_string",
"config_update_newest_section",
"config_remove_key",
"method_strings",
"interop_match",
"interop_database",
"__state_table",
"__action_table",
"s_stub_table",
"s_context",
"s_mmu_ctx",
"s_get_bus_mask",
"hli_queue_put",
"list_remove",
"list_delete",
"lock_acquire_generic",
"is_vect_desc_usable",
"io_mode_str",
"__c$20233",
"interface",
"read_id_core",
"subscribe_idle",
"unsubscribe_idle",
"s_clkout_handle",
"lock_release_generic",
"config_set_int",
"config_get_int",
"config_get_string",
"config_has_key",
"config_remove_section",
"osi_alarm_init",
"osi_alarm_deinit",
"fixed_queue_enqueue",
"fixed_queue_dequeue",
"fixed_queue_new",
"fixed_pkt_queue_enqueue",
"fixed_pkt_queue_new",
"list_append",
"list_prepend",
"list_insert_after",
"list_contains",
"list_get_node",
"hash_function_blob",
"cb_no_body",
"cb_on_body",
"profile_tab",
"get_arg",
"trim",
"buf$",
"process_appended_hash_and_sig$constprop$0",
"uuidType",
"allocate_svc_db_buf",
"_hostname_is_ours",
"s_hli_handlers",
"tick_cb",
"idle_cb",
"input",
"entry_find",
"section_find",
"find_bucket_entry_",
"config_has_section",
"hli_queue_create",
"hli_queue_get",
"hli_c_handler",
"future_ready",
"future_await",
"future_new",
"pkt_queue_enqueue",
"pkt_queue_dequeue",
"pkt_queue_cleanup",
"pkt_queue_create",
"pkt_queue_destroy",
"fixed_pkt_queue_dequeue",
"osi_alarm_cancel",
"osi_alarm_is_active",
"osi_sem_take",
"osi_event_create",
"osi_event_bind",
"alarm_cb_handler",
"list_foreach",
"list_back",
"list_front",
"list_clear",
"fixed_queue_try_peek_first",
"translate_path",
"get_idx",
"find_key",
"init",
"end",
"start",
"set_read_value",
"copy_address_list",
"copy_and_key",
"sdk_cfg_opts",
"leftshift_onebit",
"config_section_end",
"config_section_begin",
"find_entry_and_check_all_reset",
"image_validate",
"xPendingReadyList",
"vListInitialise",
"lock_init_generic",
"ant_bttx_cfg",
"ant_dft_cfg",
"cs_send_to_ctrl_sock",
"config_llc_util_funcs_reset",
"make_set_adv_report_flow_control",
"make_set_event_mask",
"raw_new",
"raw_remove",
"BTE_InitStack",
"parse_read_local_supported_features_response",
"__math_invalidf",
"tinytens",
"__mprec_tinytens",
"__mprec_bigtens",
"vRingbufferDelete",
"vRingbufferDeleteWithCaps",
"vRingbufferReturnItem",
"vRingbufferReturnItemFromISR",
"get_acl_data_size_ble",
"get_features_ble",
"get_features_classic",
"get_acl_packet_size_ble",
"get_acl_packet_size_classic",
"supports_extended_inquiry_response",
"supports_rssi_with_inquiry_results",
"supports_interlaced_inquiry_scan",
"supports_reading_remote_extended_features",
],
"bluetooth_ll": [
"lld_pdu_",
"ld_acl_",
"lld_stop_ind_handler",
"lld_evt_winsize_change",
"config_lld_evt_funcs_reset",
"config_lld_funcs_reset",
"config_llm_funcs_reset",
"llm_set_long_adv_data",
"lld_retry_tx_prog",
"llc_link_sup_to_ind_handler",
"config_llc_funcs_reset",
"lld_evt_rxwin_compute",
"config_btdm_funcs_reset",
"config_ea_funcs_reset",
"llc_defalut_state_tab_reset",
"config_rwip_funcs_reset",
"ke_lmp_rx_flooding_detect",
],
}
# Demangled patterns: patterns found in demangled C++ names
DEMANGLED_PATTERNS = {
"gpio_driver": ["GPIO"],
"uart_driver": ["UART"],
"network_stack": [
"lwip",
"tcp",
"udp",
"ip4",
"ip6",
"dhcp",
"dns",
"netif",
"ethernet",
"ppp",
"slip",
],
"wifi_stack": ["NetworkInterface"],
"nimble_bt": [
"nimble",
"NimBLE",
"ble_hs",
"ble_gap",
"ble_gatt",
"ble_att",
"ble_l2cap",
"ble_sm",
],
"crypto": ["mbedtls", "crypto", "sha", "aes", "rsa", "ecc", "tls", "ssl"],
"cpp_stdlib": ["std::", "__gnu_cxx::", "__cxxabiv"],
"static_init": ["__static_initialization"],
"rtti": ["__type_info", "__class_type_info"],
"web_server_lib": ["AsyncWebServer", "AsyncWebHandler", "WebServer"],
"async_tcp": ["AsyncClient", "AsyncServer"],
"mdns_lib": ["mdns"],
"json_lib": [
"ArduinoJson",
"JsonDocument",
"JsonArray",
"JsonObject",
"deserialize",
"serialize",
],
"http_lib": ["HTTP", "http_", "Request", "Response", "Uri", "WebSocket"],
"logging": ["log", "Log", "print", "Print", "diag_"],
"authentication": ["checkDigestAuthentication"],
"libgcc": ["libgcc"],
"esp_system": ["esp_", "ESP"],
"arduino": ["arduino"],
"nvs": ["nvs_", "_ZTVN3nvs", "nvs::"],
"filesystem": ["spiffs", "vfs"],
"libc": ["newlib"],
}
# Patterns for categorizing ESPHome core symbols into subcategories
CORE_SUBCATEGORY_PATTERNS = {
"Component Framework": ["Component"],
"Application Core": ["Application"],
"Scheduler": ["Scheduler"],
"Component Iterator": ["ComponentIterator"],
"Helper Functions": ["Helpers", "helpers"],
"Preferences/Storage": ["Preferences", "ESPPreferences"],
"I/O Utilities": ["HighFrequencyLoopRequester"],
"String Utilities": ["str_"],
"Bit Utilities": ["reverse_bits"],
"Data Conversion": ["convert_"],
"Network Utilities": ["network", "IPAddress"],
"API Protocol": ["api::"],
"WiFi Manager": ["wifi::"],
"MQTT Client": ["mqtt::"],
"Logger": ["logger::"],
"OTA Updates": ["ota::"],
"Web Server": ["web_server::"],
"Time Management": ["time::"],
"Sensor Framework": ["sensor::"],
"Binary Sensor": ["binary_sensor::"],
"Switch Framework": ["switch_::"],
"Light Framework": ["light::"],
"Climate Framework": ["climate::"],
"Cover Framework": ["cover::"],
}

View File

@@ -0,0 +1,121 @@
"""Helper functions for memory analysis."""
from functools import cache
from pathlib import Path
from .const import SECTION_MAPPING
# Import namespace constant from parent module
# Note: This would create a circular import if done at module level,
# so we'll define it locally here as well
_NAMESPACE_ESPHOME = "esphome::"
# Get the list of actual ESPHome components by scanning the components directory
@cache
def get_esphome_components():
"""Get set of actual ESPHome components from the components directory."""
# Find the components directory relative to this file
# Go up two levels from analyze_memory/helpers.py to esphome/
current_dir = Path(__file__).parent.parent
components_dir = current_dir / "components"
if not components_dir.exists() or not components_dir.is_dir():
return frozenset()
return frozenset(
item.name
for item in components_dir.iterdir()
if item.is_dir()
and not item.name.startswith(".")
and not item.name.startswith("__")
)
@cache
def get_component_class_patterns(component_name: str) -> list[str]:
"""Generate component class name patterns for symbol matching.
Args:
component_name: The component name (e.g., "ota", "wifi", "api")
Returns:
List of pattern strings to match against demangled symbols
"""
component_upper = component_name.upper()
component_camel = component_name.replace("_", "").title()
return [
f"{_NAMESPACE_ESPHOME}{component_upper}Component", # e.g., esphome::OTAComponent
f"{_NAMESPACE_ESPHOME}ESPHome{component_upper}Component", # e.g., esphome::ESPHomeOTAComponent
f"{_NAMESPACE_ESPHOME}{component_camel}Component", # e.g., esphome::OtaComponent
f"{_NAMESPACE_ESPHOME}ESPHome{component_camel}Component", # e.g., esphome::ESPHomeOtaComponent
]
def map_section_name(raw_section: str) -> str | None:
"""Map raw section name to standard section.
Args:
raw_section: Raw section name from ELF file (e.g., ".iram0.text", ".rodata.str1.1")
Returns:
Standard section name (".text", ".rodata", ".data", ".bss") or None
"""
for standard_section, patterns in SECTION_MAPPING.items():
if any(pattern in raw_section for pattern in patterns):
return standard_section
return None
def parse_symbol_line(line: str) -> tuple[str, str, int, str] | None:
"""Parse a single symbol line from objdump output.
Args:
line: Line from objdump -t output
Returns:
Tuple of (section, name, size, address) or None if not a valid symbol.
Format: address l/g w/d F/O section size name
Example: 40084870 l F .iram0.text 00000000 _xt_user_exc
"""
parts = line.split()
if len(parts) < 5:
return None
try:
# Validate and extract address
address = parts[0]
int(address, 16)
except ValueError:
return None
# Look for F (function) or O (object) flag
if "F" not in parts and "O" not in parts:
return None
# Find section, size, and name
for i, part in enumerate(parts):
if not part.startswith("."):
continue
section = map_section_name(part)
if not section:
break
# Need at least size field after section
if i + 1 >= len(parts):
break
try:
size = int(parts[i + 1], 16)
except ValueError:
break
# Need symbol name and non-zero size
if i + 2 >= len(parts) or size == 0:
break
name = " ".join(parts[i + 2 :])
return (section, name, size, address)
return None

View File

@@ -387,22 +387,22 @@ class IDEData:
@property
def objdump_path(self) -> str:
# replace gcc at end with objdump
# Windows
if self.cc_path.endswith(".exe"):
return f"{self.cc_path[:-7]}objdump.exe"
return f"{self.cc_path[:-3]}objdump"
path = self.cc_path
return (
f"{path[:-7]}objdump.exe"
if path.endswith(".exe")
else f"{path[:-3]}objdump"
)
@property
def readelf_path(self) -> str:
# replace gcc at end with readelf
# Windows
if self.cc_path.endswith(".exe"):
return f"{self.cc_path[:-7]}readelf.exe"
return f"{self.cc_path[:-3]}readelf"
path = self.cc_path
return (
f"{path[:-7]}readelf.exe"
if path.endswith(".exe")
else f"{path[:-3]}readelf"
)
def analyze_memory_usage(config: dict[str, Any]) -> None:

View File

@@ -34,6 +34,8 @@ from typing import Any
# Add esphome to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from helpers import BASE_BUS_COMPONENTS
from esphome import yaml_util
from esphome.config_helpers import Extend, Remove
@@ -67,18 +69,6 @@ NO_BUSES_SIGNATURE = "no_buses"
# Isolated components have unique signatures and cannot be merged with others
ISOLATED_SIGNATURE_PREFIX = "isolated_"
# Base bus components - these ARE the bus implementations and should not
# be flagged as needing migration since they are the platform/base components
BASE_BUS_COMPONENTS = {
"i2c",
"spi",
"uart",
"modbus",
"canbus",
"remote_transmitter",
"remote_receiver",
}
# Components that must be tested in isolation (not grouped or batched with others)
# These have known build issues that prevent grouping
# NOTE: This should be kept in sync with both test_build_components and split_components_for_ci.py

23
script/ci_helpers.py Executable file
View File

@@ -0,0 +1,23 @@
"""Common helper functions for CI scripts."""
from __future__ import annotations
import os
def write_github_output(outputs: dict[str, str | int]) -> None:
"""Write multiple outputs to GITHUB_OUTPUT or stdout.
When running in GitHub Actions, writes to the GITHUB_OUTPUT file.
When running locally, writes to stdout for debugging.
Args:
outputs: Dictionary of key-value pairs to write
"""
github_output = os.environ.get("GITHUB_OUTPUT")
if github_output:
with open(github_output, "a", encoding="utf-8") as f:
f.writelines(f"{key}={value}\n" for key, value in outputs.items())
else:
for key, value in outputs.items():
print(f"{key}={value}")

View File

@@ -0,0 +1,570 @@
#!/usr/bin/env python3
"""Post or update a PR comment with memory impact analysis results.
This script creates or updates a GitHub PR comment with memory usage changes.
It uses the GitHub CLI (gh) to manage comments and maintains a single comment
that gets updated on subsequent runs.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import subprocess
import sys
from jinja2 import Environment, FileSystemLoader
# Add esphome to path for analyze_memory import
sys.path.insert(0, str(Path(__file__).parent.parent))
# pylint: disable=wrong-import-position
# Comment marker to identify our memory impact comments
COMMENT_MARKER = "<!-- esphome-memory-impact-analysis -->"
# Thresholds for emoji significance indicators (percentage)
OVERALL_CHANGE_THRESHOLD = 1.0 # Overall RAM/Flash changes
COMPONENT_CHANGE_THRESHOLD = 3.0 # Component breakdown changes
# Display limits for tables
MAX_COMPONENT_BREAKDOWN_ROWS = 20 # Maximum components to show in breakdown table
MAX_CHANGED_SYMBOLS_ROWS = 30 # Maximum changed symbols to show
MAX_NEW_SYMBOLS_ROWS = 15 # Maximum new symbols to show
MAX_REMOVED_SYMBOLS_ROWS = 15 # Maximum removed symbols to show
# Symbol display formatting
SYMBOL_DISPLAY_MAX_LENGTH = 100 # Max length before using <details> tag
SYMBOL_DISPLAY_TRUNCATE_LENGTH = 97 # Length to truncate in summary
# Component change noise threshold
COMPONENT_CHANGE_NOISE_THRESHOLD = 2 # Ignore component changes ≤ this many bytes
# Template directory
TEMPLATE_DIR = Path(__file__).parent / "templates"
def load_analysis_json(json_path: str) -> dict | None:
"""Load memory analysis results from JSON file.
Args:
json_path: Path to analysis JSON file
Returns:
Dictionary with analysis results or None if file doesn't exist/can't be loaded
"""
json_file = Path(json_path)
if not json_file.exists():
print(f"Analysis JSON not found: {json_path}", file=sys.stderr)
return None
try:
with open(json_file, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError) as e:
print(f"Failed to load analysis JSON: {e}", file=sys.stderr)
return None
def format_bytes(bytes_value: int) -> str:
"""Format bytes value with comma separators.
Args:
bytes_value: Number of bytes
Returns:
Formatted string with comma separators (e.g., "1,234 bytes")
"""
return f"{bytes_value:,} bytes"
def format_change(before: int, after: int, threshold: float | None = None) -> str:
"""Format memory change with delta and percentage.
Args:
before: Memory usage before change (in bytes)
after: Memory usage after change (in bytes)
threshold: Optional percentage threshold for "significant" change.
If provided, adds supplemental emoji (🎉/🚨/🔸/✅) to chart icons.
If None, only shows chart icons (📈/📉/➡️).
Returns:
Formatted string with delta and percentage
"""
delta = after - before
percentage = 0.0 if before == 0 else (delta / before) * 100
# Always use chart icons to show direction
if delta > 0:
delta_str = f"+{delta:,} bytes"
trend_icon = "📈"
# Add supplemental emoji based on threshold if provided
if threshold is not None:
significance = "🚨" if abs(percentage) > threshold else "🔸"
emoji = f"{trend_icon} {significance}"
else:
emoji = trend_icon
elif delta < 0:
delta_str = f"{delta:,} bytes"
trend_icon = "📉"
# Add supplemental emoji based on threshold if provided
if threshold is not None:
significance = "🎉" if abs(percentage) > threshold else ""
emoji = f"{trend_icon} {significance}"
else:
emoji = trend_icon
else:
delta_str = "+0 bytes"
emoji = "➡️"
# Format percentage with sign
if percentage > 0:
pct_str = f"+{percentage:.2f}%"
elif percentage < 0:
pct_str = f"{percentage:.2f}%"
else:
pct_str = "0.00%"
return f"{emoji} {delta_str} ({pct_str})"
def prepare_symbol_changes_data(
target_symbols: dict | None, pr_symbols: dict | None
) -> dict | None:
"""Prepare symbol changes data for template rendering.
Args:
target_symbols: Symbol name to size mapping for target branch
pr_symbols: Symbol name to size mapping for PR branch
Returns:
Dictionary with changed, new, and removed symbols, or None if no changes
"""
if not target_symbols or not pr_symbols:
return None
# Find all symbols that exist in both branches or only in one
all_symbols = set(target_symbols.keys()) | set(pr_symbols.keys())
# Track changes
changed_symbols: list[
tuple[str, int, int, int]
] = [] # (symbol, target_size, pr_size, delta)
new_symbols: list[tuple[str, int]] = [] # (symbol, size)
removed_symbols: list[tuple[str, int]] = [] # (symbol, size)
for symbol in all_symbols:
target_size = target_symbols.get(symbol, 0)
pr_size = pr_symbols.get(symbol, 0)
if target_size == 0 and pr_size > 0:
# New symbol
new_symbols.append((symbol, pr_size))
elif target_size > 0 and pr_size == 0:
# Removed symbol
removed_symbols.append((symbol, target_size))
elif target_size != pr_size:
# Changed symbol
delta = pr_size - target_size
changed_symbols.append((symbol, target_size, pr_size, delta))
if not changed_symbols and not new_symbols and not removed_symbols:
return None
# Sort by size/delta
changed_symbols.sort(key=lambda x: abs(x[3]), reverse=True)
new_symbols.sort(key=lambda x: x[1], reverse=True)
removed_symbols.sort(key=lambda x: x[1], reverse=True)
return {
"changed_symbols": changed_symbols,
"new_symbols": new_symbols,
"removed_symbols": removed_symbols,
}
def prepare_component_breakdown_data(
target_analysis: dict | None, pr_analysis: dict | None
) -> list[tuple[str, int, int, int]] | None:
"""Prepare component breakdown data for template rendering.
Args:
target_analysis: Component memory breakdown for target branch
pr_analysis: Component memory breakdown for PR branch
Returns:
List of tuples (component, target_flash, pr_flash, delta), or None if no changes
"""
if not target_analysis or not pr_analysis:
return None
# Combine all components from both analyses
all_components = set(target_analysis.keys()) | set(pr_analysis.keys())
# Filter to components that have changed (ignoring noise)
changed_components: list[
tuple[str, int, int, int]
] = [] # (comp, target_flash, pr_flash, delta)
for comp in all_components:
target_mem = target_analysis.get(comp, {})
pr_mem = pr_analysis.get(comp, {})
target_flash = target_mem.get("flash_total", 0)
pr_flash = pr_mem.get("flash_total", 0)
# Only include if component has meaningful change (above noise threshold)
delta = pr_flash - target_flash
if abs(delta) > COMPONENT_CHANGE_NOISE_THRESHOLD:
changed_components.append((comp, target_flash, pr_flash, delta))
if not changed_components:
return None
# Sort by absolute delta (largest changes first)
changed_components.sort(key=lambda x: abs(x[3]), reverse=True)
return changed_components
def create_comment_body(
components: list[str],
platform: str,
target_ram: int,
target_flash: int,
pr_ram: int,
pr_flash: int,
target_analysis: dict | None = None,
pr_analysis: dict | None = None,
target_symbols: dict | None = None,
pr_symbols: dict | None = None,
target_cache_hit: bool = False,
) -> str:
"""Create the comment body with memory impact analysis using Jinja2 templates.
Args:
components: List of component names (merged config)
platform: Platform name
target_ram: RAM usage in target branch
target_flash: Flash usage in target branch
pr_ram: RAM usage in PR branch
pr_flash: Flash usage in PR branch
target_analysis: Optional component breakdown for target branch
pr_analysis: Optional component breakdown for PR branch
target_symbols: Optional symbol map for target branch
pr_symbols: Optional symbol map for PR branch
target_cache_hit: Whether target branch analysis was loaded from cache
Returns:
Formatted comment body
"""
# Set up Jinja2 environment
env = Environment(
loader=FileSystemLoader(TEMPLATE_DIR),
trim_blocks=True,
lstrip_blocks=True,
)
# Register custom filters
env.filters["format_bytes"] = format_bytes
env.filters["format_change"] = format_change
# Prepare template context
context = {
"comment_marker": COMMENT_MARKER,
"platform": platform,
"target_ram": format_bytes(target_ram),
"pr_ram": format_bytes(pr_ram),
"target_flash": format_bytes(target_flash),
"pr_flash": format_bytes(pr_flash),
"ram_change": format_change(
target_ram, pr_ram, threshold=OVERALL_CHANGE_THRESHOLD
),
"flash_change": format_change(
target_flash, pr_flash, threshold=OVERALL_CHANGE_THRESHOLD
),
"target_cache_hit": target_cache_hit,
"component_change_threshold": COMPONENT_CHANGE_THRESHOLD,
}
# Format components list
if len(components) == 1:
context["components_str"] = f"`{components[0]}`"
context["config_note"] = "a representative test configuration"
else:
context["components_str"] = ", ".join(f"`{c}`" for c in sorted(components))
context["config_note"] = (
f"a merged configuration with {len(components)} components"
)
# Prepare component breakdown if available
component_breakdown = ""
if target_analysis and pr_analysis:
changed_components = prepare_component_breakdown_data(
target_analysis, pr_analysis
)
if changed_components:
template = env.get_template("ci_memory_impact_component_breakdown.j2")
component_breakdown = template.render(
changed_components=changed_components,
format_bytes=format_bytes,
format_change=format_change,
component_change_threshold=COMPONENT_CHANGE_THRESHOLD,
max_rows=MAX_COMPONENT_BREAKDOWN_ROWS,
)
# Prepare symbol changes if available
symbol_changes = ""
if target_symbols and pr_symbols:
symbol_data = prepare_symbol_changes_data(target_symbols, pr_symbols)
if symbol_data:
template = env.get_template("ci_memory_impact_symbol_changes.j2")
symbol_changes = template.render(
**symbol_data,
format_bytes=format_bytes,
format_change=format_change,
max_changed_rows=MAX_CHANGED_SYMBOLS_ROWS,
max_new_rows=MAX_NEW_SYMBOLS_ROWS,
max_removed_rows=MAX_REMOVED_SYMBOLS_ROWS,
symbol_max_length=SYMBOL_DISPLAY_MAX_LENGTH,
symbol_truncate_length=SYMBOL_DISPLAY_TRUNCATE_LENGTH,
)
if not target_analysis or not pr_analysis:
print("No ELF files provided, skipping detailed analysis", file=sys.stderr)
context["component_breakdown"] = component_breakdown
context["symbol_changes"] = symbol_changes
# Render main template
template = env.get_template("ci_memory_impact_comment_template.j2")
return template.render(**context)
def find_existing_comment(pr_number: str) -> str | None:
"""Find existing memory impact comment on the PR.
Args:
pr_number: PR number
Returns:
Comment numeric ID if found, None otherwise
Raises:
subprocess.CalledProcessError: If gh command fails
"""
print(f"DEBUG: Looking for existing comment on PR #{pr_number}", file=sys.stderr)
# Use gh api to get comments directly - this returns the numeric id field
result = subprocess.run(
[
"gh",
"api",
f"/repos/{{owner}}/{{repo}}/issues/{pr_number}/comments",
"--jq",
".[] | {id, body}",
],
capture_output=True,
text=True,
check=True,
)
print(
f"DEBUG: gh api comments output (first 500 chars):\n{result.stdout[:500]}",
file=sys.stderr,
)
# Parse comments and look for our marker
comment_count = 0
for line in result.stdout.strip().split("\n"):
if not line:
continue
try:
comment = json.loads(line)
comment_count += 1
comment_id = comment.get("id")
print(
f"DEBUG: Checking comment {comment_count}: id={comment_id}",
file=sys.stderr,
)
body = comment.get("body", "")
if COMMENT_MARKER in body:
print(
f"DEBUG: Found existing comment with id={comment_id}",
file=sys.stderr,
)
# Return the numeric id
return str(comment_id)
print("DEBUG: Comment does not contain marker", file=sys.stderr)
except json.JSONDecodeError as e:
print(f"DEBUG: JSON decode error: {e}", file=sys.stderr)
continue
print(
f"DEBUG: No existing comment found (checked {comment_count} comments)",
file=sys.stderr,
)
return None
def update_existing_comment(comment_id: str, comment_body: str) -> None:
"""Update an existing comment.
Args:
comment_id: Comment ID to update
comment_body: New comment body text
Raises:
subprocess.CalledProcessError: If gh command fails
"""
print(f"DEBUG: Updating existing comment {comment_id}", file=sys.stderr)
result = subprocess.run(
[
"gh",
"api",
f"/repos/{{owner}}/{{repo}}/issues/comments/{comment_id}",
"-X",
"PATCH",
"-f",
f"body={comment_body}",
],
check=True,
capture_output=True,
text=True,
)
print(f"DEBUG: Update response: {result.stdout}", file=sys.stderr)
def create_new_comment(pr_number: str, comment_body: str) -> None:
"""Create a new PR comment.
Args:
pr_number: PR number
comment_body: Comment body text
Raises:
subprocess.CalledProcessError: If gh command fails
"""
print(f"DEBUG: Posting new comment on PR #{pr_number}", file=sys.stderr)
result = subprocess.run(
["gh", "pr", "comment", pr_number, "--body", comment_body],
check=True,
capture_output=True,
text=True,
)
print(f"DEBUG: Post response: {result.stdout}", file=sys.stderr)
def post_or_update_comment(pr_number: str, comment_body: str) -> None:
"""Post a new comment or update existing one.
Args:
pr_number: PR number
comment_body: Comment body text
Raises:
subprocess.CalledProcessError: If gh command fails
"""
# Look for existing comment
existing_comment_id = find_existing_comment(pr_number)
if existing_comment_id and existing_comment_id != "None":
update_existing_comment(existing_comment_id, comment_body)
else:
create_new_comment(pr_number, comment_body)
print("Comment posted/updated successfully", file=sys.stderr)
def main() -> int:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Post or update PR comment with memory impact analysis"
)
parser.add_argument("--pr-number", required=True, help="PR number")
parser.add_argument(
"--components",
required=True,
help='JSON array of component names (e.g., \'["api", "wifi"]\')',
)
parser.add_argument("--platform", required=True, help="Platform name")
parser.add_argument(
"--target-ram", type=int, required=True, help="Target branch RAM usage"
)
parser.add_argument(
"--target-flash", type=int, required=True, help="Target branch flash usage"
)
parser.add_argument("--pr-ram", type=int, required=True, help="PR branch RAM usage")
parser.add_argument(
"--pr-flash", type=int, required=True, help="PR branch flash usage"
)
parser.add_argument(
"--target-json",
help="Optional path to target branch analysis JSON (for detailed analysis)",
)
parser.add_argument(
"--pr-json",
help="Optional path to PR branch analysis JSON (for detailed analysis)",
)
parser.add_argument(
"--target-cache-hit",
action="store_true",
help="Indicates that target branch analysis was loaded from cache",
)
args = parser.parse_args()
# Parse components from JSON
try:
components = json.loads(args.components)
if not isinstance(components, list):
print("Error: --components must be a JSON array", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing --components JSON: {e}", file=sys.stderr)
sys.exit(1)
# Load analysis JSON files
target_analysis = None
pr_analysis = None
target_symbols = None
pr_symbols = None
if args.target_json:
target_data = load_analysis_json(args.target_json)
if target_data and target_data.get("detailed_analysis"):
target_analysis = target_data["detailed_analysis"].get("components")
target_symbols = target_data["detailed_analysis"].get("symbols")
if args.pr_json:
pr_data = load_analysis_json(args.pr_json)
if pr_data and pr_data.get("detailed_analysis"):
pr_analysis = pr_data["detailed_analysis"].get("components")
pr_symbols = pr_data["detailed_analysis"].get("symbols")
# Create comment body
# Note: Memory totals (RAM/Flash) are summed across all builds if multiple were run.
comment_body = create_comment_body(
components=components,
platform=args.platform,
target_ram=args.target_ram,
target_flash=args.target_flash,
pr_ram=args.pr_ram,
pr_flash=args.pr_flash,
target_analysis=target_analysis,
pr_analysis=pr_analysis,
target_symbols=target_symbols,
pr_symbols=pr_symbols,
target_cache_hit=args.target_cache_hit,
)
# Post or update comment
post_or_update_comment(args.pr_number, comment_body)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,281 @@
#!/usr/bin/env python3
"""Extract memory usage statistics from ESPHome build output.
This script parses the PlatformIO build output to extract RAM and flash
usage statistics for a compiled component. It's used by the CI workflow to
compare memory usage between branches.
The script reads compile output from stdin and looks for the standard
PlatformIO output format:
RAM: [==== ] 36.1% (used 29548 bytes from 81920 bytes)
Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes)
Optionally performs detailed memory analysis if a build directory is provided.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
import re
import sys
# Add esphome to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# pylint: disable=wrong-import-position
from esphome.analyze_memory import MemoryAnalyzer
from esphome.platformio_api import IDEData
from script.ci_helpers import write_github_output
# Regex patterns for extracting memory usage from PlatformIO output
_RAM_PATTERN = re.compile(r"RAM:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes")
_FLASH_PATTERN = re.compile(r"Flash:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes")
_BUILD_PATH_PATTERN = re.compile(r"Build path: (.+)")
def extract_from_compile_output(
output_text: str,
) -> tuple[int | None, int | None, str | None]:
"""Extract memory usage and build directory from PlatformIO compile output.
Supports multiple builds (for component groups or isolated components).
When test_build_components.py creates multiple builds, this sums the
memory usage across all builds.
Looks for lines like:
RAM: [==== ] 36.1% (used 29548 bytes from 81920 bytes)
Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes)
Also extracts build directory from lines like:
INFO Compiling app... Build path: /path/to/build
Args:
output_text: Compile output text (may contain multiple builds)
Returns:
Tuple of (total_ram_bytes, total_flash_bytes, build_dir) or (None, None, None) if not found
"""
# Find all RAM and Flash matches (may be multiple builds)
ram_matches = _RAM_PATTERN.findall(output_text)
flash_matches = _FLASH_PATTERN.findall(output_text)
if not ram_matches or not flash_matches:
return None, None, None
# Sum all builds (handles multiple component groups)
total_ram = sum(int(match) for match in ram_matches)
total_flash = sum(int(match) for match in flash_matches)
# Extract build directory from ESPHome's explicit build path output
# Look for: INFO Compiling app... Build path: /path/to/build
# Note: Multiple builds reuse the same build path (each overwrites the previous)
build_dir = None
if match := _BUILD_PATH_PATTERN.search(output_text):
build_dir = match.group(1).strip()
return total_ram, total_flash, build_dir
def run_detailed_analysis(build_dir: str) -> dict | None:
"""Run detailed memory analysis on build directory.
Args:
build_dir: Path to ESPHome build directory
Returns:
Dictionary with analysis results or None if analysis fails
"""
build_path = Path(build_dir)
if not build_path.exists():
print(f"Build directory not found: {build_dir}", file=sys.stderr)
return None
# Find firmware.elf
elf_path = None
for elf_candidate in [
build_path / "firmware.elf",
build_path / ".pioenvs" / build_path.name / "firmware.elf",
]:
if elf_candidate.exists():
elf_path = str(elf_candidate)
break
if not elf_path:
print(f"firmware.elf not found in {build_dir}", file=sys.stderr)
return None
# Find idedata.json - check multiple locations
device_name = build_path.name
idedata_candidates = [
# In .pioenvs for test builds
build_path / ".pioenvs" / device_name / "idedata.json",
# In .esphome/idedata for regular builds
Path.home() / ".esphome" / "idedata" / f"{device_name}.json",
# Check parent directories for .esphome/idedata (for test_build_components)
build_path.parent.parent.parent / "idedata" / f"{device_name}.json",
]
idedata = None
for idedata_path in idedata_candidates:
if not idedata_path.exists():
continue
try:
with open(idedata_path, encoding="utf-8") as f:
raw_data = json.load(f)
idedata = IDEData(raw_data)
print(f"Loaded idedata from: {idedata_path}", file=sys.stderr)
break
except (json.JSONDecodeError, OSError) as e:
print(
f"Warning: Failed to load idedata from {idedata_path}: {e}",
file=sys.stderr,
)
analyzer = MemoryAnalyzer(elf_path, idedata=idedata)
components = analyzer.analyze()
# Convert to JSON-serializable format
result = {
"components": {
name: {
"text": mem.text_size,
"rodata": mem.rodata_size,
"data": mem.data_size,
"bss": mem.bss_size,
"flash_total": mem.flash_total,
"ram_total": mem.ram_total,
"symbol_count": mem.symbol_count,
}
for name, mem in components.items()
},
"symbols": {},
}
# Build symbol map
for section in analyzer.sections.values():
for symbol_name, size, _ in section.symbols:
if size > 0:
demangled = analyzer._demangle_symbol(symbol_name)
result["symbols"][demangled] = size
return result
def main() -> int:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Extract memory usage from ESPHome build output"
)
parser.add_argument(
"--output-env",
action="store_true",
help="Output to GITHUB_OUTPUT environment file",
)
parser.add_argument(
"--build-dir",
help="Optional build directory for detailed memory analysis (overrides auto-detection)",
)
parser.add_argument(
"--output-json",
help="Optional path to save detailed analysis JSON",
)
parser.add_argument(
"--output-build-dir",
help="Optional path to write the detected build directory",
)
args = parser.parse_args()
# Read compile output from stdin
compile_output = sys.stdin.read()
# Extract memory usage and build directory
ram_bytes, flash_bytes, detected_build_dir = extract_from_compile_output(
compile_output
)
if ram_bytes is None or flash_bytes is None:
print("Failed to extract memory usage from compile output", file=sys.stderr)
print("Expected lines like:", file=sys.stderr)
print(
" RAM: [==== ] 36.1% (used 29548 bytes from 81920 bytes)",
file=sys.stderr,
)
print(
" Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes)",
file=sys.stderr,
)
return 1
# Count how many builds were found
num_builds = len(_RAM_PATTERN.findall(compile_output))
if num_builds > 1:
print(
f"Found {num_builds} builds - summing memory usage across all builds",
file=sys.stderr,
)
print(
"WARNING: Detailed analysis will only cover the last build",
file=sys.stderr,
)
print(f"Total RAM: {ram_bytes} bytes", file=sys.stderr)
print(f"Total Flash: {flash_bytes} bytes", file=sys.stderr)
# Determine which build directory to use (explicit arg overrides auto-detection)
build_dir = args.build_dir or detected_build_dir
if detected_build_dir:
print(f"Detected build directory: {detected_build_dir}", file=sys.stderr)
if num_builds > 1:
print(
f" (using last of {num_builds} builds for detailed analysis)",
file=sys.stderr,
)
# Write build directory to file if requested
if args.output_build_dir and build_dir:
build_dir_path = Path(args.output_build_dir)
build_dir_path.parent.mkdir(parents=True, exist_ok=True)
build_dir_path.write_text(build_dir)
print(f"Wrote build directory to {args.output_build_dir}", file=sys.stderr)
# Run detailed analysis if build directory available
detailed_analysis = None
if build_dir:
print(f"Running detailed analysis on {build_dir}", file=sys.stderr)
detailed_analysis = run_detailed_analysis(build_dir)
# Save JSON output if requested
if args.output_json:
output_data = {
"ram_bytes": ram_bytes,
"flash_bytes": flash_bytes,
"detailed_analysis": detailed_analysis,
}
output_path = Path(args.output_json)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2)
print(f"Saved analysis to {args.output_json}", file=sys.stderr)
if args.output_env:
# Output to GitHub Actions
write_github_output(
{
"ram_usage": ram_bytes,
"flash_usage": flash_bytes,
}
)
else:
print(f"{ram_bytes},{flash_bytes}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -10,7 +10,13 @@ what files have changed. It outputs JSON with the following structure:
"clang_format": true/false,
"python_linters": true/false,
"changed_components": ["component1", "component2", ...],
"component_test_count": 5
"component_test_count": 5,
"memory_impact": {
"should_run": "true/false",
"components": ["component1", "component2", ...],
"platform": "esp32-idf",
"use_merged_config": "true"
}
}
The CI workflow uses this information to:
@@ -20,6 +26,7 @@ The CI workflow uses this information to:
- Skip or run Python linters (ruff, flake8, pylint, pyupgrade)
- Determine which components to test individually
- Decide how to split component tests (if there are many)
- Run memory impact analysis whenever there are changed components (merged config), and also for core-only changes
Usage:
python script/determine-jobs.py [-b BRANCH]
@@ -31,6 +38,8 @@ Options:
from __future__ import annotations
import argparse
from collections import Counter
from enum import StrEnum
from functools import cache
import json
import os
@@ -40,16 +49,47 @@ import sys
from typing import Any
from helpers import (
BASE_BUS_COMPONENTS,
CPP_FILE_EXTENSIONS,
ESPHOME_COMPONENTS_PATH,
PYTHON_FILE_EXTENSIONS,
changed_files,
get_all_dependencies,
get_component_from_path,
get_component_test_files,
get_components_from_integration_fixtures,
parse_test_filename,
root_path,
)
class Platform(StrEnum):
"""Platform identifiers for memory impact analysis."""
ESP8266_ARD = "esp8266-ard"
ESP32_IDF = "esp32-idf"
ESP32_C3_IDF = "esp32-c3-idf"
ESP32_C6_IDF = "esp32-c6-idf"
ESP32_S2_IDF = "esp32-s2-idf"
ESP32_S3_IDF = "esp32-s3-idf"
# Memory impact analysis constants
MEMORY_IMPACT_FALLBACK_COMPONENT = "api" # Representative component for core changes
MEMORY_IMPACT_FALLBACK_PLATFORM = Platform.ESP32_IDF # Most representative platform
# Platform preference order for memory impact analysis
# Prefer newer platforms first as they represent the future of ESPHome
# ESP8266 is most constrained but many new features don't support it
MEMORY_IMPACT_PLATFORM_PREFERENCE = [
Platform.ESP32_C6_IDF, # ESP32-C6 IDF (newest, supports Thread/Zigbee)
Platform.ESP8266_ARD, # ESP8266 Arduino (most memory constrained - best for impact analysis)
Platform.ESP32_IDF, # ESP32 IDF platform (primary ESP32 platform, most representative)
Platform.ESP32_C3_IDF, # ESP32-C3 IDF
Platform.ESP32_S2_IDF, # ESP32-S2 IDF
Platform.ESP32_S3_IDF, # ESP32-S3 IDF
]
def should_run_integration_tests(branch: str | None = None) -> bool:
"""Determine if integration tests should run based on changed files.
@@ -105,12 +145,9 @@ def should_run_integration_tests(branch: str | None = None) -> bool:
# Check if any required components changed
for file in files:
if file.startswith(ESPHOME_COMPONENTS_PATH):
parts = file.split("/")
if len(parts) >= 3:
component = parts[2]
if component in all_required_components:
return True
component = get_component_from_path(file)
if component and component in all_required_components:
return True
return False
@@ -224,10 +261,136 @@ def _component_has_tests(component: str) -> bool:
Returns:
True if the component has test YAML files
"""
tests_dir = Path(root_path) / "tests" / "components" / component
if not tests_dir.exists():
return False
return any(tests_dir.glob("test.*.yaml"))
return bool(get_component_test_files(component))
def detect_memory_impact_config(
branch: str | None = None,
) -> dict[str, Any]:
"""Determine memory impact analysis configuration.
Always runs memory impact analysis when there are changed components,
building a merged configuration with all changed components (like
test_build_components.py does) to get comprehensive memory analysis.
Args:
branch: Branch to compare against
Returns:
Dictionary with memory impact analysis parameters:
- should_run: "true" or "false"
- components: list of component names to analyze
- platform: platform name for the merged build
- use_merged_config: "true" (always use merged config)
"""
# Get actually changed files (not dependencies)
files = changed_files(branch)
# Find all changed components (excluding core and base bus components)
changed_component_set: set[str] = set()
has_core_changes = False
for file in files:
component = get_component_from_path(file)
if component:
# Skip base bus components as they're used across many builds
if component not in BASE_BUS_COMPONENTS:
changed_component_set.add(component)
elif file.startswith("esphome/"):
# Core ESPHome files changed (not component-specific)
has_core_changes = True
# If no components changed but core changed, test representative component
force_fallback_platform = False
if not changed_component_set and has_core_changes:
print(
f"Memory impact: No components changed, but core files changed. "
f"Testing {MEMORY_IMPACT_FALLBACK_COMPONENT} component on {MEMORY_IMPACT_FALLBACK_PLATFORM}.",
file=sys.stderr,
)
changed_component_set.add(MEMORY_IMPACT_FALLBACK_COMPONENT)
force_fallback_platform = True # Use fallback platform (most representative)
elif not changed_component_set:
# No components and no core changes
return {"should_run": "false"}
# Find components that have tests and collect their supported platforms
components_with_tests: list[str] = []
component_platforms_map: dict[
str, set[Platform]
] = {} # Track which platforms each component supports
for component in sorted(changed_component_set):
# Look for test files on preferred platforms
test_files = get_component_test_files(component)
if not test_files:
continue
# Check if component has tests for any preferred platform
available_platforms = [
platform
for test_file in test_files
if (platform := parse_test_filename(test_file)[1]) != "all"
and platform in MEMORY_IMPACT_PLATFORM_PREFERENCE
]
if not available_platforms:
continue
component_platforms_map[component] = set(available_platforms)
components_with_tests.append(component)
# If no components have tests, don't run memory impact
if not components_with_tests:
return {"should_run": "false"}
# Find common platforms supported by ALL components
# This ensures we can build all components together in a merged config
common_platforms = set(MEMORY_IMPACT_PLATFORM_PREFERENCE)
for component, platforms in component_platforms_map.items():
common_platforms &= platforms
# Select the most preferred platform from the common set
# Exception: for core changes, use fallback platform (most representative of codebase)
if force_fallback_platform:
platform = MEMORY_IMPACT_FALLBACK_PLATFORM
elif common_platforms:
# Pick the most preferred platform that all components support
platform = min(common_platforms, key=MEMORY_IMPACT_PLATFORM_PREFERENCE.index)
else:
# No common platform - pick the most commonly supported platform
# This allows testing components individually even if they can't be merged
# Count how many components support each platform
platform_counts = Counter(
p for platforms in component_platforms_map.values() for p in platforms
)
# Pick the platform supported by most components, preferring earlier in MEMORY_IMPACT_PLATFORM_PREFERENCE
platform = max(
platform_counts.keys(),
key=lambda p: (
platform_counts[p],
-MEMORY_IMPACT_PLATFORM_PREFERENCE.index(p),
),
)
# Debug output
print("Memory impact analysis:", file=sys.stderr)
print(f" Changed components: {sorted(changed_component_set)}", file=sys.stderr)
print(f" Components with tests: {components_with_tests}", file=sys.stderr)
print(
f" Component platforms: {dict(sorted(component_platforms_map.items()))}",
file=sys.stderr,
)
print(f" Common platforms: {sorted(common_platforms)}", file=sys.stderr)
print(f" Selected platform: {platform}", file=sys.stderr)
return {
"should_run": "true",
"components": components_with_tests,
"platform": platform,
"use_merged_config": "true",
}
def main() -> None:
@@ -279,6 +442,9 @@ def main() -> None:
if component not in directly_changed_components
]
# Detect components for memory impact analysis (merged config)
memory_impact = detect_memory_impact_config(args.branch)
# Build output
output: dict[str, Any] = {
"integration_tests": run_integration,
@@ -292,6 +458,7 @@ def main() -> None:
"component_test_count": len(changed_components_with_tests),
"directly_changed_count": len(directly_changed_with_tests),
"dependency_only_count": len(dependency_only_components),
"memory_impact": memory_impact,
}
# Output as JSON

View File

@@ -29,6 +29,18 @@ YAML_FILE_EXTENSIONS = (".yaml", ".yml")
# Component path prefix
ESPHOME_COMPONENTS_PATH = "esphome/components/"
# Base bus components - these ARE the bus implementations and should not
# be flagged as needing migration since they are the platform/base components
BASE_BUS_COMPONENTS = {
"i2c",
"spi",
"uart",
"modbus",
"canbus",
"remote_transmitter",
"remote_receiver",
}
def parse_list_components_output(output: str) -> list[str]:
"""Parse the output from list-components.py script.
@@ -46,6 +58,65 @@ def parse_list_components_output(output: str) -> list[str]:
return [c.strip() for c in output.strip().split("\n") if c.strip()]
def parse_test_filename(test_file: Path) -> tuple[str, str]:
"""Parse test filename to extract test name and platform.
Test files follow the naming pattern: test.<platform>.yaml or test-<variant>.<platform>.yaml
Args:
test_file: Path to test file
Returns:
Tuple of (test_name, platform)
"""
parts = test_file.stem.split(".")
if len(parts) == 2:
return parts[0], parts[1] # test, platform
return parts[0], "all"
def get_component_from_path(file_path: str) -> str | None:
"""Extract component name from a file path.
Args:
file_path: Path to a file (e.g., "esphome/components/wifi/wifi.cpp")
Returns:
Component name if path is in components directory, None otherwise
"""
if not file_path.startswith(ESPHOME_COMPONENTS_PATH):
return None
parts = file_path.split("/")
if len(parts) >= 3:
return parts[2]
return None
def get_component_test_files(
component: str, *, all_variants: bool = False
) -> list[Path]:
"""Get test files for a component.
Args:
component: Component name (e.g., "wifi")
all_variants: If True, returns all test files including variants (test-*.yaml).
If False, returns only base test files (test.*.yaml).
Default is False.
Returns:
List of test file paths for the component, or empty list if none exist
"""
tests_dir = Path(root_path) / "tests" / "components" / component
if not tests_dir.exists():
return []
if all_variants:
# Match both test.*.yaml and test-*.yaml patterns
return list(tests_dir.glob("test[.-]*.yaml"))
# Match only test.*.yaml (base tests)
return list(tests_dir.glob("test.*.yaml"))
def styled(color: str | tuple[str, ...], msg: str, reset: bool = True) -> str:
prefix = "".join(color) if isinstance(color, tuple) else color
suffix = colorama.Style.RESET_ALL if reset else ""
@@ -314,11 +385,9 @@ def _filter_changed_ci(files: list[str]) -> list[str]:
# because changes in one file can affect other files in the same component.
filtered_files = []
for f in files:
if f.startswith(ESPHOME_COMPONENTS_PATH):
# Check if file belongs to any of the changed components
parts = f.split("/")
if len(parts) >= 3 and parts[2] in component_set:
filtered_files.append(f)
component = get_component_from_path(f)
if component and component in component_set:
filtered_files.append(f)
return filtered_files

View File

@@ -4,7 +4,7 @@ from collections.abc import Callable
from pathlib import Path
import sys
from helpers import changed_files, git_ls_files
from helpers import changed_files, get_component_from_path, git_ls_files
from esphome.const import (
KEY_CORE,
@@ -30,11 +30,9 @@ def get_all_component_files() -> list[str]:
def extract_component_names_array_from_files_array(files):
components = []
for file in files:
file_parts = file.split("/")
if len(file_parts) >= 4:
component_name = file_parts[2]
if component_name not in components:
components.append(component_name)
component_name = get_component_from_path(file)
if component_name and component_name not in components:
components.append(component_name)
return components

View File

@@ -28,6 +28,7 @@ from script.analyze_component_buses import (
create_grouping_signature,
merge_compatible_bus_groups,
)
from script.helpers import get_component_test_files
# Weighting for batch creation
# Isolated components can't be grouped/merged, so they count as 10x
@@ -45,17 +46,12 @@ def has_test_files(component_name: str, tests_dir: Path) -> bool:
Args:
component_name: Name of the component
tests_dir: Path to tests/components directory
tests_dir: Path to tests/components directory (unused, kept for compatibility)
Returns:
True if the component has test.*.yaml files
"""
component_dir = tests_dir / component_name
if not component_dir.exists() or not component_dir.is_dir():
return False
# Check for test.*.yaml files
return any(component_dir.glob("test.*.yaml"))
return bool(get_component_test_files(component_name))
def create_intelligent_batches(

View File

@@ -0,0 +1,27 @@
{{ comment_marker }}
## Memory Impact Analysis
**Components:** {{ components_str }}
**Platform:** `{{ platform }}`
| Metric | Target Branch | This PR | Change |
|--------|--------------|---------|--------|
| **RAM** | {{ target_ram }} | {{ pr_ram }} | {{ ram_change }} |
| **Flash** | {{ target_flash }} | {{ pr_flash }} | {{ flash_change }} |
{% if component_breakdown %}
{{ component_breakdown }}
{% endif %}
{% if symbol_changes %}
{{ symbol_changes }}
{% endif %}
{%- if target_cache_hit %}
> ⚡ Target branch analysis was loaded from cache (build skipped for faster CI).
{%- endif %}
---
> **Note:** This analysis measures **static RAM and Flash usage** only (compile-time allocation).
> **Dynamic memory (heap)** cannot be measured automatically.
> **⚠️ You must test this PR on a real device** to measure free heap and ensure no runtime memory issues.
*This analysis runs automatically when components change. Memory usage is measured from {{ config_note }}.*

View File

@@ -0,0 +1,15 @@
<details open>
<summary>📊 Component Memory Breakdown</summary>
| Component | Target Flash | PR Flash | Change |
|-----------|--------------|----------|--------|
{% for comp, target_flash, pr_flash, delta in changed_components[:max_rows] -%}
{% set threshold = component_change_threshold if comp.startswith("[esphome]") else none -%}
| `{{ comp }}` | {{ target_flash|format_bytes }} | {{ pr_flash|format_bytes }} | {{ format_change(target_flash, pr_flash, threshold=threshold) }} |
{% endfor -%}
{% if changed_components|length > max_rows -%}
| ... | ... | ... | *({{ changed_components|length - max_rows }} more components not shown)* |
{% endif -%}
</details>

View File

@@ -0,0 +1,8 @@
{#- Macro for formatting symbol names in tables -#}
{%- macro format_symbol(symbol, max_length, truncate_length) -%}
{%- if symbol|length <= max_length -%}
`{{ symbol }}`
{%- else -%}
<details><summary><code>{{ symbol[:truncate_length] }}...</code></summary><code>{{ symbol }}</code></details>
{%- endif -%}
{%- endmacro -%}

View File

@@ -0,0 +1,51 @@
{%- from 'ci_memory_impact_macros.j2' import format_symbol -%}
<details>
<summary>🔍 Symbol-Level Changes (click to expand)</summary>
{% if changed_symbols %}
### Changed Symbols
| Symbol | Target Size | PR Size | Change |
|--------|-------------|---------|--------|
{% for symbol, target_size, pr_size, delta in changed_symbols[:max_changed_rows] -%}
| {{ format_symbol(symbol, symbol_max_length, symbol_truncate_length) }} | {{ target_size|format_bytes }} | {{ pr_size|format_bytes }} | {{ format_change(target_size, pr_size) }} |
{% endfor -%}
{% if changed_symbols|length > max_changed_rows -%}
| ... | ... | ... | *({{ changed_symbols|length - max_changed_rows }} more changed symbols not shown)* |
{% endif -%}
{% endif %}
{% if new_symbols %}
### New Symbols (top {{ max_new_rows }})
| Symbol | Size |
|--------|------|
{% for symbol, size in new_symbols[:max_new_rows] -%}
| {{ format_symbol(symbol, symbol_max_length, symbol_truncate_length) }} | {{ size|format_bytes }} |
{% endfor -%}
{% if new_symbols|length > max_new_rows -%}
{% set total_new_size = new_symbols|sum(attribute=1) -%}
| *{{ new_symbols|length - max_new_rows }} more new symbols...* | *Total: {{ total_new_size|format_bytes }}* |
{% endif -%}
{% endif %}
{% if removed_symbols %}
### Removed Symbols (top {{ max_removed_rows }})
| Symbol | Size |
|--------|------|
{% for symbol, size in removed_symbols[:max_removed_rows] -%}
| {{ format_symbol(symbol, symbol_max_length, symbol_truncate_length) }} | {{ size|format_bytes }} |
{% endfor -%}
{% if removed_symbols|length > max_removed_rows -%}
{% set total_removed_size = removed_symbols|sum(attribute=1) -%}
| *{{ removed_symbols|length - max_removed_rows }} more removed symbols...* | *Total: {{ total_removed_size|format_bytes }}* |
{% endif -%}
{% endif %}
</details>

View File

@@ -39,6 +39,7 @@ from script.analyze_component_buses import (
merge_compatible_bus_groups,
uses_local_file_references,
)
from script.helpers import get_component_test_files
from script.merge_component_configs import merge_component_configs
@@ -82,13 +83,14 @@ def show_disk_space_if_ci(esphome_command: str) -> None:
def find_component_tests(
components_dir: Path, component_pattern: str = "*"
components_dir: Path, component_pattern: str = "*", base_only: bool = False
) -> dict[str, list[Path]]:
"""Find all component test files.
Args:
components_dir: Path to tests/components directory
component_pattern: Glob pattern for component names
base_only: If True, only find base test files (test.*.yaml), not variant files (test-*.yaml)
Returns:
Dictionary mapping component name to list of test files
@@ -99,9 +101,10 @@ def find_component_tests(
if not comp_dir.is_dir():
continue
# Find test files matching test.*.yaml or test-*.yaml patterns
for test_file in comp_dir.glob("test[.-]*.yaml"):
component_tests[comp_dir.name].append(test_file)
# Get test files using helper function
test_files = get_component_test_files(comp_dir.name, all_variants=not base_only)
if test_files:
component_tests[comp_dir.name] = test_files
return dict(component_tests)
@@ -931,6 +934,7 @@ def test_components(
continue_on_fail: bool,
enable_grouping: bool = True,
isolated_components: set[str] | None = None,
base_only: bool = False,
) -> int:
"""Test components with optional intelligent grouping.
@@ -944,6 +948,7 @@ def test_components(
These are tested WITHOUT --testing-mode to enable full validation
(pin conflicts, etc). This is used in CI for directly changed components
to catch issues that would be missed with --testing-mode.
base_only: If True, only test base test files (test.*.yaml), not variant files (test-*.yaml)
Returns:
Exit code (0 for success, 1 for failure)
@@ -961,7 +966,7 @@ def test_components(
# Find all component tests
all_tests = {}
for pattern in component_patterns:
all_tests.update(find_component_tests(tests_dir, pattern))
all_tests.update(find_component_tests(tests_dir, pattern, base_only))
if not all_tests:
print(f"No components found matching: {component_patterns}")
@@ -1122,6 +1127,11 @@ def main() -> int:
"These are tested WITHOUT --testing-mode to enable full validation. "
"Used in CI for directly changed components to catch pin conflicts and other issues.",
)
parser.add_argument(
"--base-only",
action="store_true",
help="Only test base test files (test.*.yaml), not variant files (test-*.yaml)",
)
args = parser.parse_args()
@@ -1140,6 +1150,7 @@ def main() -> int:
continue_on_fail=args.continue_on_fail,
enable_grouping=not args.no_grouping,
isolated_components=isolated_components,
base_only=args.base_only,
)

View File

@@ -17,6 +17,9 @@ script_dir = os.path.abspath(
)
sys.path.insert(0, script_dir)
# Import helpers module for patching
import helpers # noqa: E402
spec = importlib.util.spec_from_file_location(
"determine_jobs", os.path.join(script_dir, "determine-jobs.py")
)
@@ -59,15 +62,29 @@ def mock_subprocess_run() -> Generator[Mock, None, None]:
yield mock
@pytest.fixture
def mock_changed_files() -> Generator[Mock, None, None]:
"""Mock changed_files for memory impact detection."""
with patch.object(determine_jobs, "changed_files") as mock:
# Default to empty list
mock.return_value = []
yield mock
def test_main_all_tests_should_run(
mock_should_run_integration_tests: Mock,
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
mock_changed_files: Mock,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test when all tests should run."""
# Ensure we're not in GITHUB_ACTIONS mode for this test
monkeypatch.delenv("GITHUB_ACTIONS", raising=False)
mock_should_run_integration_tests.return_value = True
mock_should_run_clang_tidy.return_value = True
mock_should_run_clang_format.return_value = True
@@ -100,6 +117,9 @@ def test_main_all_tests_should_run(
assert output["component_test_count"] == len(
output["changed_components_with_tests"]
)
# memory_impact should be present
assert "memory_impact" in output
assert output["memory_impact"]["should_run"] == "false" # No files changed
def test_main_no_tests_should_run(
@@ -108,9 +128,14 @@ def test_main_no_tests_should_run(
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
mock_changed_files: Mock,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test when no tests should run."""
# Ensure we're not in GITHUB_ACTIONS mode for this test
monkeypatch.delenv("GITHUB_ACTIONS", raising=False)
mock_should_run_integration_tests.return_value = False
mock_should_run_clang_tidy.return_value = False
mock_should_run_clang_format.return_value = False
@@ -136,6 +161,9 @@ def test_main_no_tests_should_run(
assert output["changed_components"] == []
assert output["changed_components_with_tests"] == []
assert output["component_test_count"] == 0
# memory_impact should be present
assert "memory_impact" in output
assert output["memory_impact"]["should_run"] == "false"
def test_main_list_components_fails(
@@ -169,9 +197,14 @@ def test_main_with_branch_argument(
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
mock_changed_files: Mock,
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test with branch argument."""
# Ensure we're not in GITHUB_ACTIONS mode for this test
monkeypatch.delenv("GITHUB_ACTIONS", raising=False)
mock_should_run_integration_tests.return_value = False
mock_should_run_clang_tidy.return_value = True
mock_should_run_clang_format.return_value = False
@@ -216,6 +249,9 @@ def test_main_with_branch_argument(
assert output["component_test_count"] == len(
output["changed_components_with_tests"]
)
# memory_impact should be present
assert "memory_impact" in output
assert output["memory_impact"]["should_run"] == "false"
def test_should_run_integration_tests(
@@ -403,10 +439,15 @@ def test_main_filters_components_without_tests(
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
mock_changed_files: Mock,
capsys: pytest.CaptureFixture[str],
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test that components without test files are filtered out."""
# Ensure we're not in GITHUB_ACTIONS mode for this test
monkeypatch.delenv("GITHUB_ACTIONS", raising=False)
mock_should_run_integration_tests.return_value = False
mock_should_run_clang_tidy.return_value = False
mock_should_run_clang_format.return_value = False
@@ -440,9 +481,10 @@ def test_main_filters_components_without_tests(
airthings_dir = tests_dir / "airthings_ble"
airthings_dir.mkdir(parents=True)
# Mock root_path to use tmp_path
# Mock root_path to use tmp_path (need to patch both determine_jobs and helpers)
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch("sys.argv", ["determine-jobs.py"]),
):
# Clear the cache since we're mocking root_path
@@ -459,3 +501,188 @@ def test_main_filters_components_without_tests(
assert set(output["changed_components_with_tests"]) == {"wifi", "sensor"}
# component_test_count should be based on components with tests
assert output["component_test_count"] == 2
# memory_impact should be present
assert "memory_impact" in output
assert output["memory_impact"]["should_run"] == "false"
# Tests for detect_memory_impact_config function
def test_detect_memory_impact_config_with_common_platform(tmp_path: Path) -> None:
"""Test memory impact detection when components share a common platform."""
# Create test directory structure
tests_dir = tmp_path / "tests" / "components"
# wifi component with esp32-idf test
wifi_dir = tests_dir / "wifi"
wifi_dir.mkdir(parents=True)
(wifi_dir / "test.esp32-idf.yaml").write_text("test: wifi")
# api component with esp32-idf test
api_dir = tests_dir / "api"
api_dir.mkdir(parents=True)
(api_dir / "test.esp32-idf.yaml").write_text("test: api")
# Mock changed_files to return wifi and api component changes
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(determine_jobs, "changed_files") as mock_changed_files,
):
mock_changed_files.return_value = [
"esphome/components/wifi/wifi.cpp",
"esphome/components/api/api.cpp",
]
determine_jobs._component_has_tests.cache_clear()
result = determine_jobs.detect_memory_impact_config()
assert result["should_run"] == "true"
assert set(result["components"]) == {"wifi", "api"}
assert result["platform"] == "esp32-idf" # Common platform
assert result["use_merged_config"] == "true"
def test_detect_memory_impact_config_core_only_changes(tmp_path: Path) -> None:
"""Test memory impact detection with core-only changes (no component changes)."""
# Create test directory structure with fallback component
tests_dir = tmp_path / "tests" / "components"
# api component (fallback component) with esp32-idf test
api_dir = tests_dir / "api"
api_dir.mkdir(parents=True)
(api_dir / "test.esp32-idf.yaml").write_text("test: api")
# Mock changed_files to return only core files (no component files)
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(determine_jobs, "changed_files") as mock_changed_files,
):
mock_changed_files.return_value = [
"esphome/core/application.cpp",
"esphome/core/component.h",
]
determine_jobs._component_has_tests.cache_clear()
result = determine_jobs.detect_memory_impact_config()
assert result["should_run"] == "true"
assert result["components"] == ["api"] # Fallback component
assert result["platform"] == "esp32-idf" # Fallback platform
assert result["use_merged_config"] == "true"
def test_detect_memory_impact_config_no_common_platform(tmp_path: Path) -> None:
"""Test memory impact detection when components have no common platform."""
# Create test directory structure
tests_dir = tmp_path / "tests" / "components"
# wifi component only has esp32-idf test
wifi_dir = tests_dir / "wifi"
wifi_dir.mkdir(parents=True)
(wifi_dir / "test.esp32-idf.yaml").write_text("test: wifi")
# logger component only has esp8266-ard test
logger_dir = tests_dir / "logger"
logger_dir.mkdir(parents=True)
(logger_dir / "test.esp8266-ard.yaml").write_text("test: logger")
# Mock changed_files to return both components
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(determine_jobs, "changed_files") as mock_changed_files,
):
mock_changed_files.return_value = [
"esphome/components/wifi/wifi.cpp",
"esphome/components/logger/logger.cpp",
]
determine_jobs._component_has_tests.cache_clear()
result = determine_jobs.detect_memory_impact_config()
# Should pick the most frequently supported platform
assert result["should_run"] == "true"
assert set(result["components"]) == {"wifi", "logger"}
# When no common platform, picks most commonly supported
# esp8266-ard is preferred over esp32-idf in the preference list
assert result["platform"] in ["esp32-idf", "esp8266-ard"]
assert result["use_merged_config"] == "true"
def test_detect_memory_impact_config_no_changes(tmp_path: Path) -> None:
"""Test memory impact detection when no files changed."""
# Mock changed_files to return empty list
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(determine_jobs, "changed_files") as mock_changed_files,
):
mock_changed_files.return_value = []
determine_jobs._component_has_tests.cache_clear()
result = determine_jobs.detect_memory_impact_config()
assert result["should_run"] == "false"
def test_detect_memory_impact_config_no_components_with_tests(tmp_path: Path) -> None:
"""Test memory impact detection when changed components have no tests."""
# Create test directory structure
tests_dir = tmp_path / "tests" / "components"
# Create component directory but no test files
custom_component_dir = tests_dir / "my_custom_component"
custom_component_dir.mkdir(parents=True)
# Mock changed_files to return component without tests
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(determine_jobs, "changed_files") as mock_changed_files,
):
mock_changed_files.return_value = [
"esphome/components/my_custom_component/component.cpp",
]
determine_jobs._component_has_tests.cache_clear()
result = determine_jobs.detect_memory_impact_config()
assert result["should_run"] == "false"
def test_detect_memory_impact_config_skips_base_bus_components(tmp_path: Path) -> None:
"""Test that base bus components (i2c, spi, uart) are skipped."""
# Create test directory structure
tests_dir = tmp_path / "tests" / "components"
# i2c component (should be skipped as it's a base bus component)
i2c_dir = tests_dir / "i2c"
i2c_dir.mkdir(parents=True)
(i2c_dir / "test.esp32-idf.yaml").write_text("test: i2c")
# wifi component (should not be skipped)
wifi_dir = tests_dir / "wifi"
wifi_dir.mkdir(parents=True)
(wifi_dir / "test.esp32-idf.yaml").write_text("test: wifi")
# Mock changed_files to return both i2c and wifi
with (
patch.object(determine_jobs, "root_path", str(tmp_path)),
patch.object(helpers, "root_path", str(tmp_path)),
patch.object(determine_jobs, "changed_files") as mock_changed_files,
):
mock_changed_files.return_value = [
"esphome/components/i2c/i2c.cpp",
"esphome/components/wifi/wifi.cpp",
]
determine_jobs._component_has_tests.cache_clear()
result = determine_jobs.detect_memory_impact_config()
# Should only include wifi, not i2c
assert result["should_run"] == "true"
assert result["components"] == ["wifi"]
assert "i2c" not in result["components"]

View File

@@ -387,6 +387,42 @@ def test_idedata_addr2line_path_unix(setup_core: Path) -> None:
assert result == "/usr/bin/addr2line"
def test_idedata_objdump_path_windows(setup_core: Path) -> None:
"""Test IDEData.objdump_path on Windows."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.objdump_path
assert result == "C:\\tools\\objdump.exe"
def test_idedata_objdump_path_unix(setup_core: Path) -> None:
"""Test IDEData.objdump_path on Unix."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.objdump_path
assert result == "/usr/bin/objdump"
def test_idedata_readelf_path_windows(setup_core: Path) -> None:
"""Test IDEData.readelf_path on Windows."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.readelf_path
assert result == "C:\\tools\\readelf.exe"
def test_idedata_readelf_path_unix(setup_core: Path) -> None:
"""Test IDEData.readelf_path on Unix."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.readelf_path
assert result == "/usr/bin/readelf"
def test_patch_structhash(setup_core: Path) -> None:
"""Test patch_structhash monkey patches platformio functions."""
# Create simple namespace objects to act as modules