mirror of
https://github.com/esphome/esphome.git
synced 2025-10-20 18:53:47 +01:00
preen
This commit is contained in:
@@ -411,137 +411,115 @@ def find_existing_comment(pr_number: str) -> str | None:
|
||||
|
||||
Returns:
|
||||
Comment numeric ID if found, None otherwise
|
||||
|
||||
Raises:
|
||||
subprocess.CalledProcessError: If gh command fails
|
||||
"""
|
||||
try:
|
||||
print(
|
||||
f"DEBUG: Looking for existing comment on PR #{pr_number}", file=sys.stderr
|
||||
)
|
||||
print(f"DEBUG: Looking for existing comment on PR #{pr_number}", file=sys.stderr)
|
||||
|
||||
# Use gh api to get comments directly - this returns the numeric id field
|
||||
result = subprocess.run(
|
||||
[
|
||||
"gh",
|
||||
"api",
|
||||
f"/repos/{{owner}}/{{repo}}/issues/{pr_number}/comments",
|
||||
"--jq",
|
||||
".[] | {id, body}",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
# Use gh api to get comments directly - this returns the numeric id field
|
||||
result = subprocess.run(
|
||||
[
|
||||
"gh",
|
||||
"api",
|
||||
f"/repos/{{owner}}/{{repo}}/issues/{pr_number}/comments",
|
||||
"--jq",
|
||||
".[] | {id, body}",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
|
||||
print(
|
||||
f"DEBUG: gh api comments output (first 500 chars):\n{result.stdout[:500]}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
print(
|
||||
f"DEBUG: gh api comments output (first 500 chars):\n{result.stdout[:500]}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# Parse comments and look for our marker
|
||||
comment_count = 0
|
||||
for line in result.stdout.strip().split("\n"):
|
||||
if not line:
|
||||
continue
|
||||
# Parse comments and look for our marker
|
||||
comment_count = 0
|
||||
for line in result.stdout.strip().split("\n"):
|
||||
if not line:
|
||||
continue
|
||||
|
||||
try:
|
||||
comment = json.loads(line)
|
||||
comment_count += 1
|
||||
comment_id = comment.get("id")
|
||||
try:
|
||||
comment = json.loads(line)
|
||||
comment_count += 1
|
||||
comment_id = comment.get("id")
|
||||
print(
|
||||
f"DEBUG: Checking comment {comment_count}: id={comment_id}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
body = comment.get("body", "")
|
||||
if COMMENT_MARKER in body:
|
||||
print(
|
||||
f"DEBUG: Checking comment {comment_count}: id={comment_id}",
|
||||
f"DEBUG: Found existing comment with id={comment_id}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
# Return the numeric id
|
||||
return str(comment_id)
|
||||
print("DEBUG: Comment does not contain marker", file=sys.stderr)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"DEBUG: JSON decode error: {e}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
body = comment.get("body", "")
|
||||
if COMMENT_MARKER in body:
|
||||
print(
|
||||
f"DEBUG: Found existing comment with id={comment_id}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
# Return the numeric id
|
||||
return str(comment_id)
|
||||
print("DEBUG: Comment does not contain marker", file=sys.stderr)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"DEBUG: JSON decode error: {e}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
print(
|
||||
f"DEBUG: No existing comment found (checked {comment_count} comments)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return None
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error finding existing comment: {e}", file=sys.stderr)
|
||||
if e.stderr:
|
||||
print(f"stderr: {e.stderr.decode()}", file=sys.stderr)
|
||||
return None
|
||||
print(
|
||||
f"DEBUG: No existing comment found (checked {comment_count} comments)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def post_or_update_comment(pr_number: str, comment_body: str) -> bool:
|
||||
def post_or_update_comment(pr_number: str, comment_body: str) -> None:
|
||||
"""Post a new comment or update existing one.
|
||||
|
||||
Args:
|
||||
pr_number: PR number
|
||||
comment_body: Comment body text
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
Raises:
|
||||
subprocess.CalledProcessError: If gh command fails
|
||||
"""
|
||||
# Look for existing comment
|
||||
existing_comment_id = find_existing_comment(pr_number)
|
||||
|
||||
try:
|
||||
if existing_comment_id and existing_comment_id != "None":
|
||||
# Update existing comment
|
||||
print(
|
||||
f"DEBUG: Updating existing comment {existing_comment_id}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
result = subprocess.run(
|
||||
[
|
||||
"gh",
|
||||
"api",
|
||||
f"/repos/{{owner}}/{{repo}}/issues/comments/{existing_comment_id}",
|
||||
"-X",
|
||||
"PATCH",
|
||||
"-f",
|
||||
f"body={comment_body}",
|
||||
],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
print(f"DEBUG: Update response: {result.stdout}", file=sys.stderr)
|
||||
else:
|
||||
# Post new comment
|
||||
print(
|
||||
f"DEBUG: Posting new comment (existing_comment_id={existing_comment_id})",
|
||||
file=sys.stderr,
|
||||
)
|
||||
result = subprocess.run(
|
||||
["gh", "pr", "comment", pr_number, "--body", comment_body],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
print(f"DEBUG: Post response: {result.stdout}", file=sys.stderr)
|
||||
if existing_comment_id and existing_comment_id != "None":
|
||||
# Update existing comment
|
||||
print(
|
||||
f"DEBUG: Updating existing comment {existing_comment_id}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
result = subprocess.run(
|
||||
[
|
||||
"gh",
|
||||
"api",
|
||||
f"/repos/{{owner}}/{{repo}}/issues/comments/{existing_comment_id}",
|
||||
"-X",
|
||||
"PATCH",
|
||||
"-f",
|
||||
f"body={comment_body}",
|
||||
],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
print(f"DEBUG: Update response: {result.stdout}", file=sys.stderr)
|
||||
else:
|
||||
# Post new comment
|
||||
print(
|
||||
f"DEBUG: Posting new comment (existing_comment_id={existing_comment_id})",
|
||||
file=sys.stderr,
|
||||
)
|
||||
result = subprocess.run(
|
||||
["gh", "pr", "comment", pr_number, "--body", comment_body],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
print(f"DEBUG: Post response: {result.stdout}", file=sys.stderr)
|
||||
|
||||
print("Comment posted/updated successfully", file=sys.stderr)
|
||||
return True
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error posting/updating comment: {e}", file=sys.stderr)
|
||||
if e.stderr:
|
||||
print(
|
||||
f"stderr: {e.stderr.decode() if isinstance(e.stderr, bytes) else e.stderr}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
if e.stdout:
|
||||
print(
|
||||
f"stdout: {e.stdout.decode() if isinstance(e.stdout, bytes) else e.stdout}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return False
|
||||
print("Comment posted/updated successfully", file=sys.stderr)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
|
@@ -27,6 +27,11 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
# pylint: disable=wrong-import-position
|
||||
from script.ci_helpers import write_github_output
|
||||
|
||||
# Regex patterns for extracting memory usage from PlatformIO output
|
||||
_RAM_PATTERN = re.compile(r"RAM:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes")
|
||||
_FLASH_PATTERN = re.compile(r"Flash:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes")
|
||||
_BUILD_PATH_PATTERN = re.compile(r"Build path: (.+)")
|
||||
|
||||
|
||||
def extract_from_compile_output(
|
||||
output_text: str,
|
||||
@@ -42,7 +47,7 @@ def extract_from_compile_output(
|
||||
Flash: [=== ] 34.0% (used 348511 bytes from 1023984 bytes)
|
||||
|
||||
Also extracts build directory from lines like:
|
||||
INFO Deleting /path/to/build/.esphome/build/componenttestesp8266ard/.pioenvs
|
||||
INFO Compiling app... Build path: /path/to/build
|
||||
|
||||
Args:
|
||||
output_text: Compile output text (may contain multiple builds)
|
||||
@@ -51,12 +56,8 @@ def extract_from_compile_output(
|
||||
Tuple of (total_ram_bytes, total_flash_bytes, build_dir) or (None, None, None) if not found
|
||||
"""
|
||||
# Find all RAM and Flash matches (may be multiple builds)
|
||||
ram_matches = re.findall(
|
||||
r"RAM:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes", output_text
|
||||
)
|
||||
flash_matches = re.findall(
|
||||
r"Flash:\s+\[.*?\]\s+\d+\.\d+%\s+\(used\s+(\d+)\s+bytes", output_text
|
||||
)
|
||||
ram_matches = _RAM_PATTERN.findall(output_text)
|
||||
flash_matches = _FLASH_PATTERN.findall(output_text)
|
||||
|
||||
if not ram_matches or not flash_matches:
|
||||
return None, None, None
|
||||
|
Reference in New Issue
Block a user