1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-30 22:53:59 +00:00

Merge remote-tracking branch 'upstream/dev' into integration

This commit is contained in:
J. Nick Koston
2025-07-09 15:20:35 -10:00
47 changed files with 2310 additions and 136 deletions

View File

@@ -0,0 +1,12 @@
i2c:
- id: i2c_gl_r01_i2c
scl: ${scl_pin}
sda: ${sda_pin}
sensor:
- platform: gl_r01_i2c
id: tof
name: "ToF sensor"
i2c_id: i2c_gl_r01_i2c
address: 0x74
update_interval: 15s

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO16
sda_pin: GPIO17
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
scl_pin: GPIO5
sda_pin: GPIO4
<<: !include common.yaml

View File

@@ -0,0 +1,8 @@
sensor:
- platform: lps22
address: 0x5d
update_interval: 10s
temperature:
name: "LPS22 Temperature"
pressure:
name: "LPS22 Pressure"

View File

@@ -0,0 +1,6 @@
i2c:
- id: i2c_lps22
scl: 16
sda: 17
<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
- id: i2c_lps22
scl: 5
sda: 4
<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
- id: i2c_lps22
scl: 5
sda: 4
<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
- id: i2c_lps22
scl: 16
sda: 17
<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
- id: i2c_lps22
scl: 5
sda: 4
<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
- id: i2c_lps22
scl: 5
sda: 4
<<: !include common.yaml

View File

