#!/usr/bin/env python3 """Post or update a PR comment with memory impact analysis results. This script creates or updates a GitHub PR comment with memory usage changes. It uses the GitHub CLI (gh) to manage comments and maintains a single comment that gets updated on subsequent runs. """ from __future__ import annotations import argparse import json from pathlib import Path import subprocess import sys # Add esphome to path for analyze_memory import sys.path.insert(0, str(Path(__file__).parent.parent)) # pylint: disable=wrong-import-position # Comment marker to identify our memory impact comments COMMENT_MARKER = "" def load_analysis_json(json_path: str) -> dict | None: """Load memory analysis results from JSON file. Args: json_path: Path to analysis JSON file Returns: Dictionary with analysis results or None if file doesn't exist/can't be loaded """ json_file = Path(json_path) if not json_file.exists(): print(f"Analysis JSON not found: {json_path}", file=sys.stderr) return None try: with open(json_file, encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError) as e: print(f"Failed to load analysis JSON: {e}", file=sys.stderr) return None def format_bytes(bytes_value: int) -> str: """Format bytes value with comma separators. Args: bytes_value: Number of bytes Returns: Formatted string with comma separators (e.g., "1,234 bytes") """ return f"{bytes_value:,} bytes" def format_change(before: int, after: int) -> str: """Format memory change with delta and percentage. Args: before: Memory usage before change (in bytes) after: Memory usage after change (in bytes) Returns: Formatted string with delta and percentage """ delta = after - before percentage = 0.0 if before == 0 else (delta / before) * 100 # Format delta with sign and always show in bytes for precision if delta > 0: delta_str = f"+{delta:,} bytes" # Use 🚨 for significant increases (>1%), 🔸 for smaller ones emoji = "🚨" if abs(percentage) > 1.0 else "🔸" elif delta < 0: delta_str = f"{delta:,} bytes" # Use 🎉 for significant reductions (>1%), ✅ for smaller ones emoji = "🎉" if abs(percentage) > 1.0 else "✅" else: delta_str = "+0 bytes" emoji = "➡️" # Format percentage with sign if percentage > 0: pct_str = f"+{percentage:.2f}%" elif percentage < 0: pct_str = f"{percentage:.2f}%" else: pct_str = "0.00%" return f"{emoji} {delta_str} ({pct_str})" def format_symbol_for_display(symbol: str) -> str: """Format a symbol name for display in markdown table. Args: symbol: Symbol name to format Returns: Formatted symbol with backticks or HTML details tag for long names """ if len(symbol) <= 100: return f"`{symbol}`" # Use HTML details for very long symbols (no backticks inside HTML) return f"
{symbol[:97]}...{symbol}
" def create_symbol_changes_table( target_symbols: dict | None, pr_symbols: dict | None ) -> str: """Create a markdown table showing symbols that changed size. Args: target_symbols: Symbol name to size mapping for target branch pr_symbols: Symbol name to size mapping for PR branch Returns: Formatted markdown table """ if not target_symbols or not pr_symbols: return "" # Find all symbols that exist in both branches or only in one all_symbols = set(target_symbols.keys()) | set(pr_symbols.keys()) # Track changes changed_symbols = [] new_symbols = [] removed_symbols = [] for symbol in all_symbols: target_size = target_symbols.get(symbol, 0) pr_size = pr_symbols.get(symbol, 0) if target_size == 0 and pr_size > 0: # New symbol new_symbols.append((symbol, pr_size)) elif target_size > 0 and pr_size == 0: # Removed symbol removed_symbols.append((symbol, target_size)) elif target_size != pr_size: # Changed symbol delta = pr_size - target_size changed_symbols.append((symbol, target_size, pr_size, delta)) if not changed_symbols and not new_symbols and not removed_symbols: return "" lines = [ "", "
", "🔍 Symbol-Level Changes (click to expand)", "", ] # Show changed symbols (sorted by absolute delta) if changed_symbols: changed_symbols.sort(key=lambda x: abs(x[3]), reverse=True) lines.extend( [ "### Changed Symbols", "", "| Symbol | Target Size | PR Size | Change |", "|--------|-------------|---------|--------|", ] ) # Show top 30 changes for symbol, target_size, pr_size, delta in changed_symbols[:30]: target_str = format_bytes(target_size) pr_str = format_bytes(pr_size) change_str = format_change(target_size, pr_size) display_symbol = format_symbol_for_display(symbol) lines.append( f"| {display_symbol} | {target_str} | {pr_str} | {change_str} |" ) if len(changed_symbols) > 30: lines.append( f"| ... | ... | ... | *({len(changed_symbols) - 30} more changed symbols not shown)* |" ) lines.append("") # Show new symbols if new_symbols: new_symbols.sort(key=lambda x: x[1], reverse=True) lines.extend( [ "### New Symbols (top 15)", "", "| Symbol | Size |", "|--------|------|", ] ) for symbol, size in new_symbols[:15]: display_symbol = format_symbol_for_display(symbol) lines.append(f"| {display_symbol} | {format_bytes(size)} |") if len(new_symbols) > 15: total_new_size = sum(s[1] for s in new_symbols) lines.append( f"| *{len(new_symbols) - 15} more new symbols...* | *Total: {format_bytes(total_new_size)}* |" ) lines.append("") # Show removed symbols if removed_symbols: removed_symbols.sort(key=lambda x: x[1], reverse=True) lines.extend( [ "### Removed Symbols (top 15)", "", "| Symbol | Size |", "|--------|------|", ] ) for symbol, size in removed_symbols[:15]: display_symbol = format_symbol_for_display(symbol) lines.append(f"| {display_symbol} | {format_bytes(size)} |") if len(removed_symbols) > 15: total_removed_size = sum(s[1] for s in removed_symbols) lines.append( f"| *{len(removed_symbols) - 15} more removed symbols...* | *Total: {format_bytes(total_removed_size)}* |" ) lines.append("") lines.extend(["
", ""]) return "\n".join(lines) def create_detailed_breakdown_table( target_analysis: dict | None, pr_analysis: dict | None ) -> str: """Create a markdown table showing detailed memory breakdown by component. Args: target_analysis: Component memory breakdown for target branch pr_analysis: Component memory breakdown for PR branch Returns: Formatted markdown table """ if not target_analysis or not pr_analysis: return "" # Combine all components from both analyses all_components = set(target_analysis.keys()) | set(pr_analysis.keys()) # Filter to components that have changed (ignoring noise ≤2 bytes) changed_components = [] for comp in all_components: target_mem = target_analysis.get(comp, {}) pr_mem = pr_analysis.get(comp, {}) target_flash = target_mem.get("flash_total", 0) pr_flash = pr_mem.get("flash_total", 0) # Only include if component has meaningful change (>2 bytes) delta = pr_flash - target_flash if abs(delta) > 2: changed_components.append((comp, target_flash, pr_flash, delta)) if not changed_components: return "" # Sort by absolute delta (largest changes first) changed_components.sort(key=lambda x: abs(x[3]), reverse=True) # Build table - limit to top 20 changes lines = [ "", "
", "📊 Component Memory Breakdown", "", "| Component | Target Flash | PR Flash | Change |", "|-----------|--------------|----------|--------|", ] for comp, target_flash, pr_flash, delta in changed_components[:20]: target_str = format_bytes(target_flash) pr_str = format_bytes(pr_flash) change_str = format_change(target_flash, pr_flash) lines.append(f"| `{comp}` | {target_str} | {pr_str} | {change_str} |") if len(changed_components) > 20: lines.append( f"| ... | ... | ... | *({len(changed_components) - 20} more components not shown)* |" ) lines.extend(["", "
", ""]) return "\n".join(lines) def create_comment_body( components: list[str], platform: str, target_ram: int, target_flash: int, pr_ram: int, pr_flash: int, target_analysis: dict | None = None, pr_analysis: dict | None = None, target_symbols: dict | None = None, pr_symbols: dict | None = None, target_cache_hit: bool = False, ) -> str: """Create the comment body with memory impact analysis. Args: components: List of component names (merged config) platform: Platform name target_ram: RAM usage in target branch target_flash: Flash usage in target branch pr_ram: RAM usage in PR branch pr_flash: Flash usage in PR branch target_analysis: Optional component breakdown for target branch pr_analysis: Optional component breakdown for PR branch target_symbols: Optional symbol map for target branch pr_symbols: Optional symbol map for PR branch target_cache_hit: Whether target branch analysis was loaded from cache Returns: Formatted comment body """ ram_change = format_change(target_ram, pr_ram) flash_change = format_change(target_flash, pr_flash) # Use provided analysis data if available component_breakdown = "" symbol_changes = "" if target_analysis and pr_analysis: component_breakdown = create_detailed_breakdown_table( target_analysis, pr_analysis ) if target_symbols and pr_symbols: symbol_changes = create_symbol_changes_table(target_symbols, pr_symbols) else: print("No ELF files provided, skipping detailed analysis", file=sys.stderr) # Format components list if len(components) == 1: components_str = f"`{components[0]}`" config_note = "a representative test configuration" else: components_str = ", ".join(f"`{c}`" for c in sorted(components)) config_note = f"a merged configuration with {len(components)} components" # Add cache info note if target was cached cache_note = "" if target_cache_hit: cache_note = "\n\n> ⚡ Target branch analysis was loaded from cache (build skipped for faster CI)." return f"""{COMMENT_MARKER} ## Memory Impact Analysis **Components:** {components_str} **Platform:** `{platform}` | Metric | Target Branch | This PR | Change | |--------|--------------|---------|--------| | **RAM** | {format_bytes(target_ram)} | {format_bytes(pr_ram)} | {ram_change} | | **Flash** | {format_bytes(target_flash)} | {format_bytes(pr_flash)} | {flash_change} | {component_breakdown}{symbol_changes}{cache_note} --- > **Note:** This analysis measures **static RAM and Flash usage** only (compile-time allocation). > **Dynamic memory (heap)** cannot be measured automatically. > **⚠️ You must test this PR on a real device** to measure free heap and ensure no runtime memory issues. *This analysis runs automatically when components change. Memory usage is measured from {config_note}.* """ def find_existing_comment(pr_number: str) -> str | None: """Find existing memory impact comment on the PR. Args: pr_number: PR number Returns: Comment numeric ID if found, None otherwise """ try: print( f"DEBUG: Looking for existing comment on PR #{pr_number}", file=sys.stderr ) # Use gh api to get comments directly - this returns the numeric id field result = subprocess.run( [ "gh", "api", f"/repos/{{owner}}/{{repo}}/issues/{pr_number}/comments", "--jq", ".[] | {id, body}", ], capture_output=True, text=True, check=True, ) print( f"DEBUG: gh api comments output (first 500 chars):\n{result.stdout[:500]}", file=sys.stderr, ) # Parse comments and look for our marker comment_count = 0 for line in result.stdout.strip().split("\n"): if not line: continue try: comment = json.loads(line) comment_count += 1 comment_id = comment.get("id") print( f"DEBUG: Checking comment {comment_count}: id={comment_id}", file=sys.stderr, ) body = comment.get("body", "") if COMMENT_MARKER in body: print( f"DEBUG: Found existing comment with id={comment_id}", file=sys.stderr, ) # Return the numeric id return str(comment_id) print("DEBUG: Comment does not contain marker", file=sys.stderr) except json.JSONDecodeError as e: print(f"DEBUG: JSON decode error: {e}", file=sys.stderr) continue print( f"DEBUG: No existing comment found (checked {comment_count} comments)", file=sys.stderr, ) return None except subprocess.CalledProcessError as e: print(f"Error finding existing comment: {e}", file=sys.stderr) if e.stderr: print(f"stderr: {e.stderr.decode()}", file=sys.stderr) return None def post_or_update_comment(pr_number: str, comment_body: str) -> bool: """Post a new comment or update existing one. Args: pr_number: PR number comment_body: Comment body text Returns: True if successful, False otherwise """ # Look for existing comment existing_comment_id = find_existing_comment(pr_number) try: if existing_comment_id and existing_comment_id != "None": # Update existing comment print( f"DEBUG: Updating existing comment {existing_comment_id}", file=sys.stderr, ) result = subprocess.run( [ "gh", "api", f"/repos/{{owner}}/{{repo}}/issues/comments/{existing_comment_id}", "-X", "PATCH", "-f", f"body={comment_body}", ], check=True, capture_output=True, text=True, ) print(f"DEBUG: Update response: {result.stdout}", file=sys.stderr) else: # Post new comment print( f"DEBUG: Posting new comment (existing_comment_id={existing_comment_id})", file=sys.stderr, ) result = subprocess.run( ["gh", "pr", "comment", pr_number, "--body", comment_body], check=True, capture_output=True, text=True, ) print(f"DEBUG: Post response: {result.stdout}", file=sys.stderr) print("Comment posted/updated successfully", file=sys.stderr) return True except subprocess.CalledProcessError as e: print(f"Error posting/updating comment: {e}", file=sys.stderr) if e.stderr: print( f"stderr: {e.stderr.decode() if isinstance(e.stderr, bytes) else e.stderr}", file=sys.stderr, ) if e.stdout: print( f"stdout: {e.stdout.decode() if isinstance(e.stdout, bytes) else e.stdout}", file=sys.stderr, ) return False def main() -> int: """Main entry point.""" parser = argparse.ArgumentParser( description="Post or update PR comment with memory impact analysis" ) parser.add_argument("--pr-number", required=True, help="PR number") parser.add_argument( "--components", required=True, help='JSON array of component names (e.g., \'["api", "wifi"]\')', ) parser.add_argument("--platform", required=True, help="Platform name") parser.add_argument( "--target-ram", type=int, required=True, help="Target branch RAM usage" ) parser.add_argument( "--target-flash", type=int, required=True, help="Target branch flash usage" ) parser.add_argument("--pr-ram", type=int, required=True, help="PR branch RAM usage") parser.add_argument( "--pr-flash", type=int, required=True, help="PR branch flash usage" ) parser.add_argument( "--target-json", help="Optional path to target branch analysis JSON (for detailed analysis)", ) parser.add_argument( "--pr-json", help="Optional path to PR branch analysis JSON (for detailed analysis)", ) parser.add_argument( "--target-cache-hit", action="store_true", help="Indicates that target branch analysis was loaded from cache", ) args = parser.parse_args() # Parse components from JSON try: components = json.loads(args.components) if not isinstance(components, list): print("Error: --components must be a JSON array", file=sys.stderr) sys.exit(1) except json.JSONDecodeError as e: print(f"Error parsing --components JSON: {e}", file=sys.stderr) sys.exit(1) # Load analysis JSON files target_analysis = None pr_analysis = None target_symbols = None pr_symbols = None if args.target_json: target_data = load_analysis_json(args.target_json) if target_data and target_data.get("detailed_analysis"): target_analysis = target_data["detailed_analysis"].get("components") target_symbols = target_data["detailed_analysis"].get("symbols") if args.pr_json: pr_data = load_analysis_json(args.pr_json) if pr_data and pr_data.get("detailed_analysis"): pr_analysis = pr_data["detailed_analysis"].get("components") pr_symbols = pr_data["detailed_analysis"].get("symbols") # Create comment body # Note: Memory totals (RAM/Flash) are summed across all builds if multiple were run. comment_body = create_comment_body( components=components, platform=args.platform, target_ram=args.target_ram, target_flash=args.target_flash, pr_ram=args.pr_ram, pr_flash=args.pr_flash, target_analysis=target_analysis, pr_analysis=pr_analysis, target_symbols=target_symbols, pr_symbols=pr_symbols, target_cache_hit=args.target_cache_hit, ) # Post or update comment success = post_or_update_comment(args.pr_number, comment_body) return 0 if success else 1 if __name__ == "__main__": sys.exit(main())