From 9cf1fd24fd5e1a91bbc5fe49ed941b60dce5eb49 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 17 Oct 2025 18:06:13 -1000 Subject: [PATCH] preen --- esphome/analyze_memory/__init__.py | 42 +++---- esphome/analyze_memory/cli.py | 2 +- script/ci_memory_impact_comment.py | 196 +++++++++++++---------------- script/ci_memory_impact_extract.py | 15 +-- 4 files changed, 116 insertions(+), 139 deletions(-) diff --git a/esphome/analyze_memory/__init__.py b/esphome/analyze_memory/__init__.py index 3e85c4d869..942caabe70 100644 --- a/esphome/analyze_memory/__init__.py +++ b/esphome/analyze_memory/__init__.py @@ -77,7 +77,7 @@ class MemoryAnalyzer: readelf_path: str | None = None, external_components: set[str] | None = None, idedata: "IDEData | None" = None, - ): + ) -> None: """Initialize memory analyzer. Args: @@ -311,15 +311,13 @@ class MemoryAnalyzer: potential_cppfilt, ) else: - _LOGGER.info( - "✗ Using system c++filt (objdump_path=%s)", self.objdump_path - ) + _LOGGER.info("✗ Using system c++filt (objdump_path=%s)", self.objdump_path) # Strip GCC optimization suffixes and prefixes before demangling # Suffixes like $isra$0, $part$0, $constprop$0 confuse c++filt # Prefixes like _GLOBAL__sub_I_ need to be removed and tracked - symbols_stripped = [] - symbols_prefixes = [] # Track removed prefixes + symbols_stripped: list[str] = [] + symbols_prefixes: list[str] = [] # Track removed prefixes for symbol in symbols: # Remove GCC optimization markers stripped = re.sub(r"\$(?:isra|part|constprop)\$\d+", "", symbol) @@ -327,12 +325,11 @@ class MemoryAnalyzer: # Handle GCC global constructor/initializer prefixes # _GLOBAL__sub_I_ -> extract for demangling prefix = "" - if stripped.startswith("_GLOBAL__sub_I_"): - prefix = "_GLOBAL__sub_I_" - stripped = stripped[len(prefix) :] - elif stripped.startswith("_GLOBAL__sub_D_"): - prefix = "_GLOBAL__sub_D_" - stripped = stripped[len(prefix) :] + for gcc_prefix in _GCC_PREFIX_ANNOTATIONS: + if stripped.startswith(gcc_prefix): + prefix = gcc_prefix + stripped = stripped[len(prefix) :] + break symbols_stripped.append(stripped) symbols_prefixes.append(prefix) @@ -405,17 +402,18 @@ class MemoryAnalyzer: if stripped == demangled and stripped.startswith("_Z"): failed_count += 1 if failed_count <= 5: # Only log first 5 failures - _LOGGER.warning("Failed to demangle: %s", original[:100]) + _LOGGER.warning("Failed to demangle: %s", original) - if failed_count > 0: - _LOGGER.warning( - "Failed to demangle %d/%d symbols using %s", - failed_count, - len(symbols), - cppfilt_cmd, - ) - else: - _LOGGER.warning("Successfully demangled all %d symbols", len(symbols)) + if failed_count == 0: + _LOGGER.info("Successfully demangled all %d symbols", len(symbols)) + return + + _LOGGER.warning( + "Failed to demangle %d/%d symbols using %s", + failed_count, + len(symbols), + cppfilt_cmd, + ) @staticmethod def _restore_symbol_prefix(prefix: str, stripped: str, demangled: str) -> str: diff --git a/esphome/analyze_memory/cli.py b/esphome/analyze_memory/cli.py index bcf9f45de9..a2366430dd 100644 --- a/esphome/analyze_memory/cli.py +++ b/esphome/analyze_memory/cli.py @@ -83,7 +83,7 @@ class MemoryAnalyzerCLI(MemoryAnalyzer): total_ram = sum(c.ram_total for _, c in components) # Build report - lines = [] + lines: list[str] = [] lines.append("=" * self.TABLE_WIDTH) lines.append("Component Memory Analysis".center(self.TABLE_WIDTH)) diff --git a/script/ci_memory_impact_comment.py b/script/ci_memory_impact_comment.py index 8b0dbb6f58..d177b101a8 100755 --- a/script/ci_memory_impact_comment.py +++ b/script/ci_memory_impact_comment.py @@ -411,137 +411,115 @@ def find_existing_comment(pr_number: str) -> str | None: Returns: Comment numeric ID if found, None otherwise + + Raises: + subprocess.CalledProcessError: If gh command fails """ - try: - print( - f"DEBUG: Looking for existing comment on PR #{pr_number}", file=sys.stderr - ) + print(f"DEBUG: Looking for existing comment on PR #{pr_number}", file=sys.stderr) - # Use gh api to get comments directly - this returns the numeric id field - result = subprocess.run( - [ - "gh", - "api", - f"/repos/{{owner}}/{{repo}}/issues/{pr_number}/comments", - "--jq", - ".[] | {id, body}", - ], - capture_output=True, - text=True, - check=True, - ) + # Use gh api to get comments directly - this returns the numeric id field + result = subprocess.run( + [ + "gh", + "api", + f"/repos/{{owner}}/{{repo}}/issues/{pr_number}/comments", + "--jq", + ".[] | {id, body}", + ], + capture_output=True, + text=True, + check=True, + ) - print( - f"DEBUG: gh api comments output (first 500 chars):\n{result.stdout[:500]}", - file=sys.stderr, - ) + print( + f"DEBUG: gh api comments output (first 500 chars):\n{result.stdout[:500]}", + file=sys.stderr, + ) - # Parse comments and look for our marker - comment_count = 0 - for line in result.stdout.strip().split("\n"): - if not line: - continue + # Parse comments and look for our marker + comment_count = 0 + for line in result.stdout.strip().split("\n"): + if not line: + continue - try: - comment = json.loads(line) - comment_count += 1 - comment_id = comment.get("id") + try: + comment = json.loads(line) + comment_count += 1 + comment_id = comment.get("id") + print( + f"DEBUG: Checking comment {comment_count}: id={comment_id}", + file=sys.stderr, + ) + + body = comment.get("body", "") + if COMMENT_MARKER in body: print( - f"DEBUG: Checking comment {comment_count}: id={comment_id}", + f"DEBUG: Found existing comment with id={comment_id}", file=sys.stderr, ) + # Return the numeric id + return str(comment_id) + print("DEBUG: Comment does not contain marker", file=sys.stderr) + except json.JSONDecodeError as e: + print(f"DEBUG: JSON decode error: {e}", file=sys.stderr) + continue - body = comment.get("body", "") - if COMMENT_MARKER in body: - print( - f"DEBUG: Found existing comment with id={comment_id}", - file=sys.stderr, - ) - # Return the numeric id - return str(comment_id) - print("DEBUG: Comment does not contain marker", file=sys.stderr) - except json.JSONDecodeError as e: - print(f"DEBUG: JSON decode error: {e}", file=sys.stderr) - continue - - print( - f"DEBUG: No existing comment found (checked {comment_count} comments)", - file=sys.stderr, - ) - return None - - except subprocess.CalledProcessError as e: - print(f"Error finding existing comment: {e}", file=sys.stderr) - if e.stderr: - print(f"stderr: {e.stderr.decode()}", file=sys.stderr) - return None + print( + f"DEBUG: No existing comment found (checked {comment_count} comments)", + file=sys.stderr, + ) + return None -def post_or_update_comment(pr_number: str, comment_body: str) -> bool: +def post_or_update_comment(pr_number: str, comment_body: str) -> None: """Post a new comment or update existing one. Args: pr_number: PR number comment_body: Comment body text - Returns: - True if successful, False otherwise + Raises: + subprocess.CalledProcessError: If gh command fails """ # Look for existing comment existing_comment_id = find_existing_comment(pr_number) - try: - if existing_comment_id and existing_comment_id != "None": - # Update existing comment - print( - f"DEBUG: Updating existing comment {existing_comment_id}", - file=sys.stderr, - ) - result = subprocess.run( - [ - "gh", - "api", - f"/repos/{{owner}}/{{repo}}/issues/comments/{existing_comment_id}", - "-X", - "PATCH", - "-f", - f"body={comment_body}", - ], - check=True, - capture_output=True, - text=True, - ) - print(f"DEBUG: Update response: {result.stdout}", file=sys.stderr) - else: - # Post new comment - print( - f"DEBUG: Posting new comment (existing_comment_id={existing_comment_id})", - file=sys.stderr, - ) - result = subprocess.run( - ["gh", "pr", "comment", pr_number, "--body", comment_body], - check=True, - capture_output=True, - text=True, - ) - print(f"DEBUG: Post response: {result.stdout}", file=sys.stderr) + if existing_comment_id and existing_comment_id != "None": + # Update existing comment + print( + f"DEBUG: Updating existing comment {existing_comment_id}", + file=sys.stderr, + ) + result = subprocess.run( + [ + "gh", + "api", + f"/repos/{{owner}}/{{repo}}/issues/comments/{existing_comment_id}", + "-X", + "PATCH", + "-f", + f"body={comment_body}", + ], + check=True, + capture_output=True, + text=True, + ) + print(f"DEBUG: Update response: {result.stdout}", file=sys.stderr) + else: + # Post new comment + print( + f"DEBUG: Posting new comment (existing_comment_id={existing_comment_id})", + file=sys.stderr, + ) + result = subprocess.run( + ["gh", "pr", "comment", pr_number, "--body", comment_body], + check=True, + capture_output=True, + text=True, + ) + print(f"DEBUG: Post response: {result.stdout}", file=sys.stderr) - print("Comment posted/updated successfully", file=sys.stderr) - return True - - except subprocess.CalledProcessError as e: - print(f"Error posting/updating comment: {e}", file=sys.stderr) - if e.stderr: - print( - f"stderr: {e.stderr.decode() if isinstance(e.stderr, bytes) else e.stderr}", - file=sys.stderr, - ) - if e.stdout: - print( - f"stdout: {e.stdout.decode() if isinstance(e.stdout, bytes) else e.stdout}", - file=sys.stderr, - ) - return False + print("Comment posted/updated successfully", file=sys.stderr) def main() -> int: diff --git a/script/ci_memory_impact_extract.py b/script/ci_memory_impact_extract.py index 76632ebc33..5522d522f0 100755 --- a/script/ci_memory_impact_extract.py +++ b/script/ci_memory_impact_extract.py @@ -27,6 +27,11 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) # pylint: disable=wrong-import-position from script.ci_helpers import write_github_output +# Regex patterns for extracting memory usage from PlatformIO output +_RAM_PATTERN = re.compile(r"RAM:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes") +_FLASH_PATTERN = re.compile(r"Flash:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes") +_BUILD_PATH_PATTERN = re.compile(r"Build path: (.+)") + def extract_from_compile_output( output_text: str, @@ -42,7 +47,7 @@ def extract_from_compile_output( Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes) Also extracts build directory from lines like: - INFO Deleting /path/to/build/.esphome/build/componenttestesp8266ard/.pioenvs + INFO Compiling app... Build path: /path/to/build Args: output_text: Compile output text (may contain multiple builds) @@ -51,12 +56,8 @@ def extract_from_compile_output( Tuple of (total_ram_bytes, total_flash_bytes, build_dir) or (None, None, None) if not found """ # Find all RAM and Flash matches (may be multiple builds) - ram_matches = re.findall( - r"RAM:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes", output_text - ) - flash_matches = re.findall( - r"Flash:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes", output_text - ) + ram_matches = _RAM_PATTERN.findall(output_text) + flash_matches = _FLASH_PATTERN.findall(output_text) if not ram_matches or not flash_matches: return None, None, None