@@ -5,12 +5,14 @@ from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator, Callable, Generator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
import fcntl
import logging
import os
from pathlib import Path
import platform
import signal
import socket
import subprocess
import sys
import tempfile
from typing import TextIO
@@ -50,6 +52,66 @@ if platform.system() == "Windows":
import pty # not available on Windows
def _get_platformio_env(cache_dir: Path) -> dict[str, str]:
"""Get environment variables for PlatformIO with shared cache."""
env = os.environ.copy()
env["PLATFORMIO_CORE_DIR"] = str(cache_dir)
env["PLATFORMIO_CACHE_DIR"] = str(cache_dir / ".cache")
env["PLATFORMIO_LIBDEPS_DIR"] = str(cache_dir / "libdeps")
return env
@pytest.fixture(scope="session")
def shared_platformio_cache() -> Generator[Path]:
"""Initialize a shared PlatformIO cache for all integration tests."""
# Use a dedicated directory for integration tests to avoid conflicts
test_cache_dir = Path.home() / ".esphome-integration-tests"
cache_dir = test_cache_dir / "platformio"
# Use a lock file in the home directory to ensure only one process initializes the cache
# This is needed when running with pytest-xdist
# The lock file must be in a directory that already exists to avoid race conditions
lock_file = Path.home() / ".esphome-integration-tests-init.lock"
# Always acquire the lock to ensure cache is ready before proceeding
with open(lock_file, "w") as lock_fd:
fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)
# Check if cache needs initialization while holding the lock
if not cache_dir.exists() or not any(cache_dir.iterdir()):
# Create the test cache directory if it doesn't exist
test_cache_dir.mkdir(exist_ok=True)
with tempfile.TemporaryDirectory() as tmpdir:
# Create a basic host config
init_dir = Path(tmpdir)
config_path = init_dir / "cache_init.yaml"
config_path.write_text("""esphome:
name: cache-init
host:
api:
encryption:
key: "IIevImVI42I0FGos5nLqFK91jrJehrgidI0ArwMLr8w="
logger:
""")
# Run compilation to populate the cache
# We must succeed here to avoid race conditions where multiple
# tests try to populate the same cache directory simultaneously
env = _get_platformio_env(cache_dir)
subprocess.run(
["esphome", "compile", str(config_path)],
check=True,
cwd=init_dir,
env=env,
)
# Lock is held until here, ensuring cache is fully populated before any test proceeds
yield cache_dir
@pytest.fixture(scope="module", autouse=True)
def enable_aioesphomeapi_debug_logging():
"""Enable debug logging for aioesphomeapi to help diagnose connection issues."""
@@ -161,22 +223,14 @@ async def write_yaml_config(
@pytest_asyncio.fixture
async def compile_esphome(
integration_test_dir: Path,
shared_platformio_cache: Path,
) -> AsyncGenerator[CompileFunction]:
"""Compile an ESPHome configuration and return the binary path."""
async def _compile(config_path: Path) -> Path:
# Create a unique PlatformIO directory for this test to avoid race conditions
platformio_dir = integration_test_dir / ".platformio"
platformio_dir.mkdir(parents=True, exist_ok=True)
# Create cache directory as well
platformio_cache_dir = platformio_dir / ".cache"
platformio_cache_dir.mkdir(parents=True, exist_ok=True)
# Set up environment with isolated PlatformIO directories
env = os.environ.copy()
env["PLATFORMIO_CORE_DIR"] = str(platformio_dir)
env["PLATFORMIO_CACHE_DIR"] = str(platformio_cache_dir)
# Use the shared PlatformIO cache for faster compilation
# This avoids re-downloading dependencies for each test
env = _get_platformio_env(shared_platformio_cache)
# Retry compilation up to 3 times if we get a segfault
max_retries = 3

View File

@@ -23,19 +23,6 @@ void SchedulerStringLifetimeComponent::run_string_lifetime_test() {
test_vector_reallocation();
test_string_move_semantics();
test_lambda_capture_lifetime();
// Schedule final check
this->set_timeout("final_check", 200, [this]() {
ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_);
ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_);
if (this->tests_failed_ == 0) {
ESP_LOGI(TAG, "SUCCESS: All string lifetime tests passed!");
} else {
ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_);
}
ESP_LOGI(TAG, "String lifetime tests complete");
});
}
void SchedulerStringLifetimeComponent::run_test1() {
@@ -69,7 +56,6 @@ void SchedulerStringLifetimeComponent::run_test5() {
}
void SchedulerStringLifetimeComponent::run_final_check() {
ESP_LOGI(TAG, "String lifetime tests complete");
ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_);
ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_);
@@ -78,6 +64,7 @@ void SchedulerStringLifetimeComponent::run_final_check() {
} else {
ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_);
}
ESP_LOGI(TAG, "String lifetime tests complete");
}
void SchedulerStringLifetimeComponent::test_temporary_string_lifetime() {

0
tests/script/__init__.py Normal file
View File

View File

@@ -0,0 +1,359 @@
"""Unit tests for script/clang_tidy_hash.py module."""
import hashlib
from pathlib import Path
import sys
from unittest.mock import Mock, patch
import pytest
# Add the script directory to Python path so we can import clang_tidy_hash
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "script"))
import clang_tidy_hash # noqa: E402
@pytest.mark.parametrize(
("file_content", "expected"),
[
(
"clang-tidy==18.1.5 # via -r requirements_dev.in\n",
"clang-tidy==18.1.5 # via -r requirements_dev.in",
),
(
"other-package==1.0\nclang-tidy==17.0.0\nmore-packages==2.0\n",
"clang-tidy==17.0.0",
),
(
"# comment\nclang-tidy==16.0.0 # some comment\n",
"clang-tidy==16.0.0 # some comment",
),
("no-clang-tidy-here==1.0\n", "clang-tidy version not found"),
],
)
def test_get_clang_tidy_version_from_requirements(
file_content: str, expected: str
) -> None:
"""Test extracting clang-tidy version from various file formats."""
# Mock read_file_lines to return our test content
with patch("clang_tidy_hash.read_file_lines") as mock_read:
mock_read.return_value = file_content.splitlines(keepends=True)
result = clang_tidy_hash.get_clang_tidy_version_from_requirements()
assert result == expected
@pytest.mark.parametrize(
("platformio_content", "expected_flags"),
[
(
"[env:esp32]\n"
"platform = espressif32\n"
"\n"
"[flags:clangtidy]\n"
"build_flags = -Wall\n"
"extra_flags = -Wextra\n"
"\n"
"[env:esp8266]\n",
"build_flags = -Wall\nextra_flags = -Wextra",
),
(
"[flags:clangtidy]\n# Comment line\nbuild_flags = -O2\n\n[next_section]\n",
"build_flags = -O2",
),
(
"[flags:clangtidy]\nflag_c = -std=c99\nflag_b = -Wall\nflag_a = -O2\n",
"flag_a = -O2\nflag_b = -Wall\nflag_c = -std=c99", # Sorted
),
(
"[env:esp32]\nplatform = espressif32\n", # No clangtidy section
"",
),
],
)
def test_extract_platformio_flags(platformio_content: str, expected_flags: str) -> None:
"""Test extracting clang-tidy flags from platformio.ini."""
# Mock read_file_lines to return our test content
with patch("clang_tidy_hash.read_file_lines") as mock_read:
mock_read.return_value = platformio_content.splitlines(keepends=True)
result = clang_tidy_hash.extract_platformio_flags()
assert result == expected_flags
def test_calculate_clang_tidy_hash() -> None:
"""Test calculating hash from all configuration sources."""
clang_tidy_content = b"Checks: '-*,readability-*'\n"
requirements_version = "clang-tidy==18.1.5"
pio_flags = "build_flags = -Wall"
# Expected hash calculation
expected_hasher = hashlib.sha256()
expected_hasher.update(clang_tidy_content)
expected_hasher.update(requirements_version.encode())
expected_hasher.update(pio_flags.encode())
expected_hash = expected_hasher.hexdigest()
# Mock the dependencies
with (
patch("clang_tidy_hash.read_file_bytes", return_value=clang_tidy_content),
patch(
"clang_tidy_hash.get_clang_tidy_version_from_requirements",
return_value=requirements_version,
),
patch("clang_tidy_hash.extract_platformio_flags", return_value=pio_flags),
):
result = clang_tidy_hash.calculate_clang_tidy_hash()
assert result == expected_hash
def test_read_stored_hash_exists(tmp_path: Path) -> None:
"""Test reading hash when file exists."""
stored_hash = "abc123def456"
hash_file = tmp_path / ".clang-tidy.hash"
hash_file.write_text(f"{stored_hash}\n")
with (
patch("clang_tidy_hash.Path") as mock_path_class,
patch("clang_tidy_hash.read_file_lines", return_value=[f"{stored_hash}\n"]),
):
# Mock the path calculation and exists check
mock_hash_file = Mock()
mock_hash_file.exists.return_value = True
mock_path_class.return_value.parent.parent.__truediv__.return_value = (
mock_hash_file
)
result = clang_tidy_hash.read_stored_hash()
assert result == stored_hash
def test_read_stored_hash_not_exists() -> None:
"""Test reading hash when file doesn't exist."""
with patch("clang_tidy_hash.Path") as mock_path_class:
# Mock the path calculation and exists check
mock_hash_file = Mock()
mock_hash_file.exists.return_value = False
mock_path_class.return_value.parent.parent.__truediv__.return_value = (
mock_hash_file
)
result = clang_tidy_hash.read_stored_hash()
assert result is None
def test_write_hash() -> None:
"""Test writing hash to file."""
hash_value = "abc123def456"
with patch("clang_tidy_hash.write_file_content") as mock_write:
clang_tidy_hash.write_hash(hash_value)
# Verify write_file_content was called with correct parameters
mock_write.assert_called_once()
args = mock_write.call_args[0]
assert str(args[0]).endswith(".clang-tidy.hash")
assert args[1] == hash_value
@pytest.mark.parametrize(
("args", "current_hash", "stored_hash", "expected_exit"),
[
(["--check"], "abc123", "abc123", 1), # Hashes match, no scan needed
(["--check"], "abc123", "def456", 0), # Hashes differ, scan needed
(["--check"], "abc123", None, 0), # No stored hash, scan needed
],
)
def test_main_check_mode(
args: list[str], current_hash: str, stored_hash: str | None, expected_exit: int
) -> None:
"""Test main function in check mode."""
with (
patch("sys.argv", ["clang_tidy_hash.py"] + args),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == expected_exit
def test_main_update_mode(capsys: pytest.CaptureFixture[str]) -> None:
"""Test main function in update mode."""
current_hash = "abc123"
with (
patch("sys.argv", ["clang_tidy_hash.py", "--update"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.write_hash") as mock_write,
):
clang_tidy_hash.main()
mock_write.assert_called_once_with(current_hash)
captured = capsys.readouterr()
assert f"Hash updated: {current_hash}" in captured.out
@pytest.mark.parametrize(
("current_hash", "stored_hash"),
[
("abc123", "def456"), # Hash changed, should update
("abc123", None), # No stored hash, should update
],
)
def test_main_update_if_changed_mode_update(
current_hash: str, stored_hash: str | None, capsys: pytest.CaptureFixture[str]
) -> None:
"""Test main function in update-if-changed mode when update is needed."""
with (
patch("sys.argv", ["clang_tidy_hash.py", "--update-if-changed"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
patch("clang_tidy_hash.write_hash") as mock_write,
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == 0
mock_write.assert_called_once_with(current_hash)
captured = capsys.readouterr()
assert "Clang-tidy hash updated" in captured.out
def test_main_update_if_changed_mode_no_update(
capsys: pytest.CaptureFixture[str],
) -> None:
"""Test main function in update-if-changed mode when no update is needed."""
current_hash = "abc123"
stored_hash = "abc123"
with (
patch("sys.argv", ["clang_tidy_hash.py", "--update-if-changed"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
patch("clang_tidy_hash.write_hash") as mock_write,
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == 0
mock_write.assert_not_called()
captured = capsys.readouterr()
assert "Clang-tidy hash unchanged" in captured.out
def test_main_verify_mode_success(capsys: pytest.CaptureFixture[str]) -> None:
"""Test main function in verify mode when verification passes."""
current_hash = "abc123"
stored_hash = "abc123"
with (
patch("sys.argv", ["clang_tidy_hash.py", "--verify"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
):
clang_tidy_hash.main()
captured = capsys.readouterr()
assert "Hash verification passed" in captured.out
@pytest.mark.parametrize(
("current_hash", "stored_hash"),
[
("abc123", "def456"), # Hashes differ, verification fails
("abc123", None), # No stored hash, verification fails
],
)
def test_main_verify_mode_failure(
current_hash: str, stored_hash: str | None, capsys: pytest.CaptureFixture[str]
) -> None:
"""Test main function in verify mode when verification fails."""
with (
patch("sys.argv", ["clang_tidy_hash.py", "--verify"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "ERROR: Clang-tidy configuration has changed" in captured.out
def test_main_default_mode(capsys: pytest.CaptureFixture[str]) -> None:
"""Test main function in default mode (no arguments)."""
current_hash = "abc123"
stored_hash = "def456"
with (
patch("sys.argv", ["clang_tidy_hash.py"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
):
clang_tidy_hash.main()
captured = capsys.readouterr()
assert f"Current hash: {current_hash}" in captured.out
assert f"Stored hash: {stored_hash}" in captured.out
assert "Match: False" in captured.out
def test_read_file_lines(tmp_path: Path) -> None:
"""Test read_file_lines helper function."""
test_file = tmp_path / "test.txt"
test_content = "line1\nline2\nline3\n"
test_file.write_text(test_content)
result = clang_tidy_hash.read_file_lines(test_file)
assert result == ["line1\n", "line2\n", "line3\n"]
def test_read_file_bytes(tmp_path: Path) -> None:
"""Test read_file_bytes helper function."""
test_file = tmp_path / "test.bin"
test_content = b"binary content\x00\xff"
test_file.write_bytes(test_content)
result = clang_tidy_hash.read_file_bytes(test_file)
assert result == test_content
def test_write_file_content(tmp_path: Path) -> None:
"""Test write_file_content helper function."""
test_file = tmp_path / "test.txt"
test_content = "test content"
clang_tidy_hash.write_file_content(test_file, test_content)
assert test_file.read_text() == test_content
@pytest.mark.parametrize(
("line", "expected"),
[
("clang-tidy==18.1.5", ("clang-tidy", "clang-tidy==18.1.5")),
(
"clang-tidy==18.1.5 # comment",
("clang-tidy", "clang-tidy==18.1.5 # comment"),
),
("some-package>=1.0,<2.0", ("some-package", "some-package>=1.0,<2.0")),
("pkg_with-dashes==1.0", ("pkg_with-dashes", "pkg_with-dashes==1.0")),
("# just a comment", None),
("", None),
(" ", None),
("invalid line without version", None),
],
)
def test_parse_requirement_line(line: str, expected: tuple[str, str] | None) -> None:
"""Test parsing individual requirement lines."""
result = clang_tidy_hash.parse_requirement_line(line)
assert result == expected

View File

@@ -0,0 +1,849 @@
"""Unit tests for script/helpers.py module."""
import json
import os
from pathlib import Path
import subprocess
import sys
from unittest.mock import Mock, patch
import pytest
from pytest import MonkeyPatch
# Add the script directory to Python path so we can import helpers
sys.path.insert(
0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "script"))
)
import helpers # noqa: E402
changed_files = helpers.changed_files
filter_changed = helpers.filter_changed
get_changed_components = helpers.get_changed_components
_get_changed_files_from_command = helpers._get_changed_files_from_command
_get_pr_number_from_github_env = helpers._get_pr_number_from_github_env
_get_changed_files_github_actions = helpers._get_changed_files_github_actions
_filter_changed_ci = helpers._filter_changed_ci
_filter_changed_local = helpers._filter_changed_local
build_all_include = helpers.build_all_include
print_file_list = helpers.print_file_list
@pytest.mark.parametrize(
("github_ref", "expected_pr_number"),
[
("refs/pull/1234/merge", "1234"),
("refs/pull/5678/head", "5678"),
("refs/pull/999/merge", "999"),
("refs/heads/main", None),
("", None),
],
)
def test_get_pr_number_from_github_env_ref(
monkeypatch: MonkeyPatch, github_ref: str, expected_pr_number: str | None
) -> None:
"""Test extracting PR number from GITHUB_REF."""
monkeypatch.setenv("GITHUB_REF", github_ref)
# Make sure GITHUB_EVENT_PATH is not set
monkeypatch.delenv("GITHUB_EVENT_PATH", raising=False)
result = _get_pr_number_from_github_env()
assert result == expected_pr_number
def test_get_pr_number_from_github_env_event_file(
monkeypatch: MonkeyPatch, tmp_path: Path
) -> None:
"""Test extracting PR number from GitHub event file."""
# No PR number in ref
monkeypatch.setenv("GITHUB_REF", "refs/heads/feature-branch")
event_file = tmp_path / "event.json"
event_data = {"pull_request": {"number": 5678}}
event_file.write_text(json.dumps(event_data))
monkeypatch.setenv("GITHUB_EVENT_PATH", str(event_file))
result = _get_pr_number_from_github_env()
assert result == "5678"
def test_get_pr_number_from_github_env_no_pr(
monkeypatch: MonkeyPatch, tmp_path: Path
) -> None:
"""Test when no PR number is available."""
monkeypatch.setenv("GITHUB_REF", "refs/heads/main")
event_file = tmp_path / "event.json"
event_data = {"push": {"head_commit": {"id": "abc123"}}}
event_file.write_text(json.dumps(event_data))
monkeypatch.setenv("GITHUB_EVENT_PATH", str(event_file))
result = _get_pr_number_from_github_env()
assert result is None
@pytest.mark.parametrize(
("github_ref", "expected_pr_number"),
[
("refs/pull/1234/merge", "1234"),
("refs/pull/5678/head", "5678"),
("refs/pull/999/merge", "999"),
],
)
def test_github_actions_pull_request_with_pr_number_in_ref(
monkeypatch: MonkeyPatch, github_ref: str, expected_pr_number: str
) -> None:
"""Test PR detection via GITHUB_REF."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
monkeypatch.setenv("GITHUB_EVENT_NAME", "pull_request")
monkeypatch.setenv("GITHUB_REF", github_ref)
expected_files = ["file1.py", "file2.cpp"]
with patch("helpers._get_changed_files_from_command") as mock_get:
mock_get.return_value = expected_files
result = changed_files()
mock_get.assert_called_once_with(
["gh", "pr", "diff", expected_pr_number, "--name-only"]
)
assert result == expected_files
def test_github_actions_pull_request_with_event_file(
monkeypatch: MonkeyPatch, tmp_path: Path
) -> None:
"""Test PR detection via GitHub event file."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
monkeypatch.setenv("GITHUB_EVENT_NAME", "pull_request")
monkeypatch.setenv("GITHUB_REF", "refs/heads/feature-branch")
event_file = tmp_path / "event.json"
event_data = {"pull_request": {"number": 5678}}
event_file.write_text(json.dumps(event_data))
monkeypatch.setenv("GITHUB_EVENT_PATH", str(event_file))
expected_files = ["file1.py", "file2.cpp"]
with patch("helpers._get_changed_files_from_command") as mock_get:
mock_get.return_value = expected_files
result = changed_files()
mock_get.assert_called_once_with(["gh", "pr", "diff", "5678", "--name-only"])
assert result == expected_files
def test_github_actions_push_event(monkeypatch: MonkeyPatch) -> None:
"""Test push event handling."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
monkeypatch.setenv("GITHUB_EVENT_NAME", "push")
expected_files = ["file1.py", "file2.cpp"]
with patch("helpers._get_changed_files_from_command") as mock_get:
mock_get.return_value = expected_files
result = changed_files()
mock_get.assert_called_once_with(["git", "diff", "HEAD~1..HEAD", "--name-only"])
assert result == expected_files
def test_get_changed_files_github_actions_pull_request(
monkeypatch: MonkeyPatch,
) -> None:
"""Test _get_changed_files_github_actions for pull request event."""
monkeypatch.setenv("GITHUB_EVENT_NAME", "pull_request")
expected_files = ["file1.py", "file2.cpp"]
with (
patch("helpers._get_pr_number_from_github_env", return_value="1234"),
patch("helpers._get_changed_files_from_command") as mock_get,
):
mock_get.return_value = expected_files
result = _get_changed_files_github_actions()
mock_get.assert_called_once_with(["gh", "pr", "diff", "1234", "--name-only"])
assert result == expected_files
def test_get_changed_files_github_actions_pull_request_no_pr_number(
monkeypatch: MonkeyPatch,
) -> None:
"""Test _get_changed_files_github_actions when no PR number is found."""
monkeypatch.setenv("GITHUB_EVENT_NAME", "pull_request")
with patch("helpers._get_pr_number_from_github_env", return_value=None):
result = _get_changed_files_github_actions()
assert result is None
def test_get_changed_files_github_actions_push(monkeypatch: MonkeyPatch) -> None:
"""Test _get_changed_files_github_actions for push event."""
monkeypatch.setenv("GITHUB_EVENT_NAME", "push")
expected_files = ["file1.py", "file2.cpp"]
with patch("helpers._get_changed_files_from_command") as mock_get:
mock_get.return_value = expected_files
result = _get_changed_files_github_actions()
mock_get.assert_called_once_with(["git", "diff", "HEAD~1..HEAD", "--name-only"])
assert result == expected_files
def test_get_changed_files_github_actions_push_fallback(
monkeypatch: MonkeyPatch,
) -> None:
"""Test _get_changed_files_github_actions fallback for push event."""
monkeypatch.setenv("GITHUB_EVENT_NAME", "push")
with patch("helpers._get_changed_files_from_command") as mock_get:
mock_get.side_effect = Exception("Failed")
result = _get_changed_files_github_actions()
assert result is None
def test_get_changed_files_github_actions_other_event(monkeypatch: MonkeyPatch) -> None:
"""Test _get_changed_files_github_actions for other event types."""
monkeypatch.setenv("GITHUB_EVENT_NAME", "workflow_dispatch")
result = _get_changed_files_github_actions()
assert result is None
def test_github_actions_push_event_fallback(monkeypatch: MonkeyPatch) -> None:
"""Test push event fallback to git merge-base."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
monkeypatch.setenv("GITHUB_EVENT_NAME", "push")
expected_files = ["file1.py", "file2.cpp"]
with (
patch("helpers._get_changed_files_from_command") as mock_get,
patch("helpers.get_output") as mock_output,
):
# First call fails, triggering fallback
mock_get.side_effect = [
Exception("Failed"),
expected_files,
]
mock_output.side_effect = [
"origin\nupstream\n", # git remote
"abc123\n", # merge base
]
result = changed_files()
assert mock_get.call_count == 2
assert result == expected_files
@pytest.mark.parametrize(
("branch", "merge_base"),
[
(None, "abc123"), # Default branch (dev)
("release", "def456"),
("beta", "ghi789"),
],
)
def test_local_development_branches(
monkeypatch: MonkeyPatch, branch: str | None, merge_base: str
) -> None:
"""Test local development with different branches."""
monkeypatch.delenv("GITHUB_ACTIONS", raising=False)
expected_files = ["file1.py", "file2.cpp"]
with (
patch("helpers.get_output") as mock_output,
patch("helpers._get_changed_files_from_command") as mock_get,
):
if branch is None:
# For default branch, helpers.get_output is called twice (git remote and merge-base)
mock_output.side_effect = [
"origin\nupstream\n", # git remote
f"{merge_base}\n", # merge base for upstream/dev
]
else:
# For custom branch, may need more calls if trying multiple remotes
mock_output.side_effect = [
"origin\nupstream\n", # git remote
Exception("not found"), # upstream/{branch} may fail
f"{merge_base}\n", # merge base for origin/{branch}
]
mock_get.return_value = expected_files
result = changed_files(branch)
mock_get.assert_called_once_with(["git", "diff", merge_base, "--name-only"])
assert result == expected_files
def test_local_development_no_remotes_configured(monkeypatch: MonkeyPatch) -> None:
"""Test error when no git remotes are configured."""
monkeypatch.delenv("GITHUB_ACTIONS", raising=False)
with patch("helpers.get_output") as mock_output:
# The function calls get_output multiple times:
# 1. First to get list of remotes: git remote
# 2. Then for each remote it tries: git merge-base
# We simulate having some remotes but all merge-base attempts fail
def side_effect_func(*args):
if args == ("git", "remote"):
return "origin\nupstream\n"
else:
# All merge-base attempts fail
raise Exception("Command failed")
mock_output.side_effect = side_effect_func
with pytest.raises(ValueError, match="Git not configured"):
changed_files()
@pytest.mark.parametrize(
("stdout", "expected"),
[
("file1.py\nfile2.cpp\n\n", ["file1.py", "file2.cpp"]),
("\n\n", []),
("single.py\n", ["single.py"]),
(
"path/to/file.cpp\nanother/path.h\n",
["another/path.h", "path/to/file.cpp"],
), # Sorted
],
)
def test_get_changed_files_from_command_successful(
stdout: str, expected: list[str]
) -> None:
"""Test successful command execution with various outputs."""
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = stdout
with patch("subprocess.run", return_value=mock_result):
result = _get_changed_files_from_command(["git", "diff"])
# Normalize paths to forward slashes for comparison
# since os.path.relpath returns OS-specific separators
normalized_result = [f.replace(os.sep, "/") for f in result]
assert normalized_result == expected
@pytest.mark.parametrize(
("returncode", "stderr"),
[
(1, "Error: command failed"),
(128, "fatal: not a git repository"),
(2, "Unknown error"),
],
)
def test_get_changed_files_from_command_failed(returncode: int, stderr: str) -> None:
"""Test command failure handling."""
mock_result = Mock()
mock_result.returncode = returncode
mock_result.stderr = stderr
with patch("subprocess.run", return_value=mock_result):
with pytest.raises(Exception) as exc_info:
_get_changed_files_from_command(["git", "diff"])
assert "Command failed" in str(exc_info.value)
assert stderr in str(exc_info.value)
def test_get_changed_files_from_command_relative_paths() -> None:
"""Test that paths are made relative to current directory."""
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = "/some/project/file1.py\n/some/project/sub/file2.cpp\n"
with (
patch("subprocess.run", return_value=mock_result),
patch(
"os.path.relpath", side_effect=["file1.py", "sub/file2.cpp"]
) as mock_relpath,
patch("os.getcwd", return_value="/some/project"),
):
result = _get_changed_files_from_command(["git", "diff"])
# Check relpath was called with correct arguments
assert mock_relpath.call_count == 2
assert result == ["file1.py", "sub/file2.cpp"]
@pytest.mark.parametrize(
"changed_files_list",
[
["esphome/core/component.h", "esphome/components/wifi/wifi.cpp"],
["esphome/core/helpers.cpp"],
["esphome/core/application.h", "esphome/core/defines.h"],
],
)
def test_get_changed_components_core_cpp_files_trigger_full_scan(
changed_files_list: list[str],
) -> None:
"""Test that core C++/header file changes trigger full scan without calling subprocess."""
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = changed_files_list
# Should return None without calling subprocess
result = get_changed_components()
assert result is None
def test_get_changed_components_core_python_files_no_full_scan() -> None:
"""Test that core Python file changes do NOT trigger full scan."""
changed_files_list = [
"esphome/core/__init__.py",
"esphome/core/config.py",
"esphome/components/wifi/wifi.cpp",
]
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = changed_files_list
mock_result = Mock()
mock_result.stdout = "wifi\n"
with patch("subprocess.run", return_value=mock_result):
result = get_changed_components()
# Should NOT return None - should call list-components.py
assert result == ["wifi"]
def test_get_changed_components_mixed_core_files_with_cpp() -> None:
"""Test that mixed Python and C++ core files still trigger full scan due to C++ file."""
changed_files_list = [
"esphome/core/__init__.py",
"esphome/core/config.py",
"esphome/core/helpers.cpp", # This C++ file should trigger full scan
"esphome/components/wifi/wifi.cpp",
]
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = changed_files_list
# Should return None without calling subprocess due to helpers.cpp
result = get_changed_components()
assert result is None
@pytest.mark.parametrize(
("changed_files_list", "expected"),
[
# Only component files changed
(
["esphome/components/wifi/wifi.cpp", "esphome/components/api/api.cpp"],
["wifi", "api"],
),
# Non-component files only
(["README.md", "script/clang-tidy"], []),
# Single component
(["esphome/components/mqtt/mqtt_client.cpp"], ["mqtt"]),
],
)
def test_get_changed_components_returns_component_list(
changed_files_list: list[str], expected: list[str]
) -> None:
"""Test component detection returns correct component list."""
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = changed_files_list
mock_result = Mock()
mock_result.stdout = "\n".join(expected) + "\n" if expected else "\n"
with patch("subprocess.run", return_value=mock_result):
result = get_changed_components()
assert result == expected
def test_get_changed_components_script_failure() -> None:
"""Test fallback to full scan when script fails."""
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = ["esphome/components/wifi/wifi_component.cpp"]
with patch("subprocess.run") as mock_run:
mock_run.side_effect = subprocess.CalledProcessError(1, "cmd")
result = get_changed_components()
assert result is None # None means full scan
@pytest.mark.parametrize(
("components", "all_files", "expected_files"),
[
# Core C++/header files changed (full scan)
(
None,
["esphome/components/wifi/wifi.cpp", "esphome/core/helpers.cpp"],
["esphome/components/wifi/wifi.cpp", "esphome/core/helpers.cpp"],
),
# Specific components
(
["wifi", "api"],
[
"esphome/components/wifi/wifi.cpp",
"esphome/components/wifi/wifi.h",
"esphome/components/api/api.cpp",
"esphome/components/mqtt/mqtt.cpp",
],
[
"esphome/components/wifi/wifi.cpp",
"esphome/components/wifi/wifi.h",
"esphome/components/api/api.cpp",
],
),
# No components changed
(
[],
["esphome/components/wifi/wifi.cpp", "script/clang-tidy"],
["script/clang-tidy"], # Only non-component changed files
),
],
)
def test_filter_changed_ci_mode(
monkeypatch: MonkeyPatch,
components: list[str] | None,
all_files: list[str],
expected_files: list[str],
) -> None:
"""Test filter_changed in CI mode with different component scenarios."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
with patch("helpers.get_changed_components") as mock_components:
mock_components.return_value = components
if components == []:
# No components changed scenario needs changed_files mock
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = ["script/clang-tidy", "README.md"]
result = filter_changed(all_files)
else:
result = filter_changed(all_files)
assert set(result) == set(expected_files)
def test_filter_changed_local_mode(monkeypatch: MonkeyPatch) -> None:
"""Test filter_changed in local mode filters files directly."""
monkeypatch.delenv("GITHUB_ACTIONS", raising=False)
all_files = [
"esphome/components/wifi/wifi.cpp",
"esphome/components/api/api.cpp",
"esphome/core/helpers.cpp",
]
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = [
"esphome/components/wifi/wifi.cpp",
"esphome/core/helpers.cpp",
]
result = filter_changed(all_files)
# Should only include files that actually changed
expected = ["esphome/components/wifi/wifi.cpp", "esphome/core/helpers.cpp"]
assert set(result) == set(expected)
def test_filter_changed_component_path_parsing(monkeypatch: MonkeyPatch) -> None:
"""Test correct parsing of component paths."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
all_files = [
"esphome/components/wifi/wifi_component.cpp",
"esphome/components/wifi_info/wifi_info_text_sensor.cpp", # Different component
"esphome/components/api/api_server.cpp",
"esphome/components/api/custom_api_device.h",
]
with patch("helpers.get_changed_components") as mock_components:
mock_components.return_value = ["wifi"] # Only wifi, not wifi_info
result = filter_changed(all_files)
# Should only include files from wifi component, not wifi_info
expected = ["esphome/components/wifi/wifi_component.cpp"]
assert result == expected
def test_filter_changed_prints_output(
monkeypatch: MonkeyPatch, capsys: pytest.CaptureFixture[str]
) -> None:
"""Test that appropriate messages are printed."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
all_files = ["esphome/components/wifi/wifi_component.cpp"]
with patch("helpers.get_changed_components") as mock_components:
mock_components.return_value = ["wifi"]
filter_changed(all_files)
# Check that output was produced (not checking exact messages)
captured = capsys.readouterr()
assert len(captured.out) > 0
@pytest.mark.parametrize(
("files", "expected_empty"),
[
([], True),
(["file.cpp"], False),
],
ids=["empty_files", "non_empty_files"],
)
def test_filter_changed_empty_file_handling(
monkeypatch: MonkeyPatch, files: list[str], expected_empty: bool
) -> None:
"""Test handling of empty file lists."""
monkeypatch.setenv("GITHUB_ACTIONS", "true")
with patch("helpers.get_changed_components") as mock_components:
mock_components.return_value = ["wifi"]
result = filter_changed(files)
# Both cases should be empty:
# - Empty files list -> empty result
# - file.cpp doesn't match esphome/components/wifi/* pattern -> filtered out
assert len(result) == 0
def test_filter_changed_ci_full_scan() -> None:
"""Test _filter_changed_ci when core C++/header files changed (full scan)."""
all_files = ["esphome/components/wifi/wifi.cpp", "esphome/core/helpers.cpp"]
with patch("helpers.get_changed_components", return_value=None):
result = _filter_changed_ci(all_files)
# Should return all files for full scan
assert result == all_files
def test_filter_changed_ci_no_components_changed() -> None:
"""Test _filter_changed_ci when no components changed."""
all_files = ["esphome/components/wifi/wifi.cpp", "script/clang-tidy", "README.md"]
with (
patch("helpers.get_changed_components", return_value=[]),
patch("helpers.changed_files", return_value=["script/clang-tidy", "README.md"]),
):
result = _filter_changed_ci(all_files)
# Should only include non-component files that changed
assert set(result) == {"script/clang-tidy", "README.md"}
def test_filter_changed_ci_specific_components() -> None:
"""Test _filter_changed_ci with specific components changed."""
all_files = [
"esphome/components/wifi/wifi.cpp",
"esphome/components/wifi/wifi.h",
"esphome/components/api/api.cpp",
"esphome/components/mqtt/mqtt.cpp",
]
with patch("helpers.get_changed_components", return_value=["wifi", "api"]):
result = _filter_changed_ci(all_files)
# Should include all files from wifi and api components
expected = [
"esphome/components/wifi/wifi.cpp",
"esphome/components/wifi/wifi.h",
"esphome/components/api/api.cpp",
]
assert set(result) == set(expected)
def test_filter_changed_local() -> None:
"""Test _filter_changed_local filters based on git changes."""
all_files = [
"esphome/components/wifi/wifi.cpp",
"esphome/components/api/api.cpp",
"esphome/core/helpers.cpp",
]
with patch("helpers.changed_files") as mock_changed:
mock_changed.return_value = [
"esphome/components/wifi/wifi.cpp",
"esphome/core/helpers.cpp",
]
result = _filter_changed_local(all_files)
# Should only include files that actually changed
expected = ["esphome/components/wifi/wifi.cpp", "esphome/core/helpers.cpp"]
assert set(result) == set(expected)
def test_build_all_include_with_git(tmp_path: Path) -> None:
"""Test build_all_include using git ls-files."""
# Mock git output
git_output = "esphome/core/component.h\nesphome/components/wifi/wifi.h\nesphome/components/api/api.h\n"
mock_proc = Mock()
mock_proc.returncode = 0
mock_proc.stdout = git_output
with (
patch("subprocess.run", return_value=mock_proc),
patch("helpers.temp_header_file", str(tmp_path / "all-include.cpp")),
):
build_all_include()
# Check the generated file
include_file = tmp_path / "all-include.cpp"
assert include_file.exists()
content = include_file.read_text()
expected_lines = [
'#include "esphome/components/api/api.h"',
'#include "esphome/components/wifi/wifi.h"',
'#include "esphome/core/component.h"',
"", # Empty line at end
]
assert content == "\n".join(expected_lines)
def test_build_all_include_empty_output(tmp_path: Path) -> None:
"""Test build_all_include with empty git output."""
# Mock git returning empty output
mock_proc = Mock()
mock_proc.returncode = 0
mock_proc.stdout = ""
with (
patch("subprocess.run", return_value=mock_proc),
patch("helpers.temp_header_file", str(tmp_path / "all-include.cpp")),
):
build_all_include()
# Check the generated file
include_file = tmp_path / "all-include.cpp"
assert include_file.exists()
content = include_file.read_text()
# When git output is empty, the list comprehension filters out empty strings,
# then we append "" to get [""], which joins to just ""
assert content == ""
def test_build_all_include_creates_directory(tmp_path: Path) -> None:
"""Test that build_all_include creates the temp directory if needed."""
# Use a subdirectory that doesn't exist
temp_file = tmp_path / "subdir" / "all-include.cpp"
mock_proc = Mock()
mock_proc.returncode = 0
mock_proc.stdout = "esphome/core/test.h\n"
with (
patch("subprocess.run", return_value=mock_proc),
patch("helpers.temp_header_file", str(temp_file)),
):
build_all_include()
# Check that directory was created
assert temp_file.parent.exists()
assert temp_file.exists()
def test_print_file_list_empty(capsys: pytest.CaptureFixture[str]) -> None:
"""Test printing an empty file list."""
print_file_list([], "Test Files:")
captured = capsys.readouterr()
assert "Test Files:" in captured.out
assert "No files to check!" in captured.out
def test_print_file_list_small(capsys: pytest.CaptureFixture[str]) -> None:
"""Test printing a small list of files (less than max_files)."""
files = ["file1.cpp", "file2.cpp", "file3.cpp"]
print_file_list(files, "Test Files:", max_files=20)
captured = capsys.readouterr()
assert "Test Files:" in captured.out
assert " file1.cpp" in captured.out
assert " file2.cpp" in captured.out
assert " file3.cpp" in captured.out
assert "... and" not in captured.out
def test_print_file_list_exact_max_files(capsys: pytest.CaptureFixture[str]) -> None:
"""Test printing exactly max_files number of files."""
files = [f"file{i}.cpp" for i in range(20)]
print_file_list(files, "Test Files:", max_files=20)
captured = capsys.readouterr()
# All files should be shown
for i in range(20):
assert f" file{i}.cpp" in captured.out
assert "... and" not in captured.out
def test_print_file_list_large(capsys: pytest.CaptureFixture[str]) -> None:
"""Test printing a large list of files (more than max_files)."""
files = [f"file{i:03d}.cpp" for i in range(50)]
print_file_list(files, "Test Files:", max_files=20)
captured = capsys.readouterr()
assert "Test Files:" in captured.out
# First 10 files should be shown (sorted)
for i in range(10):
assert f" file{i:03d}.cpp" in captured.out
# Files 10-49 should not be shown
assert " file010.cpp" not in captured.out
assert " file049.cpp" not in captured.out
# Should show count of remaining files
assert "... and 40 more files" in captured.out
def test_print_file_list_unsorted(capsys: pytest.CaptureFixture[str]) -> None:
"""Test that files are sorted before printing."""
files = ["z_file.cpp", "a_file.cpp", "m_file.cpp"]
print_file_list(files, "Test Files:", max_files=20)
captured = capsys.readouterr()
lines = captured.out.strip().split("\n")
# Check order in output
assert lines[1] == " a_file.cpp"
assert lines[2] == " m_file.cpp"
assert lines[3] == " z_file.cpp"
def test_print_file_list_custom_max_files(capsys: pytest.CaptureFixture[str]) -> None:
"""Test with custom max_files parameter."""
files = [f"file{i}.cpp" for i in range(15)]
print_file_list(files, "Test Files:", max_files=10)
captured = capsys.readouterr()
# Should truncate after 10 files
assert "... and 5 more files" in captured.out
def test_print_file_list_default_title(capsys: pytest.CaptureFixture[str]) -> None:
"""Test with default title."""
print_file_list(["test.cpp"])
captured = capsys.readouterr()
assert "Files:" in captured.out
assert " test.cpp" in captured.out