mirror of https://github.com/esphome/esphome.git synced 2025-10-30 06:33:51 +00:00

Merge branch 'dev' into runtime_stats

J. Nick Koston
2025-07-12 07:21:44 -10:00
committed by GitHub
118 changed files with 5866 additions and 1926 deletions

View File

@@ -0,0 +1,12 @@
i2c:
  - id: i2c_gl_r01_i2c
    scl: ${scl_pin}
    sda: ${sda_pin}

sensor:
  - platform: gl_r01_i2c
    id: tof
    name: "ToF sensor"
    i2c_id: i2c_gl_r01_i2c
    address: 0x74
    update_interval: 15s

View File

@@ -0,0 +1,5 @@
substitutions:
  scl_pin: GPIO16
  sda_pin: GPIO17

<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
  scl_pin: GPIO5
  sda_pin: GPIO4

<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
  scl_pin: GPIO5
  sda_pin: GPIO4

<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
  scl_pin: GPIO16
  sda_pin: GPIO17

<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
  scl_pin: GPIO5
  sda_pin: GPIO4

<<: !include common.yaml

View File

@@ -0,0 +1,5 @@
substitutions:
  scl_pin: GPIO5
  sda_pin: GPIO4

<<: !include common.yaml

View File

@@ -0,0 +1,8 @@
sensor:
  - platform: lps22
    address: 0x5d
    update_interval: 10s
    temperature:
      name: "LPS22 Temperature"
    pressure:
      name: "LPS22 Pressure"

View File

@@ -0,0 +1,6 @@
i2c:
  - id: i2c_lps22
    scl: 16
    sda: 17

<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
  - id: i2c_lps22
    scl: 5
    sda: 4

<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
  - id: i2c_lps22
    scl: 5
    sda: 4

<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
  - id: i2c_lps22
    scl: 16
    sda: 17

<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
  - id: i2c_lps22
    scl: 5
    sda: 4

<<: !include common.yaml

View File

@@ -0,0 +1,6 @@
i2c:
  - id: i2c_lps22
    scl: 5
    sda: 4

<<: !include common.yaml

View File

@@ -5,12 +5,14 @@ from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator, Callable, Generator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
import fcntl
import logging
import os
from pathlib import Path
import platform
import signal
import socket
import subprocess
import sys
import tempfile
from typing import TextIO
@@ -50,6 +52,66 @@ if platform.system() == "Windows":
import pty # not available on Windows
def _get_platformio_env(cache_dir: Path) -> dict[str, str]:
    """Get environment variables for PlatformIO with shared cache."""
    env = os.environ.copy()
    env["PLATFORMIO_CORE_DIR"] = str(cache_dir)
    env["PLATFORMIO_CACHE_DIR"] = str(cache_dir / ".cache")
    env["PLATFORMIO_LIBDEPS_DIR"] = str(cache_dir / "libdeps")
    return env


@pytest.fixture(scope="session")
def shared_platformio_cache() -> Generator[Path]:
    """Initialize a shared PlatformIO cache for all integration tests."""
    # Use a dedicated directory for integration tests to avoid conflicts
    test_cache_dir = Path.home() / ".esphome-integration-tests"
    cache_dir = test_cache_dir / "platformio"

    # Use a lock file in the home directory to ensure only one process initializes the cache
    # This is needed when running with pytest-xdist
    # The lock file must be in a directory that already exists to avoid race conditions
    lock_file = Path.home() / ".esphome-integration-tests-init.lock"

    # Always acquire the lock to ensure cache is ready before proceeding
    with open(lock_file, "w") as lock_fd:
        fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)

        # Check if cache needs initialization while holding the lock
        if not cache_dir.exists() or not any(cache_dir.iterdir()):
            # Create the test cache directory if it doesn't exist
            test_cache_dir.mkdir(exist_ok=True)

            with tempfile.TemporaryDirectory() as tmpdir:
                # Create a basic host config
                init_dir = Path(tmpdir)
                config_path = init_dir / "cache_init.yaml"
                config_path.write_text("""esphome:
  name: cache-init
host:
api:
  encryption:
    key: "IIevImVI42I0FGos5nLqFK91jrJehrgidI0ArwMLr8w="
logger:
""")

                # Run compilation to populate the cache
                # We must succeed here to avoid race conditions where multiple
                # tests try to populate the same cache directory simultaneously
                env = _get_platformio_env(cache_dir)
                subprocess.run(
                    ["esphome", "compile", str(config_path)],
                    check=True,
                    cwd=init_dir,
                    env=env,
                )

    # Lock is held until here, ensuring cache is fully populated before any test proceeds
    yield cache_dir


@pytest.fixture(scope="module", autouse=True)
def enable_aioesphomeapi_debug_logging():
    """Enable debug logging for aioesphomeapi to help diagnose connection issues."""
@@ -161,22 +223,14 @@ async def write_yaml_config(
@pytest_asyncio.fixture
async def compile_esphome(
integration_test_dir: Path,
shared_platformio_cache: Path,
) -> AsyncGenerator[CompileFunction]:
"""Compile an ESPHome configuration and return the binary path."""
async def _compile(config_path: Path) -> Path:
# Create a unique PlatformIO directory for this test to avoid race conditions
platformio_dir = integration_test_dir / ".platformio"
platformio_dir.mkdir(parents=True, exist_ok=True)
# Create cache directory as well
platformio_cache_dir = platformio_dir / ".cache"
platformio_cache_dir.mkdir(parents=True, exist_ok=True)
# Set up environment with isolated PlatformIO directories
env = os.environ.copy()
env["PLATFORMIO_CORE_DIR"] = str(platformio_dir)
env["PLATFORMIO_CACHE_DIR"] = str(platformio_cache_dir)
# Use the shared PlatformIO cache for faster compilation
# This avoids re-downloading dependencies for each test
env = _get_platformio_env(shared_platformio_cache)
# Retry compilation up to 3 times if we get a segfault
max_retries = 3
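
Stepping back from the diff above: the shared-cache fixture relies on a simple pattern, an exclusive fcntl.flock on a well-known lock file, so that under pytest-xdist exactly one worker populates the cache while the others block and then reuse it. A minimal, self-contained sketch of that pattern (hypothetical helper, not part of the diff):

import fcntl
from collections.abc import Callable
from pathlib import Path


def init_once(cache_dir: Path, lock_file: Path, populate: Callable[[Path], None]) -> Path:
    """Run populate(cache_dir) at most once across cooperating processes.

    Every process opens the same lock file and takes an exclusive flock;
    whichever process wins initializes the cache, the rest block until it
    is ready and then proceed with the warm cache.
    """
    with open(lock_file, "w") as fd:
        fcntl.flock(fd.fileno(), fcntl.LOCK_EX)  # blocks until no other holder
        if not cache_dir.exists() or not any(cache_dir.iterdir()):
            cache_dir.mkdir(parents=True, exist_ok=True)
            populate(cache_dir)
    return cache_dir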

View File

@@ -0,0 +1,24 @@
esphome:
  name: api-custom-services-test
host:

# This is required for CustomAPIDevice to work
api:
  custom_services: true
  # Also test that YAML services still work
  actions:
    - action: test_yaml_service
      then:
        - logger.log: "YAML service called"

logger:
  level: DEBUG

# External component that uses CustomAPIDevice
external_components:
  - source:
      type: local
      path: EXTERNAL_COMPONENT_PATH
    components: [custom_api_device_component]

custom_api_device_component:

View File

@@ -0,0 +1,19 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ID

custom_api_device_component_ns = cg.esphome_ns.namespace("custom_api_device_component")
CustomAPIDeviceComponent = custom_api_device_component_ns.class_(
    "CustomAPIDeviceComponent", cg.Component
)

CONFIG_SCHEMA = cv.Schema(
    {
        cv.GenerateID(): cv.declare_id(CustomAPIDeviceComponent),
    }
).extend(cv.COMPONENT_SCHEMA)


async def to_code(config):
    var = cg.new_Pvariable(config[CONF_ID])
    await cg.register_component(var, config)

View File

@@ -0,0 +1,53 @@
#include "custom_api_device_component.h"
#include "esphome/core/log.h"
#ifdef USE_API
namespace esphome {
namespace custom_api_device_component {
static const char *const TAG = "custom_api";
void CustomAPIDeviceComponent::setup() {
// Register services using CustomAPIDevice
register_service(&CustomAPIDeviceComponent::on_test_service, "custom_test_service");
register_service(&CustomAPIDeviceComponent::on_service_with_args, "custom_service_with_args",
{"arg_string", "arg_int", "arg_bool", "arg_float"});
// Test array types
register_service(&CustomAPIDeviceComponent::on_service_with_arrays, "custom_service_with_arrays",
{"bool_array", "int_array", "float_array", "string_array"});
}
void CustomAPIDeviceComponent::on_test_service() { ESP_LOGI(TAG, "Custom test service called!"); }
// NOLINTNEXTLINE(performance-unnecessary-value-param)
void CustomAPIDeviceComponent::on_service_with_args(std::string arg_string, int32_t arg_int, bool arg_bool,
float arg_float) {
ESP_LOGI(TAG, "Custom service called with: %s, %d, %d, %.2f", arg_string.c_str(), arg_int, arg_bool, arg_float);
}
void CustomAPIDeviceComponent::on_service_with_arrays(std::vector<bool> bool_array, std::vector<int32_t> int_array,
std::vector<float> float_array,
std::vector<std::string> string_array) {
ESP_LOGI(TAG, "Array service called with %zu bools, %zu ints, %zu floats, %zu strings", bool_array.size(),
int_array.size(), float_array.size(), string_array.size());
// Log first element of each array if not empty
if (!bool_array.empty()) {
ESP_LOGI(TAG, "First bool: %s", bool_array[0] ? "true" : "false");
}
if (!int_array.empty()) {
ESP_LOGI(TAG, "First int: %d", int_array[0]);
}
if (!float_array.empty()) {
ESP_LOGI(TAG, "First float: %.2f", float_array[0]);
}
if (!string_array.empty()) {
ESP_LOGI(TAG, "First string: %s", string_array[0].c_str());
}
}
} // namespace custom_api_device_component
} // namespace esphome
#endif // USE_API

View File

@@ -0,0 +1,29 @@
#pragma once
#include <string>
#include <vector>
#include "esphome/core/component.h"
#include "esphome/components/api/custom_api_device.h"
#ifdef USE_API
namespace esphome {
namespace custom_api_device_component {
using namespace api;
class CustomAPIDeviceComponent : public Component, public CustomAPIDevice {
public:
void setup() override;
void on_test_service();
// NOLINTNEXTLINE(performance-unnecessary-value-param)
void on_service_with_args(std::string arg_string, int32_t arg_int, bool arg_bool, float arg_float);
void on_service_with_arrays(std::vector<bool> bool_array, std::vector<int32_t> int_array,
std::vector<float> float_array, std::vector<std::string> string_array);
};
} // namespace custom_api_device_component
} // namespace esphome
#endif // USE_API

View File

@@ -23,19 +23,6 @@ void SchedulerStringLifetimeComponent::run_string_lifetime_test() {
test_vector_reallocation();
test_string_move_semantics();
test_lambda_capture_lifetime();
// Schedule final check
this->set_timeout("final_check", 200, [this]() {
ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_);
ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_);
if (this->tests_failed_ == 0) {
ESP_LOGI(TAG, "SUCCESS: All string lifetime tests passed!");
} else {
ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_);
}
ESP_LOGI(TAG, "String lifetime tests complete");
});
}
void SchedulerStringLifetimeComponent::run_test1() {
@@ -69,7 +56,6 @@ void SchedulerStringLifetimeComponent::run_test5() {
}
void SchedulerStringLifetimeComponent::run_final_check() {
ESP_LOGI(TAG, "String lifetime tests complete");
ESP_LOGI(TAG, "Tests passed: %d", this->tests_passed_);
ESP_LOGI(TAG, "Tests failed: %d", this->tests_failed_);
@@ -78,6 +64,7 @@ void SchedulerStringLifetimeComponent::run_final_check() {
} else {
ESP_LOGE(TAG, "FAILURE: %d string lifetime tests failed!", this->tests_failed_);
}
ESP_LOGI(TAG, "String lifetime tests complete");
}
void SchedulerStringLifetimeComponent::test_temporary_string_lifetime() {

View File

@@ -0,0 +1,43 @@
esphome:
  name: scheduler-null-name
host:
logger:
  level: DEBUG
api:
  services:
    - service: test_null_name
      then:
        - lambda: |-
            // First, create a scenario that would trigger the crash
            // The crash happens when defer() is called with a name that would be cancelled

            // Test 1: Create a defer with a valid name
            App.scheduler.set_timeout(nullptr, "test_defer", 0, []() {
              ESP_LOGI("TEST", "First defer should be cancelled");
            });

            // Test 2: Create another defer with the same name - this triggers cancel_item_locked_
            // In the unfixed code, this would crash if the name was NULL
            App.scheduler.set_timeout(nullptr, "test_defer", 0, []() {
              ESP_LOGI("TEST", "Second defer executed");
            });

            // Test 3: Now test with nullptr - this is the actual crash scenario
            // Create a defer item without a name (like voice assistant does)
            const char* null_name = nullptr;
            App.scheduler.set_timeout(nullptr, null_name, 0, []() {
              ESP_LOGI("TEST", "Defer with null name executed");
            });

            // Test 4: Create another defer with null name - this would trigger the crash
            App.scheduler.set_timeout(nullptr, null_name, 0, []() {
              ESP_LOGI("TEST", "Second null defer executed");
            });

            // Test 5: Verify scheduler still works
            App.scheduler.set_timeout(nullptr, "valid_timeout", 50, []() {
              ESP_LOGI("TEST", "Test completed successfully");
            });

View File

@@ -0,0 +1,144 @@
"""Integration test for API custom services using CustomAPIDevice."""
from __future__ import annotations
import asyncio
from pathlib import Path
import re
from aioesphomeapi import UserService, UserServiceArgType
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_api_custom_services(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test CustomAPIDevice services work correctly with custom_services: true."""
# Get the path to the external components directory
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
)
# Replace the placeholder in the YAML config with the actual path
yaml_config = yaml_config.replace(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
loop = asyncio.get_running_loop()
# Track log messages
yaml_service_future = loop.create_future()
custom_service_future = loop.create_future()
custom_args_future = loop.create_future()
custom_arrays_future = loop.create_future()
# Patterns to match in logs
yaml_service_pattern = re.compile(r"YAML service called")
custom_service_pattern = re.compile(r"Custom test service called!")
custom_args_pattern = re.compile(
r"Custom service called with: test_string, 456, 1, 78\.90"
)
custom_arrays_pattern = re.compile(
r"Array service called with 2 bools, 3 ints, 2 floats, 2 strings"
)
def check_output(line: str) -> None:
"""Check log output for expected messages."""
if not yaml_service_future.done() and yaml_service_pattern.search(line):
yaml_service_future.set_result(True)
elif not custom_service_future.done() and custom_service_pattern.search(line):
custom_service_future.set_result(True)
elif not custom_args_future.done() and custom_args_pattern.search(line):
custom_args_future.set_result(True)
elif not custom_arrays_future.done() and custom_arrays_pattern.search(line):
custom_arrays_future.set_result(True)
# Run with log monitoring
async with run_compiled(yaml_config, line_callback=check_output):
async with api_client_connected() as client:
# Verify device info
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "api-custom-services-test"
# List services
_, services = await client.list_entities_services()
# Should have 4 services: 1 YAML + 3 CustomAPIDevice
assert len(services) == 4, f"Expected 4 services, found {len(services)}"
# Find our services
yaml_service: UserService | None = None
custom_service: UserService | None = None
custom_args_service: UserService | None = None
custom_arrays_service: UserService | None = None
for service in services:
if service.name == "test_yaml_service":
yaml_service = service
elif service.name == "custom_test_service":
custom_service = service
elif service.name == "custom_service_with_args":
custom_args_service = service
elif service.name == "custom_service_with_arrays":
custom_arrays_service = service
assert yaml_service is not None, "test_yaml_service not found"
assert custom_service is not None, "custom_test_service not found"
assert custom_args_service is not None, "custom_service_with_args not found"
assert custom_arrays_service is not None, (
"custom_service_with_arrays not found"
)
# Test YAML service
client.execute_service(yaml_service, {})
await asyncio.wait_for(yaml_service_future, timeout=5.0)
# Test simple CustomAPIDevice service
client.execute_service(custom_service, {})
await asyncio.wait_for(custom_service_future, timeout=5.0)
# Verify custom_args_service arguments
assert len(custom_args_service.args) == 4
arg_types = {arg.name: arg.type for arg in custom_args_service.args}
assert arg_types["arg_string"] == UserServiceArgType.STRING
assert arg_types["arg_int"] == UserServiceArgType.INT
assert arg_types["arg_bool"] == UserServiceArgType.BOOL
assert arg_types["arg_float"] == UserServiceArgType.FLOAT
# Test CustomAPIDevice service with arguments
client.execute_service(
custom_args_service,
{
"arg_string": "test_string",
"arg_int": 456,
"arg_bool": True,
"arg_float": 78.9,
},
)
await asyncio.wait_for(custom_args_future, timeout=5.0)
# Verify array service arguments
assert len(custom_arrays_service.args) == 4
array_arg_types = {arg.name: arg.type for arg in custom_arrays_service.args}
assert array_arg_types["bool_array"] == UserServiceArgType.BOOL_ARRAY
assert array_arg_types["int_array"] == UserServiceArgType.INT_ARRAY
assert array_arg_types["float_array"] == UserServiceArgType.FLOAT_ARRAY
assert array_arg_types["string_array"] == UserServiceArgType.STRING_ARRAY
# Test CustomAPIDevice service with arrays
client.execute_service(
custom_arrays_service,
{
"bool_array": [True, False],
"int_array": [1, 2, 3],
"float_array": [1.1, 2.2],
"string_array": ["hello", "world"],
},
)
await asyncio.wait_for(custom_arrays_future, timeout=5.0)

View File

@@ -0,0 +1,59 @@
"""Test that scheduler handles NULL names safely without crashing."""
import asyncio
import re
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_scheduler_null_name(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that scheduler handles NULL names safely without crashing."""
loop = asyncio.get_running_loop()
test_complete_future: asyncio.Future[bool] = loop.create_future()
# Pattern to match test completion
test_complete_pattern = re.compile(r"Test completed successfully")
def check_output(line: str) -> None:
"""Check log output for test completion."""
if not test_complete_future.done() and test_complete_pattern.search(line):
test_complete_future.set_result(True)
async with run_compiled(yaml_config, line_callback=check_output):
async with api_client_connected() as client:
# Verify we can connect
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "scheduler-null-name"
# List services
_, services = await asyncio.wait_for(
client.list_entities_services(), timeout=5.0
)
# Find our test service
test_null_name_service = next(
(s for s in services if s.name == "test_null_name"), None
)
assert test_null_name_service is not None, (
"test_null_name service not found"
)
# Execute the test
client.execute_service(test_null_name_service, {})
# Wait for test completion
try:
await asyncio.wait_for(test_complete_future, timeout=10.0)
except asyncio.TimeoutError:
pytest.fail(
"Test did not complete within timeout - likely crashed due to NULL name"
)

tests/script/__init__.py (new, empty file)

View File

@@ -0,0 +1,359 @@
"""Unit tests for script/clang_tidy_hash.py module."""
import hashlib
from pathlib import Path
import sys
from unittest.mock import Mock, patch
import pytest
# Add the script directory to Python path so we can import clang_tidy_hash
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "script"))
import clang_tidy_hash # noqa: E402
@pytest.mark.parametrize(
("file_content", "expected"),
[
(
"clang-tidy==18.1.5 # via -r requirements_dev.in\n",
"clang-tidy==18.1.5 # via -r requirements_dev.in",
),
(
"other-package==1.0\nclang-tidy==17.0.0\nmore-packages==2.0\n",
"clang-tidy==17.0.0",
),
(
"# comment\nclang-tidy==16.0.0 # some comment\n",
"clang-tidy==16.0.0 # some comment",
),
("no-clang-tidy-here==1.0\n", "clang-tidy version not found"),
],
)
def test_get_clang_tidy_version_from_requirements(
file_content: str, expected: str
) -> None:
"""Test extracting clang-tidy version from various file formats."""
# Mock read_file_lines to return our test content
with patch("clang_tidy_hash.read_file_lines") as mock_read:
mock_read.return_value = file_content.splitlines(keepends=True)
result = clang_tidy_hash.get_clang_tidy_version_from_requirements()
assert result == expected
@pytest.mark.parametrize(
("platformio_content", "expected_flags"),
[
(
"[env:esp32]\n"
"platform = espressif32\n"
"\n"
"[flags:clangtidy]\n"
"build_flags = -Wall\n"
"extra_flags = -Wextra\n"
"\n"
"[env:esp8266]\n",
"build_flags = -Wall\nextra_flags = -Wextra",
),
(
"[flags:clangtidy]\n# Comment line\nbuild_flags = -O2\n\n[next_section]\n",
"build_flags = -O2",
),
(
"[flags:clangtidy]\nflag_c = -std=c99\nflag_b = -Wall\nflag_a = -O2\n",
"flag_a = -O2\nflag_b = -Wall\nflag_c = -std=c99", # Sorted
),
(
"[env:esp32]\nplatform = espressif32\n", # No clangtidy section
"",
),
],
)
def test_extract_platformio_flags(platformio_content: str, expected_flags: str) -> None:
"""Test extracting clang-tidy flags from platformio.ini."""
# Mock read_file_lines to return our test content
with patch("clang_tidy_hash.read_file_lines") as mock_read:
mock_read.return_value = platformio_content.splitlines(keepends=True)
result = clang_tidy_hash.extract_platformio_flags()
assert result == expected_flags
def test_calculate_clang_tidy_hash() -> None:
"""Test calculating hash from all configuration sources."""
clang_tidy_content = b"Checks: '-*,readability-*'\n"
requirements_version = "clang-tidy==18.1.5"
pio_flags = "build_flags = -Wall"
# Expected hash calculation
expected_hasher = hashlib.sha256()
expected_hasher.update(clang_tidy_content)
expected_hasher.update(requirements_version.encode())
expected_hasher.update(pio_flags.encode())
expected_hash = expected_hasher.hexdigest()
# Mock the dependencies
with (
patch("clang_tidy_hash.read_file_bytes", return_value=clang_tidy_content),
patch(
"clang_tidy_hash.get_clang_tidy_version_from_requirements",
return_value=requirements_version,
),
patch("clang_tidy_hash.extract_platformio_flags", return_value=pio_flags),
):
result = clang_tidy_hash.calculate_clang_tidy_hash()
assert result == expected_hash
def test_read_stored_hash_exists(tmp_path: Path) -> None:
"""Test reading hash when file exists."""
stored_hash = "abc123def456"
hash_file = tmp_path / ".clang-tidy.hash"
hash_file.write_text(f"{stored_hash}\n")
with (
patch("clang_tidy_hash.Path") as mock_path_class,
patch("clang_tidy_hash.read_file_lines", return_value=[f"{stored_hash}\n"]),
):
# Mock the path calculation and exists check
mock_hash_file = Mock()
mock_hash_file.exists.return_value = True
mock_path_class.return_value.parent.parent.__truediv__.return_value = (
mock_hash_file
)
result = clang_tidy_hash.read_stored_hash()
assert result == stored_hash
def test_read_stored_hash_not_exists() -> None:
"""Test reading hash when file doesn't exist."""
with patch("clang_tidy_hash.Path") as mock_path_class:
# Mock the path calculation and exists check
mock_hash_file = Mock()
mock_hash_file.exists.return_value = False
mock_path_class.return_value.parent.parent.__truediv__.return_value = (
mock_hash_file
)
result = clang_tidy_hash.read_stored_hash()
assert result is None
def test_write_hash() -> None:
"""Test writing hash to file."""
hash_value = "abc123def456"
with patch("clang_tidy_hash.write_file_content") as mock_write:
clang_tidy_hash.write_hash(hash_value)
# Verify write_file_content was called with correct parameters
mock_write.assert_called_once()
args = mock_write.call_args[0]
assert str(args[0]).endswith(".clang-tidy.hash")
assert args[1] == hash_value
@pytest.mark.parametrize(
("args", "current_hash", "stored_hash", "expected_exit"),
[
(["--check"], "abc123", "abc123", 1), # Hashes match, no scan needed
(["--check"], "abc123", "def456", 0), # Hashes differ, scan needed
(["--check"], "abc123", None, 0), # No stored hash, scan needed
],
)
def test_main_check_mode(
args: list[str], current_hash: str, stored_hash: str | None, expected_exit: int
) -> None:
"""Test main function in check mode."""
with (
patch("sys.argv", ["clang_tidy_hash.py"] + args),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == expected_exit
def test_main_update_mode(capsys: pytest.CaptureFixture[str]) -> None:
"""Test main function in update mode."""
current_hash = "abc123"
with (
patch("sys.argv", ["clang_tidy_hash.py", "--update"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.write_hash") as mock_write,
):
clang_tidy_hash.main()
mock_write.assert_called_once_with(current_hash)
captured = capsys.readouterr()
assert f"Hash updated: {current_hash}" in captured.out
@pytest.mark.parametrize(
("current_hash", "stored_hash"),
[
("abc123", "def456"), # Hash changed, should update
("abc123", None), # No stored hash, should update
],
)
def test_main_update_if_changed_mode_update(
current_hash: str, stored_hash: str | None, capsys: pytest.CaptureFixture[str]
) -> None:
"""Test main function in update-if-changed mode when update is needed."""
with (
patch("sys.argv", ["clang_tidy_hash.py", "--update-if-changed"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
patch("clang_tidy_hash.write_hash") as mock_write,
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == 0
mock_write.assert_called_once_with(current_hash)
captured = capsys.readouterr()
assert "Clang-tidy hash updated" in captured.out
def test_main_update_if_changed_mode_no_update(
capsys: pytest.CaptureFixture[str],
) -> None:
"""Test main function in update-if-changed mode when no update is needed."""
current_hash = "abc123"
stored_hash = "abc123"
with (
patch("sys.argv", ["clang_tidy_hash.py", "--update-if-changed"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
patch("clang_tidy_hash.write_hash") as mock_write,
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == 0
mock_write.assert_not_called()
captured = capsys.readouterr()
assert "Clang-tidy hash unchanged" in captured.out
def test_main_verify_mode_success(capsys: pytest.CaptureFixture[str]) -> None:
"""Test main function in verify mode when verification passes."""
current_hash = "abc123"
stored_hash = "abc123"
with (
patch("sys.argv", ["clang_tidy_hash.py", "--verify"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
):
clang_tidy_hash.main()
captured = capsys.readouterr()
assert "Hash verification passed" in captured.out
@pytest.mark.parametrize(
("current_hash", "stored_hash"),
[
("abc123", "def456"), # Hashes differ, verification fails
("abc123", None), # No stored hash, verification fails
],
)
def test_main_verify_mode_failure(
current_hash: str, stored_hash: str | None, capsys: pytest.CaptureFixture[str]
) -> None:
"""Test main function in verify mode when verification fails."""
with (
patch("sys.argv", ["clang_tidy_hash.py", "--verify"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
pytest.raises(SystemExit) as exc_info,
):
clang_tidy_hash.main()
assert exc_info.value.code == 1
captured = capsys.readouterr()
assert "ERROR: Clang-tidy configuration has changed" in captured.out
def test_main_default_mode(capsys: pytest.CaptureFixture[str]) -> None:
"""Test main function in default mode (no arguments)."""
current_hash = "abc123"
stored_hash = "def456"
with (
patch("sys.argv", ["clang_tidy_hash.py"]),
patch("clang_tidy_hash.calculate_clang_tidy_hash", return_value=current_hash),
patch("clang_tidy_hash.read_stored_hash", return_value=stored_hash),
):
clang_tidy_hash.main()
captured = capsys.readouterr()
assert f"Current hash: {current_hash}" in captured.out
assert f"Stored hash: {stored_hash}" in captured.out
assert "Match: False" in captured.out
def test_read_file_lines(tmp_path: Path) -> None:
"""Test read_file_lines helper function."""
test_file = tmp_path / "test.txt"
test_content = "line1\nline2\nline3\n"
test_file.write_text(test_content)
result = clang_tidy_hash.read_file_lines(test_file)
assert result == ["line1\n", "line2\n", "line3\n"]
def test_read_file_bytes(tmp_path: Path) -> None:
"""Test read_file_bytes helper function."""
test_file = tmp_path / "test.bin"
test_content = b"binary content\x00\xff"
test_file.write_bytes(test_content)
result = clang_tidy_hash.read_file_bytes(test_file)
assert result == test_content
def test_write_file_content(tmp_path: Path) -> None:
"""Test write_file_content helper function."""
test_file = tmp_path / "test.txt"
test_content = "test content"
clang_tidy_hash.write_file_content(test_file, test_content)
assert test_file.read_text() == test_content
@pytest.mark.parametrize(
("line", "expected"),
[
("clang-tidy==18.1.5", ("clang-tidy", "clang-tidy==18.1.5")),
(
"clang-tidy==18.1.5 # comment",
("clang-tidy", "clang-tidy==18.1.5 # comment"),
),
("some-package>=1.0,<2.0", ("some-package", "some-package>=1.0,<2.0")),
("pkg_with-dashes==1.0", ("pkg_with-dashes", "pkg_with-dashes==1.0")),
("# just a comment", None),
("", None),
(" ", None),
("invalid line without version", None),
],
)
def test_parse_requirement_line(line: str, expected: tuple[str, str] | None) -> None:
"""Test parsing individual requirement lines."""
result = clang_tidy_hash.parse_requirement_line(line)
assert result == expected
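
The parametrized cases above pin down the contract of parse_requirement_line fairly tightly. One possible implementation consistent with them (an illustrative sketch; the actual script/clang_tidy_hash.py may differ):

import re

_REQ_RE = re.compile(r"^([A-Za-z0-9][A-Za-z0-9._-]*)\s*(?:==|>=|<=|~=|!=|<|>)")


def parse_requirement_line(line: str) -> tuple[str, str] | None:
    """Return (package_name, full_line) for a requirement line, else None."""
    stripped = line.strip()
    if not stripped or stripped.startswith("#"):
        return None  # blank lines and comments carry no requirement
    match = _REQ_RE.match(stripped)
    if not match:
        return None  # no version specifier -> not a requirement line
    return match.group(1), stripped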

View File

@@ -0,0 +1,352 @@
"""Unit tests for script/determine-jobs.py module."""
from collections.abc import Generator
import importlib.util
import json
import os
import subprocess
import sys
from unittest.mock import Mock, patch
import pytest
# Add the script directory to Python path so we can import the module
script_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "script")
)
sys.path.insert(0, script_dir)
spec = importlib.util.spec_from_file_location(
"determine_jobs", os.path.join(script_dir, "determine-jobs.py")
)
determine_jobs = importlib.util.module_from_spec(spec)
spec.loader.exec_module(determine_jobs)
@pytest.fixture
def mock_should_run_integration_tests() -> Generator[Mock, None, None]:
"""Mock should_run_integration_tests from helpers."""
with patch.object(determine_jobs, "should_run_integration_tests") as mock:
yield mock
@pytest.fixture
def mock_should_run_clang_tidy() -> Generator[Mock, None, None]:
"""Mock should_run_clang_tidy from helpers."""
with patch.object(determine_jobs, "should_run_clang_tidy") as mock:
yield mock
@pytest.fixture
def mock_should_run_clang_format() -> Generator[Mock, None, None]:
"""Mock should_run_clang_format from helpers."""
with patch.object(determine_jobs, "should_run_clang_format") as mock:
yield mock
@pytest.fixture
def mock_should_run_python_linters() -> Generator[Mock, None, None]:
"""Mock should_run_python_linters from helpers."""
with patch.object(determine_jobs, "should_run_python_linters") as mock:
yield mock
@pytest.fixture
def mock_subprocess_run() -> Generator[Mock, None, None]:
"""Mock subprocess.run for list-components.py calls."""
with patch.object(determine_jobs.subprocess, "run") as mock:
yield mock
def test_main_all_tests_should_run(
mock_should_run_integration_tests: Mock,
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
capsys: pytest.CaptureFixture[str],
) -> None:
"""Test when all tests should run."""
mock_should_run_integration_tests.return_value = True
mock_should_run_clang_tidy.return_value = True
mock_should_run_clang_format.return_value = True
mock_should_run_python_linters.return_value = True
# Mock list-components.py output
mock_result = Mock()
mock_result.stdout = "wifi\napi\nsensor\n"
mock_subprocess_run.return_value = mock_result
# Run main function with mocked argv
with patch("sys.argv", ["determine-jobs.py"]):
determine_jobs.main()
# Check output
captured = capsys.readouterr()
output = json.loads(captured.out)
assert output["integration_tests"] is True
assert output["clang_tidy"] is True
assert output["clang_format"] is True
assert output["python_linters"] is True
assert output["changed_components"] == ["wifi", "api", "sensor"]
assert output["component_test_count"] == 3
def test_main_no_tests_should_run(
mock_should_run_integration_tests: Mock,
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
capsys: pytest.CaptureFixture[str],
) -> None:
"""Test when no tests should run."""
mock_should_run_integration_tests.return_value = False
mock_should_run_clang_tidy.return_value = False
mock_should_run_clang_format.return_value = False
mock_should_run_python_linters.return_value = False
# Mock empty list-components.py output
mock_result = Mock()
mock_result.stdout = ""
mock_subprocess_run.return_value = mock_result
# Run main function with mocked argv
with patch("sys.argv", ["determine-jobs.py"]):
determine_jobs.main()
# Check output
captured = capsys.readouterr()
output = json.loads(captured.out)
assert output["integration_tests"] is False
assert output["clang_tidy"] is False
assert output["clang_format"] is False
assert output["python_linters"] is False
assert output["changed_components"] == []
assert output["component_test_count"] == 0
def test_main_list_components_fails(
mock_should_run_integration_tests: Mock,
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
capsys: pytest.CaptureFixture[str],
) -> None:
"""Test when list-components.py fails."""
mock_should_run_integration_tests.return_value = True
mock_should_run_clang_tidy.return_value = True
mock_should_run_clang_format.return_value = True
mock_should_run_python_linters.return_value = True
# Mock list-components.py failure
mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, "cmd")
# Run main function with mocked argv - should raise
with patch("sys.argv", ["determine-jobs.py"]):
with pytest.raises(subprocess.CalledProcessError):
determine_jobs.main()
def test_main_with_branch_argument(
mock_should_run_integration_tests: Mock,
mock_should_run_clang_tidy: Mock,
mock_should_run_clang_format: Mock,
mock_should_run_python_linters: Mock,
mock_subprocess_run: Mock,
capsys: pytest.CaptureFixture[str],
) -> None:
"""Test with branch argument."""
mock_should_run_integration_tests.return_value = False
mock_should_run_clang_tidy.return_value = True
mock_should_run_clang_format.return_value = False
mock_should_run_python_linters.return_value = True
# Mock list-components.py output
mock_result = Mock()
mock_result.stdout = "mqtt\n"
mock_subprocess_run.return_value = mock_result
with patch("sys.argv", ["script.py", "-b", "main"]):
determine_jobs.main()
# Check that functions were called with branch
mock_should_run_integration_tests.assert_called_once_with("main")
mock_should_run_clang_tidy.assert_called_once_with("main")
mock_should_run_clang_format.assert_called_once_with("main")
mock_should_run_python_linters.assert_called_once_with("main")
# Check that list-components.py was called with branch
mock_subprocess_run.assert_called_once()
call_args = mock_subprocess_run.call_args[0][0]
assert "--changed" in call_args
assert "-b" in call_args
assert "main" in call_args
# Check output
captured = capsys.readouterr()
output = json.loads(captured.out)
assert output["integration_tests"] is False
assert output["clang_tidy"] is True
assert output["clang_format"] is False
assert output["python_linters"] is True
assert output["changed_components"] == ["mqtt"]
assert output["component_test_count"] == 1
def test_should_run_integration_tests(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Test should_run_integration_tests function."""
# Core C++ files trigger tests
with patch.object(
determine_jobs, "changed_files", return_value=["esphome/core/component.cpp"]
):
result = determine_jobs.should_run_integration_tests()
assert result is True
# Core Python files trigger tests
with patch.object(
determine_jobs, "changed_files", return_value=["esphome/core/config.py"]
):
result = determine_jobs.should_run_integration_tests()
assert result is True
# Python files directly in esphome/ do NOT trigger tests
with patch.object(
determine_jobs, "changed_files", return_value=["esphome/config.py"]
):
result = determine_jobs.should_run_integration_tests()
assert result is False
# Python files in subdirectories (not core) do NOT trigger tests
with patch.object(
determine_jobs,
"changed_files",
return_value=["esphome/dashboard/web_server.py"],
):
result = determine_jobs.should_run_integration_tests()
assert result is False
def test_should_run_integration_tests_with_branch() -> None:
"""Test should_run_integration_tests with branch argument."""
with patch.object(determine_jobs, "changed_files") as mock_changed:
mock_changed.return_value = []
determine_jobs.should_run_integration_tests("release")
mock_changed.assert_called_once_with("release")
def test_should_run_integration_tests_component_dependency() -> None:
"""Test that integration tests run when components used in fixtures change."""
with patch.object(
determine_jobs, "changed_files", return_value=["esphome/components/api/api.cpp"]
):
with patch.object(
determine_jobs, "get_components_from_integration_fixtures"
) as mock_fixtures:
mock_fixtures.return_value = {"api", "sensor"}
with patch.object(determine_jobs, "get_all_dependencies") as mock_deps:
mock_deps.return_value = {"api", "sensor", "network"}
result = determine_jobs.should_run_integration_tests()
assert result is True
@pytest.mark.parametrize(
("check_returncode", "changed_files", "expected_result"),
[
(0, [], True), # Hash changed - need full scan
(1, ["esphome/core.cpp"], True), # C++ file changed
(1, ["README.md"], False), # No C++ files changed
],
)
def test_should_run_clang_tidy(
check_returncode: int,
changed_files: list[str],
expected_result: bool,
) -> None:
"""Test should_run_clang_tidy function."""
with patch.object(determine_jobs, "changed_files", return_value=changed_files):
# Test with hash check returning specific code
with patch("subprocess.run") as mock_run:
mock_run.return_value = Mock(returncode=check_returncode)
result = determine_jobs.should_run_clang_tidy()
assert result == expected_result
# Test with hash check failing (exception)
if check_returncode != 0:
with patch("subprocess.run", side_effect=Exception("Failed")):
result = determine_jobs.should_run_clang_tidy()
assert result is True # Fail safe - run clang-tidy
def test_should_run_clang_tidy_with_branch() -> None:
"""Test should_run_clang_tidy with branch argument."""
with patch.object(determine_jobs, "changed_files") as mock_changed:
mock_changed.return_value = []
with patch("subprocess.run") as mock_run:
mock_run.return_value = Mock(returncode=1) # Hash unchanged
determine_jobs.should_run_clang_tidy("release")
mock_changed.assert_called_once_with("release")
@pytest.mark.parametrize(
("changed_files", "expected_result"),
[
(["esphome/core.py"], True),
(["script/test.py"], True),
(["esphome/test.pyi"], True), # .pyi files should trigger
(["README.md"], False),
([], False),
],
)
def test_should_run_python_linters(
changed_files: list[str], expected_result: bool
) -> None:
"""Test should_run_python_linters function."""
with patch.object(determine_jobs, "changed_files", return_value=changed_files):
result = determine_jobs.should_run_python_linters()
assert result == expected_result
def test_should_run_python_linters_with_branch() -> None:
"""Test should_run_python_linters with branch argument."""
with patch.object(determine_jobs, "changed_files") as mock_changed:
mock_changed.return_value = []
determine_jobs.should_run_python_linters("release")
mock_changed.assert_called_once_with("release")
@pytest.mark.parametrize(
("changed_files", "expected_result"),
[
(["esphome/core.cpp"], True),
(["esphome/core.h"], True),
(["test.hpp"], True),
(["test.cc"], True),
(["test.cxx"], True),
(["test.c"], True),
(["test.tcc"], True),
(["README.md"], False),
([], False),
],
)
def test_should_run_clang_format(
changed_files: list[str], expected_result: bool
) -> None:
"""Test should_run_clang_format function."""
with patch.object(determine_jobs, "changed_files", return_value=changed_files):
result = determine_jobs.should_run_clang_format()
assert result == expected_result
def test_should_run_clang_format_with_branch() -> None:
"""Test should_run_clang_format with branch argument."""
with patch.object(determine_jobs, "changed_files") as mock_changed:
mock_changed.return_value = []
determine_jobs.should_run_clang_format("release")
mock_changed.assert_called_once_with("release")
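
Read together, these parametrized cases effectively specify the change-detection predicates by file extension. A compact sketch of what they imply (illustrative only; the real helpers used by script/determine-jobs.py may be structured differently, and changed_files() is assumed from the repo's script helpers):

# Assumed per the tests above: changed_files(branch) returns the list of files
# touched relative to the target branch.
CPP_EXTENSIONS = (".cpp", ".h", ".hpp", ".cc", ".cxx", ".c", ".tcc")
PYTHON_EXTENSIONS = (".py", ".pyi")


def should_run_clang_format(branch: str | None = None) -> bool:
    """True when any touched file is C/C++ source or a header."""
    return any(f.endswith(CPP_EXTENSIONS) for f in changed_files(branch))


def should_run_python_linters(branch: str | None = None) -> bool:
    """True when any touched file is Python source or a stub."""
    return any(f.endswith(PYTHON_EXTENSIONS) for f in changed_files(branch))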

tests/script/test_helpers.py (new file, 1014 lines; diff suppressed because it is too large)

View File

@@ -8,9 +8,19 @@ from typing import Any
import pytest
from esphome.config_validation import Invalid
from esphome.const import CONF_DEVICE_ID, CONF_DISABLED_BY_DEFAULT, CONF_ICON, CONF_NAME
from esphome.const import (
CONF_DEVICE_ID,
CONF_DISABLED_BY_DEFAULT,
CONF_ICON,
CONF_INTERNAL,
CONF_NAME,
)
from esphome.core import CORE, ID, entity_helpers
from esphome.core.entity_helpers import get_base_entity_object_id, setup_entity
from esphome.core.entity_helpers import (
entity_duplicate_validator,
get_base_entity_object_id,
setup_entity,
)
from esphome.cpp_generator import MockObj
from esphome.helpers import sanitize, snake_case
@@ -493,11 +503,6 @@ async def test_setup_entity_disabled_by_default(
def test_entity_duplicate_validator() -> None:
"""Test the entity_duplicate_validator function."""
from esphome.core.entity_helpers import entity_duplicate_validator
# Reset CORE unique_ids for clean test
CORE.unique_ids.clear()
# Create validator for sensor platform
validator = entity_duplicate_validator("sensor")
@@ -523,11 +528,6 @@ def test_entity_duplicate_validator() -> None:
def test_entity_duplicate_validator_with_devices() -> None:
"""Test entity_duplicate_validator with devices."""
from esphome.core.entity_helpers import entity_duplicate_validator
# Reset CORE unique_ids for clean test
CORE.unique_ids.clear()
# Create validator for sensor platform
validator = entity_duplicate_validator("sensor")
@@ -605,3 +605,36 @@ def test_entity_different_platforms_yaml_validation(
)
# This should succeed
assert result is not None
def test_entity_duplicate_validator_internal_entities() -> None:
"""Test that internal entities are excluded from duplicate name validation."""
# Create validator for sensor platform
validator = entity_duplicate_validator("sensor")
# First entity should pass
config1 = {CONF_NAME: "Temperature"}
validated1 = validator(config1)
assert validated1 == config1
assert ("sensor", "temperature") in CORE.unique_ids
# Internal entity with same name should pass (not added to unique_ids)
config2 = {CONF_NAME: "Temperature", CONF_INTERNAL: True}
validated2 = validator(config2)
assert validated2 == config2
# Internal entity should not be added to unique_ids
assert len([k for k in CORE.unique_ids if k == ("sensor", "temperature")]) == 1
# Another internal entity with same name should also pass
config3 = {CONF_NAME: "Temperature", CONF_INTERNAL: True}
validated3 = validator(config3)
assert validated3 == config3
# Still only one entry in unique_ids (from the non-internal entity)
assert len([k for k in CORE.unique_ids if k == ("sensor", "temperature")]) == 1
# Non-internal entity with same name should fail
config4 = {CONF_NAME: "Temperature"}
with pytest.raises(
Invalid, match=r"Duplicate sensor entity with name 'Temperature' found"
):
validator(config4)

View File

@@ -47,9 +47,8 @@ def dict_diff(a, b, path=""):
elif len(b) > len(a):
for i in range(min_len, len(b)):
diffs.append(f"{path}[{i}] only in expected: {b[i]!r}")
else:
if a != b:
diffs.append(f"\t{path}: actual={a!r} expected={b!r}")
elif a != b:
diffs.append(f"\t{path}: actual={a!r} expected={b!r}")
return diffs