From 35dce3c80d783aa7c04c02cb1529c3aba842efef Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 14 Sep 2025 16:31:38 -0500 Subject: [PATCH] Add additional test coverage ahead of Path conversion (#10700) --- tests/unit_tests/conftest.py | 44 ++ tests/unit_tests/core/test_config.py | 329 ++++++++++++++- tests/unit_tests/test_platformio_api.py | 521 +++++++++++++++++++++++- tests/unit_tests/test_storage_json.py | 492 +++++++++++++++++++++- 4 files changed, 1369 insertions(+), 17 deletions(-) diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index a1e438b577..06d06d0506 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -9,8 +9,10 @@ not be part of a unit test suite. """ +from collections.abc import Generator from pathlib import Path import sys +from unittest.mock import Mock, patch import pytest @@ -43,3 +45,45 @@ def setup_core(tmp_path: Path) -> Path: """Set up CORE with test paths.""" CORE.config_path = str(tmp_path / "test.yaml") return tmp_path + + +@pytest.fixture +def mock_write_file_if_changed() -> Generator[Mock, None, None]: + """Mock write_file_if_changed for storage_json.""" + with patch("esphome.storage_json.write_file_if_changed") as mock: + yield mock + + +@pytest.fixture +def mock_copy_file_if_changed() -> Generator[Mock, None, None]: + """Mock copy_file_if_changed for core.config.""" + with patch("esphome.core.config.copy_file_if_changed") as mock: + yield mock + + +@pytest.fixture +def mock_run_platformio_cli() -> Generator[Mock, None, None]: + """Mock run_platformio_cli for platformio_api.""" + with patch("esphome.platformio_api.run_platformio_cli") as mock: + yield mock + + +@pytest.fixture +def mock_run_platformio_cli_run() -> Generator[Mock, None, None]: + """Mock run_platformio_cli_run for platformio_api.""" + with patch("esphome.platformio_api.run_platformio_cli_run") as mock: + yield mock + + +@pytest.fixture +def mock_decode_pc() -> Generator[Mock, None, None]: + """Mock _decode_pc for platformio_api.""" + with patch("esphome.platformio_api._decode_pc") as mock: + yield mock + + +@pytest.fixture +def mock_run_external_command() -> Generator[Mock, None, None]: + """Mock run_external_command for platformio_api.""" + with patch("esphome.platformio_api.run_external_command") as mock: + yield mock diff --git a/tests/unit_tests/core/test_config.py b/tests/unit_tests/core/test_config.py index f5ba5221ed..484c30d59e 100644 --- a/tests/unit_tests/core/test_config.py +++ b/tests/unit_tests/core/test_config.py @@ -1,15 +1,34 @@ """Unit tests for core config functionality including areas and devices.""" from collections.abc import Callable +import os from pathlib import Path +import types from typing import Any +from unittest.mock import MagicMock, Mock, patch import pytest from esphome import config_validation as cv, core -from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES -from esphome.core import config -from esphome.core.config import Area, validate_area_config +from esphome.const import ( + CONF_AREA, + CONF_AREAS, + CONF_BUILD_PATH, + CONF_DEVICES, + CONF_ESPHOME, + CONF_NAME, + CONF_NAME_ADD_MAC_SUFFIX, + KEY_CORE, +) +from esphome.core import CORE, config +from esphome.core.config import ( + Area, + preload_core_config, + valid_include, + valid_project_name, + validate_area_config, + validate_hostname, +) from .common import load_config_from_fixture @@ -245,3 +264,307 @@ def test_add_platform_defines_priority() -> None: f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than " f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)" ) + + +def test_valid_include_with_angle_brackets() -> None: + """Test valid_include accepts angle bracket includes.""" + assert valid_include("") == "" + + +def test_valid_include_with_valid_file(tmp_path: Path) -> None: + """Test valid_include accepts valid include files.""" + CORE.config_path = str(tmp_path / "test.yaml") + include_file = tmp_path / "include.h" + include_file.touch() + + assert valid_include(str(include_file)) == str(include_file) + + +def test_valid_include_with_valid_directory(tmp_path: Path) -> None: + """Test valid_include accepts valid directories.""" + CORE.config_path = str(tmp_path / "test.yaml") + include_dir = tmp_path / "includes" + include_dir.mkdir() + + assert valid_include(str(include_dir)) == str(include_dir) + + +def test_valid_include_invalid_extension(tmp_path: Path) -> None: + """Test valid_include rejects files with invalid extensions.""" + CORE.config_path = str(tmp_path / "test.yaml") + invalid_file = tmp_path / "file.txt" + invalid_file.touch() + + with pytest.raises(cv.Invalid, match="Include has invalid file extension"): + valid_include(str(invalid_file)) + + +def test_valid_project_name_valid() -> None: + """Test valid_project_name accepts valid project names.""" + assert valid_project_name("esphome.my_project") == "esphome.my_project" + + +def test_valid_project_name_no_namespace() -> None: + """Test valid_project_name rejects names without namespace.""" + with pytest.raises(cv.Invalid, match="project name needs to have a namespace"): + valid_project_name("my_project") + + +def test_valid_project_name_multiple_dots() -> None: + """Test valid_project_name rejects names with multiple dots.""" + with pytest.raises(cv.Invalid, match="project name needs to have a namespace"): + valid_project_name("esphome.my.project") + + +def test_validate_hostname_valid() -> None: + """Test validate_hostname accepts valid hostnames.""" + config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False} + assert validate_hostname(config) == config + + +def test_validate_hostname_too_long() -> None: + """Test validate_hostname rejects hostnames that are too long.""" + config = { + CONF_NAME: "a" * 32, # 32 chars, max is 31 + CONF_NAME_ADD_MAC_SUFFIX: False, + } + with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"): + validate_hostname(config) + + +def test_validate_hostname_too_long_with_mac_suffix() -> None: + """Test validate_hostname accounts for MAC suffix length.""" + config = { + CONF_NAME: "a" * 25, # 25 chars, max is 24 with MAC suffix + CONF_NAME_ADD_MAC_SUFFIX: True, + } + with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"): + validate_hostname(config) + + +def test_validate_hostname_with_underscore(caplog) -> None: + """Test validate_hostname warns about underscores.""" + config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False} + assert validate_hostname(config) == config + assert ( + "Using the '_' (underscore) character in the hostname is discouraged" + in caplog.text + ) + + +def test_preload_core_config_basic(setup_core: Path) -> None: + """Test preload_core_config sets basic CORE attributes.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + "esp32": {}, + } + result = {} + + platform = preload_core_config(config, result) + + assert CORE.name == "test_device" + assert platform == "esp32" + assert KEY_CORE in CORE.data + assert CONF_BUILD_PATH in config[CONF_ESPHOME] + + +def test_preload_core_config_with_build_path(setup_core: Path) -> None: + """Test preload_core_config uses provided build path.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + CONF_BUILD_PATH: "/custom/build/path", + }, + "esp8266": {}, + } + result = {} + + platform = preload_core_config(config, result) + + assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path" + assert platform == "esp8266" + + +def test_preload_core_config_env_build_path(setup_core: Path) -> None: + """Test preload_core_config uses ESPHOME_BUILD_PATH env var.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + "rp2040": {}, + } + result = {} + + with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}): + platform = preload_core_config(config, result) + + assert CONF_BUILD_PATH in config[CONF_ESPHOME] + assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH] + assert platform == "rp2040" + + +def test_preload_core_config_no_platform(setup_core: Path) -> None: + """Test preload_core_config raises when no platform is specified.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + } + result = {} + + # Mock _is_target_platform to avoid expensive component loading + with patch("esphome.core.config._is_target_platform") as mock_is_platform: + # Return True for known platforms + mock_is_platform.side_effect = lambda name: name in [ + "esp32", + "esp8266", + "rp2040", + ] + + with pytest.raises(cv.Invalid, match="Platform missing"): + preload_core_config(config, result) + + +def test_preload_core_config_multiple_platforms(setup_core: Path) -> None: + """Test preload_core_config raises when multiple platforms are specified.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + "esp32": {}, + "esp8266": {}, + } + result = {} + + # Mock _is_target_platform to avoid expensive component loading + with patch("esphome.core.config._is_target_platform") as mock_is_platform: + # Return True for known platforms + mock_is_platform.side_effect = lambda name: name in [ + "esp32", + "esp8266", + "rp2040", + ] + + with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"): + preload_core_config(config, result) + + +def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None: + """Test include_file adds include statement for header files.""" + src_file = tmp_path / "source.h" + src_file.write_text("// Header content") + + CORE.build_path = str(tmp_path / "build") + + with patch("esphome.core.config.cg") as mock_cg: + # Mock RawStatement to capture the text + mock_raw_statement = MagicMock() + mock_raw_statement.text = "" + + def raw_statement_side_effect(text): + mock_raw_statement.text = text + return mock_raw_statement + + mock_cg.RawStatement.side_effect = raw_statement_side_effect + + config.include_file(str(src_file), "test.h") + + mock_copy_file_if_changed.assert_called_once() + mock_cg.add_global.assert_called_once() + # Check that include statement was added + assert '#include "test.h"' in mock_raw_statement.text + + +def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None: + """Test include_file does not add include for cpp files.""" + src_file = tmp_path / "source.cpp" + src_file.write_text("// CPP content") + + CORE.build_path = str(tmp_path / "build") + + with patch("esphome.core.config.cg") as mock_cg: + config.include_file(str(src_file), "test.cpp") + + mock_copy_file_if_changed.assert_called_once() + # Should not add include statement for .cpp files + mock_cg.add_global.assert_not_called() + + +def test_get_usable_cpu_count() -> None: + """Test get_usable_cpu_count returns CPU count.""" + count = config.get_usable_cpu_count() + assert isinstance(count, int) + assert count > 0 + + +def test_get_usable_cpu_count_with_process_cpu_count() -> None: + """Test get_usable_cpu_count uses process_cpu_count when available.""" + # Test with process_cpu_count (Python 3.13+) + # Create a mock os module with process_cpu_count + + mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4) + + with patch("esphome.core.config.os", mock_os): + # When process_cpu_count exists, it should be used + count = config.get_usable_cpu_count() + assert count == 8 + + # Test fallback to cpu_count when process_cpu_count not available + mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4) + + with patch("esphome.core.config.os", mock_os_no_process): + count = config.get_usable_cpu_count() + assert count == 4 + + +def test_list_target_platforms(tmp_path: Path) -> None: + """Test _list_target_platforms returns available platforms.""" + # Create mock components directory structure + components_dir = tmp_path / "components" + components_dir.mkdir() + + # Create platform and non-platform directories with __init__.py + platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"] + non_platforms = ["sensor"] + + for component in platforms + non_platforms: + component_dir = components_dir / component + component_dir.mkdir() + (component_dir / "__init__.py").touch() + + # Create a file (not a directory) + (components_dir / "README.md").touch() + + # Create a directory without __init__.py + (components_dir / "no_init").mkdir() + + # Mock Path(__file__).parents[1] to return our tmp_path + with patch("esphome.core.config.Path") as mock_path: + mock_file_path = MagicMock() + mock_file_path.parents = [MagicMock(), tmp_path] + mock_path.return_value = mock_file_path + + platforms = config._list_target_platforms() + + assert isinstance(platforms, list) + # Should include platform components + assert "esp32" in platforms + assert "esp8266" in platforms + assert "rp2040" in platforms + assert "libretiny" in platforms + assert "host" in platforms + # Should not include non-platform components + assert "sensor" not in platforms + assert "README.md" not in platforms + assert "no_init" not in platforms + + +def test_is_target_platform() -> None: + """Test _is_target_platform identifies valid platforms.""" + assert config._is_target_platform("esp32") is True + assert config._is_target_platform("esp8266") is True + assert config._is_target_platform("rp2040") is True + assert config._is_target_platform("invalid_platform") is False + assert config._is_target_platform("api") is False # Component but not platform diff --git a/tests/unit_tests/test_platformio_api.py b/tests/unit_tests/test_platformio_api.py index a1fa963e51..7c7883d391 100644 --- a/tests/unit_tests/test_platformio_api.py +++ b/tests/unit_tests/test_platformio_api.py @@ -1,10 +1,16 @@ """Tests for platformio_api.py path functions.""" +import json +import os from pathlib import Path -from unittest.mock import patch +import shutil +from types import SimpleNamespace +from unittest.mock import MagicMock, Mock, patch + +import pytest from esphome import platformio_api -from esphome.core import CORE +from esphome.core import CORE, EsphomeError def test_idedata_firmware_elf_path(setup_core: Path) -> None: @@ -104,7 +110,9 @@ def test_flash_image_dataclass() -> None: assert image.offset == "0x10000" -def test_load_idedata_returns_dict(setup_core: Path) -> None: +def test_load_idedata_returns_dict( + setup_core: Path, mock_run_platformio_cli_run +) -> None: """Test _load_idedata returns parsed idedata dict when successful.""" CORE.build_path = str(setup_core / "build" / "test") CORE.name = "test" @@ -118,12 +126,511 @@ def test_load_idedata_returns_dict(setup_core: Path) -> None: idedata_path.parent.mkdir(parents=True, exist_ok=True) idedata_path.write_text('{"prog_path": "/test/firmware.elf"}') - with patch("esphome.platformio_api.run_platformio_cli_run") as mock_run: - mock_run.return_value = '{"prog_path": "/test/firmware.elf"}' + mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}' - config = {"name": "test"} - result = platformio_api._load_idedata(config) + config = {"name": "test"} + result = platformio_api._load_idedata(config) assert result is not None assert isinstance(result, dict) assert result["prog_path"] == "/test/firmware.elf" + + +def test_load_idedata_uses_cache_when_valid( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _load_idedata uses cached data when unchanged.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + + # Create platformio.ini + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + + # Create idedata cache file that's newer + idedata_path = setup_core / ".esphome" / "idedata" / "test.json" + idedata_path.parent.mkdir(parents=True, exist_ok=True) + idedata_path.write_text('{"prog_path": "/cached/firmware.elf"}') + + # Make idedata newer than platformio.ini + platformio_ini_mtime = platformio_ini.stat().st_mtime + os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1)) + + config = {"name": "test"} + result = platformio_api._load_idedata(config) + + # Should not call _run_idedata since cache is valid + mock_run_platformio_cli_run.assert_not_called() + + assert result["prog_path"] == "/cached/firmware.elf" + + +def test_load_idedata_regenerates_when_platformio_ini_newer( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _load_idedata regenerates when platformio.ini is newer.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + + # Create idedata cache file first + idedata_path = setup_core / ".esphome" / "idedata" / "test.json" + idedata_path.parent.mkdir(parents=True, exist_ok=True) + idedata_path.write_text('{"prog_path": "/old/firmware.elf"}') + + # Create platformio.ini that's newer + idedata_mtime = idedata_path.stat().st_mtime + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + # Make platformio.ini newer than idedata + os.utime(platformio_ini, (idedata_mtime + 1, idedata_mtime + 1)) + + # Mock platformio to return new data + new_data = {"prog_path": "/new/firmware.elf"} + mock_run_platformio_cli_run.return_value = json.dumps(new_data) + + config = {"name": "test"} + result = platformio_api._load_idedata(config) + + # Should call _run_idedata since platformio.ini is newer + mock_run_platformio_cli_run.assert_called_once() + + assert result["prog_path"] == "/new/firmware.elf" + + +def test_load_idedata_regenerates_on_corrupted_cache( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _load_idedata regenerates when cache file is corrupted.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + + # Create platformio.ini + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + + # Create corrupted idedata cache file + idedata_path = setup_core / ".esphome" / "idedata" / "test.json" + idedata_path.parent.mkdir(parents=True, exist_ok=True) + idedata_path.write_text('{"prog_path": invalid json') + + # Make idedata newer so it would be used if valid + platformio_ini_mtime = platformio_ini.stat().st_mtime + os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1)) + + # Mock platformio to return new data + new_data = {"prog_path": "/new/firmware.elf"} + mock_run_platformio_cli_run.return_value = json.dumps(new_data) + + config = {"name": "test"} + result = platformio_api._load_idedata(config) + + # Should call _run_idedata since cache is corrupted + mock_run_platformio_cli_run.assert_called_once() + + assert result["prog_path"] == "/new/firmware.elf" + + +def test_run_idedata_parses_json_from_output( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _run_idedata extracts JSON from platformio output.""" + config = {"name": "test"} + + expected_data = { + "prog_path": "/path/to/firmware.elf", + "cc_path": "/path/to/gcc", + "extra": {"flash_images": []}, + } + + # Simulate platformio output with JSON embedded + mock_run_platformio_cli_run.return_value = ( + f"Some preamble\n{json.dumps(expected_data)}\nSome postamble" + ) + + result = platformio_api._run_idedata(config) + + assert result == expected_data + + +def test_run_idedata_raises_on_no_json( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _run_idedata raises EsphomeError when no JSON found.""" + config = {"name": "test"} + + mock_run_platformio_cli_run.return_value = "No JSON in this output" + + with pytest.raises(EsphomeError): + platformio_api._run_idedata(config) + + +def test_run_idedata_raises_on_invalid_json( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _run_idedata raises on malformed JSON.""" + config = {"name": "test"} + mock_run_platformio_cli_run.return_value = '{"invalid": json"}' + + # The ValueError from json.loads is re-raised + with pytest.raises(ValueError): + platformio_api._run_idedata(config) + + +def test_run_platformio_cli_sets_environment_variables( + setup_core: Path, mock_run_external_command: Mock +) -> None: + """Test run_platformio_cli sets correct environment variables.""" + CORE.build_path = str(setup_core / "build" / "test") + + with patch.dict(os.environ, {}, clear=False): + mock_run_external_command.return_value = 0 + platformio_api.run_platformio_cli("test", "arg") + + # Check environment variables were set + assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true" + assert ( + setup_core / "build" / "test" + in Path(os.environ["PLATFORMIO_BUILD_DIR"]).parents + or Path(os.environ["PLATFORMIO_BUILD_DIR"]) == setup_core / "build" / "test" + ) + assert "PLATFORMIO_LIBDEPS_DIR" in os.environ + assert "PYTHONWARNINGS" in os.environ + + # Check command was called correctly + mock_run_external_command.assert_called_once() + args = mock_run_external_command.call_args[0] + assert "platformio" in args + assert "test" in args + assert "arg" in args + + +def test_run_platformio_cli_run_builds_command( + setup_core: Path, mock_run_platformio_cli: Mock +) -> None: + """Test run_platformio_cli_run builds correct command.""" + CORE.build_path = str(setup_core / "build" / "test") + mock_run_platformio_cli.return_value = 0 + + config = {"name": "test"} + platformio_api.run_platformio_cli_run(config, True, "extra", "args") + + mock_run_platformio_cli.assert_called_once_with( + "run", "-d", CORE.build_path, "-v", "extra", "args" + ) + + +def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None: + """Test run_compile with process limit.""" + from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME + + CORE.build_path = str(setup_core / "build" / "test") + config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}} + mock_run_platformio_cli_run.return_value = 0 + + platformio_api.run_compile(config, verbose=True) + + mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4") + + +def test_get_idedata_caches_result( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test get_idedata caches result in CORE.data.""" + from esphome.const import KEY_CORE + + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + CORE.data[KEY_CORE] = {} + + # Create platformio.ini to avoid regeneration + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + + # Mock platformio to return data + idedata = {"prog_path": "/test/firmware.elf"} + mock_run_platformio_cli_run.return_value = json.dumps(idedata) + + config = {"name": "test"} + + # First call should load and cache + result1 = platformio_api.get_idedata(config) + mock_run_platformio_cli_run.assert_called_once() + + # Second call should use cache from CORE.data + result2 = platformio_api.get_idedata(config) + mock_run_platformio_cli_run.assert_called_once() # Still only called once + + assert result1 is result2 + assert isinstance(result1, platformio_api.IDEData) + assert result1.firmware_elf_path == "/test/firmware.elf" + + +def test_idedata_addr2line_path_windows(setup_core: Path) -> None: + """Test IDEData.addr2line_path on Windows.""" + raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"} + idedata = platformio_api.IDEData(raw_data) + + result = idedata.addr2line_path + assert result == "C:\\tools\\addr2line.exe" + + +def test_idedata_addr2line_path_unix(setup_core: Path) -> None: + """Test IDEData.addr2line_path on Unix.""" + raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"} + idedata = platformio_api.IDEData(raw_data) + + result = idedata.addr2line_path + assert result == "/usr/bin/addr2line" + + +def test_patch_structhash(setup_core: Path) -> None: + """Test patch_structhash monkey patches platformio functions.""" + # Create simple namespace objects to act as modules + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + mock_run = SimpleNamespace(cli=mock_cli, helpers=mock_helpers) + + # Mock platformio modules + with patch.dict( + "sys.modules", + { + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + "platformio.run": mock_run, + "platformio.project.helpers": MagicMock(), + "platformio.fs": MagicMock(), + "platformio": MagicMock(), + }, + ): + # Call patch_structhash + platformio_api.patch_structhash() + + # Verify both modules had clean_build_dir patched + # Check that clean_build_dir was set on both modules + assert hasattr(mock_cli, "clean_build_dir") + assert hasattr(mock_helpers, "clean_build_dir") + + # Verify they got the same function assigned + assert mock_cli.clean_build_dir is mock_helpers.clean_build_dir + + # Verify it's a real function (not a Mock) + assert callable(mock_cli.clean_build_dir) + assert mock_cli.clean_build_dir.__name__ == "patched_clean_build_dir" + + +def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None: + """Test patched_clean_build_dir removes build dir when platformio.ini is newer.""" + build_dir = setup_core / "build" + build_dir.mkdir() + platformio_ini = setup_core / "platformio.ini" + platformio_ini.write_text("config") + + # Make platformio.ini newer than build_dir + build_mtime = build_dir.stat().st_mtime + os.utime(platformio_ini, (build_mtime + 1, build_mtime + 1)) + + # Track if directory was removed + removed_paths: list[str] = [] + + def track_rmtree(path: str) -> None: + removed_paths.append(path) + shutil.rmtree(path) + + # Create mock modules that patch_structhash expects + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + mock_project_helpers = MagicMock() + mock_project_helpers.get_project_dir.return_value = str(setup_core) + mock_fs = SimpleNamespace(rmtree=track_rmtree) + + with patch.dict( + "sys.modules", + { + "platformio": SimpleNamespace(fs=mock_fs), + "platformio.fs": mock_fs, + "platformio.project.helpers": mock_project_helpers, + "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + }, + ): + # Call patch_structhash to install the patched function + platformio_api.patch_structhash() + + # Call the patched function + mock_helpers.clean_build_dir(str(build_dir), []) + + # Verify directory was removed and recreated + assert len(removed_paths) == 1 + assert removed_paths[0] == str(build_dir) + assert build_dir.exists() # makedirs recreated it + + +def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None: + """Test patched_clean_build_dir keeps build dir when it's up to date.""" + build_dir = setup_core / "build" + build_dir.mkdir() + test_file = build_dir / "test.txt" + test_file.write_text("test content") + + platformio_ini = setup_core / "platformio.ini" + platformio_ini.write_text("config") + + # Make build_dir newer than platformio.ini + ini_mtime = platformio_ini.stat().st_mtime + os.utime(build_dir, (ini_mtime + 1, ini_mtime + 1)) + + # Track if rmtree is called + removed_paths: list[str] = [] + + def track_rmtree(path: str) -> None: + removed_paths.append(path) + + # Create mock modules + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + mock_project_helpers = MagicMock() + mock_project_helpers.get_project_dir.return_value = str(setup_core) + mock_fs = SimpleNamespace(rmtree=track_rmtree) + + with patch.dict( + "sys.modules", + { + "platformio": SimpleNamespace(fs=mock_fs), + "platformio.fs": mock_fs, + "platformio.project.helpers": mock_project_helpers, + "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + }, + ): + # Call patch_structhash to install the patched function + platformio_api.patch_structhash() + + # Call the patched function + mock_helpers.clean_build_dir(str(build_dir), []) + + # Verify rmtree was NOT called + assert len(removed_paths) == 0 + + # Verify directory and file still exist + assert build_dir.exists() + assert test_file.exists() + assert test_file.read_text() == "test content" + + +def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None: + """Test patched_clean_build_dir creates build dir when it doesn't exist.""" + build_dir = setup_core / "build" + platformio_ini = setup_core / "platformio.ini" + platformio_ini.write_text("config") + + # Ensure build_dir doesn't exist + assert not build_dir.exists() + + # Track if rmtree is called + removed_paths: list[str] = [] + + def track_rmtree(path: str) -> None: + removed_paths.append(path) + + # Create mock modules + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + mock_project_helpers = MagicMock() + mock_project_helpers.get_project_dir.return_value = str(setup_core) + mock_fs = SimpleNamespace(rmtree=track_rmtree) + + with patch.dict( + "sys.modules", + { + "platformio": SimpleNamespace(fs=mock_fs), + "platformio.fs": mock_fs, + "platformio.project.helpers": mock_project_helpers, + "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + }, + ): + # Call patch_structhash to install the patched function + platformio_api.patch_structhash() + + # Call the patched function + mock_helpers.clean_build_dir(str(build_dir), []) + + # Verify rmtree was NOT called + assert len(removed_paths) == 0 + + # Verify directory was created + assert build_dir.exists() + + +def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None: + """Test process_stacktrace handles ESP8266 exceptions.""" + config = {"name": "test"} + + # Test exception type parsing + line = "Exception (28):" + backtrace_state = False + + result = platformio_api.process_stacktrace(config, line, backtrace_state) + + assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text + assert result is False + + +def test_process_stacktrace_esp8266_backtrace( + setup_core: Path, mock_decode_pc: Mock +) -> None: + """Test process_stacktrace handles ESP8266 multi-line backtrace.""" + config = {"name": "test"} + + # Start of backtrace + line1 = ">>>stack>>>" + state = platformio_api.process_stacktrace(config, line1, False) + assert state is True + + # Backtrace content with addresses + line2 = "40201234 40205678" + state = platformio_api.process_stacktrace(config, line2, state) + assert state is True + assert mock_decode_pc.call_count == 2 + + # End of backtrace + line3 = "<< None: + """Test process_stacktrace handles ESP32 single-line backtrace.""" + config = {"name": "test"} + + line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678" + state = platformio_api.process_stacktrace(config, line, False) + + # Should decode both addresses + assert mock_decode_pc.call_count == 2 + mock_decode_pc.assert_any_call(config, "40081234") + mock_decode_pc.assert_any_call(config, "40085678") + assert state is False + + +def test_process_stacktrace_bad_alloc( + setup_core: Path, mock_decode_pc: Mock, caplog +) -> None: + """Test process_stacktrace handles bad alloc messages.""" + config = {"name": "test"} + + line = "last failed alloc call: 40201234(512)" + state = platformio_api.process_stacktrace(config, line, False) + + assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text + mock_decode_pc.assert_called_once_with(config, "40201234") + assert state is False diff --git a/tests/unit_tests/test_storage_json.py b/tests/unit_tests/test_storage_json.py index 52de327bbc..e1abe565b1 100644 --- a/tests/unit_tests/test_storage_json.py +++ b/tests/unit_tests/test_storage_json.py @@ -1,12 +1,15 @@ """Tests for storage_json.py path functions.""" +from datetime import datetime +import json from pathlib import Path import sys -from unittest.mock import patch +from unittest.mock import MagicMock, Mock, patch import pytest from esphome import storage_json +from esphome.const import CONF_DISABLED, CONF_MDNS from esphome.core import CORE @@ -115,7 +118,9 @@ def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None: assert storage.firmware_bin_path == "/path/to/firmware.bin" -def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) -> None: +def test_storage_json_save_creates_directory( + setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock +) -> None: """Test StorageJSON.save creates storage directory if it doesn't exist.""" storage_dir = tmp_path / "new_data" / "storage" storage_file = storage_dir / "test.json" @@ -139,11 +144,10 @@ def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) - no_mdns=False, ) - with patch("esphome.storage_json.write_file_if_changed") as mock_write: - storage.save(str(storage_file)) - mock_write.assert_called_once() - call_args = mock_write.call_args[0] - assert call_args[0] == str(storage_file) + storage.save(str(storage_file)) + mock_write_file_if_changed.assert_called_once() + call_args = mock_write_file_if_changed.call_args[0] + assert call_args[0] == str(storage_file) def test_storage_json_from_wizard(setup_core: Path) -> None: @@ -180,3 +184,477 @@ def test_storage_paths_with_ha_addon(mock_is_ha_addon: bool, tmp_path: Path) -> result = storage_json.esphome_storage_path() expected = str(Path("/data") / "esphome.json") assert result == expected + + +def test_storage_json_as_dict() -> None: + """Test StorageJSON.as_dict returns correct dictionary.""" + storage = storage_json.StorageJSON( + storage_version=1, + name="test_device", + friendly_name="Test Device", + comment="Test comment", + esphome_version="2024.1.0", + src_version=1, + address="192.168.1.100", + web_port=80, + target_platform="ESP32", + build_path="/path/to/build", + firmware_bin_path="/path/to/firmware.bin", + loaded_integrations={"wifi", "api", "ota"}, + loaded_platforms={"sensor", "binary_sensor"}, + no_mdns=True, + framework="arduino", + core_platform="esp32", + ) + + result = storage.as_dict() + + assert result["storage_version"] == 1 + assert result["name"] == "test_device" + assert result["friendly_name"] == "Test Device" + assert result["comment"] == "Test comment" + assert result["esphome_version"] == "2024.1.0" + assert result["src_version"] == 1 + assert result["address"] == "192.168.1.100" + assert result["web_port"] == 80 + assert result["esp_platform"] == "ESP32" + assert result["build_path"] == "/path/to/build" + assert result["firmware_bin_path"] == "/path/to/firmware.bin" + assert "api" in result["loaded_integrations"] + assert "wifi" in result["loaded_integrations"] + assert "ota" in result["loaded_integrations"] + assert result["loaded_integrations"] == sorted( + ["wifi", "api", "ota"] + ) # Should be sorted + assert "sensor" in result["loaded_platforms"] + assert result["loaded_platforms"] == sorted( + ["sensor", "binary_sensor"] + ) # Should be sorted + assert result["no_mdns"] is True + assert result["framework"] == "arduino" + assert result["core_platform"] == "esp32" + + +def test_storage_json_to_json() -> None: + """Test StorageJSON.to_json returns valid JSON string.""" + storage = storage_json.StorageJSON( + storage_version=1, + name="test", + friendly_name="Test", + comment=None, + esphome_version="2024.1.0", + src_version=None, + address="test.local", + web_port=None, + target_platform="ESP8266", + build_path=None, + firmware_bin_path=None, + loaded_integrations=set(), + loaded_platforms=set(), + no_mdns=False, + ) + + json_str = storage.to_json() + + # Should be valid JSON + parsed = json.loads(json_str) + assert parsed["name"] == "test" + assert parsed["storage_version"] == 1 + + # Should end with newline + assert json_str.endswith("\n") + + +def test_storage_json_save(tmp_path: Path) -> None: + """Test StorageJSON.save writes file correctly.""" + storage = storage_json.StorageJSON( + storage_version=1, + name="test", + friendly_name="Test", + comment=None, + esphome_version="2024.1.0", + src_version=None, + address="test.local", + web_port=None, + target_platform="ESP32", + build_path=None, + firmware_bin_path=None, + loaded_integrations=set(), + loaded_platforms=set(), + no_mdns=False, + ) + + save_path = tmp_path / "test.json" + + with patch("esphome.storage_json.write_file_if_changed") as mock_write: + storage.save(str(save_path)) + mock_write.assert_called_once_with(str(save_path), storage.to_json()) + + +def test_storage_json_from_esphome_core(setup_core: Path) -> None: + """Test StorageJSON.from_esphome_core creates correct storage object.""" + # Mock CORE object + mock_core = MagicMock() + mock_core.name = "my_device" + mock_core.friendly_name = "My Device" + mock_core.comment = "A test device" + mock_core.address = "192.168.1.50" + mock_core.web_port = 8080 + mock_core.target_platform = "esp32" + mock_core.is_esp32 = True + mock_core.build_path = "/build/my_device" + mock_core.firmware_bin = "/build/my_device/firmware.bin" + mock_core.loaded_integrations = {"wifi", "api"} + mock_core.loaded_platforms = {"sensor"} + mock_core.config = {CONF_MDNS: {CONF_DISABLED: True}} + mock_core.target_framework = "esp-idf" + + with patch("esphome.components.esp32.get_esp32_variant") as mock_variant: + mock_variant.return_value = "ESP32-C3" + + result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None) + + assert result.name == "my_device" + assert result.friendly_name == "My Device" + assert result.comment == "A test device" + assert result.address == "192.168.1.50" + assert result.web_port == 8080 + assert result.target_platform == "ESP32-C3" + assert result.build_path == "/build/my_device" + assert result.firmware_bin_path == "/build/my_device/firmware.bin" + assert result.loaded_integrations == {"wifi", "api"} + assert result.loaded_platforms == {"sensor"} + assert result.no_mdns is True + assert result.framework == "esp-idf" + assert result.core_platform == "esp32" + + +def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None: + """Test from_esphome_core with mDNS enabled.""" + mock_core = MagicMock() + mock_core.name = "test" + mock_core.friendly_name = "Test" + mock_core.comment = None + mock_core.address = "test.local" + mock_core.web_port = None + mock_core.target_platform = "esp8266" + mock_core.is_esp32 = False + mock_core.build_path = "/build" + mock_core.firmware_bin = "/build/firmware.bin" + mock_core.loaded_integrations = set() + mock_core.loaded_platforms = set() + mock_core.config = {} # No MDNS config means enabled + mock_core.target_framework = "arduino" + + result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None) + + assert result.no_mdns is False + + +def test_storage_json_load_valid_file(tmp_path: Path) -> None: + """Test StorageJSON.load with valid JSON file.""" + storage_data = { + "storage_version": 1, + "name": "loaded_device", + "friendly_name": "Loaded Device", + "comment": "Loaded from file", + "esphome_version": "2024.1.0", + "src_version": 2, + "address": "10.0.0.1", + "web_port": 8080, + "esp_platform": "ESP32", + "build_path": "/loaded/build", + "firmware_bin_path": "/loaded/firmware.bin", + "loaded_integrations": ["wifi", "api"], + "loaded_platforms": ["sensor"], + "no_mdns": True, + "framework": "arduino", + "core_platform": "esp32", + } + + file_path = tmp_path / "storage.json" + file_path.write_text(json.dumps(storage_data)) + + result = storage_json.StorageJSON.load(str(file_path)) + + assert result is not None + assert result.name == "loaded_device" + assert result.friendly_name == "Loaded Device" + assert result.comment == "Loaded from file" + assert result.esphome_version == "2024.1.0" + assert result.src_version == 2 + assert result.address == "10.0.0.1" + assert result.web_port == 8080 + assert result.target_platform == "ESP32" + assert result.build_path == "/loaded/build" + assert result.firmware_bin_path == "/loaded/firmware.bin" + assert result.loaded_integrations == {"wifi", "api"} + assert result.loaded_platforms == {"sensor"} + assert result.no_mdns is True + assert result.framework == "arduino" + assert result.core_platform == "esp32" + + +def test_storage_json_load_invalid_file(tmp_path: Path) -> None: + """Test StorageJSON.load with invalid JSON file.""" + file_path = tmp_path / "invalid.json" + file_path.write_text("not valid json{") + + result = storage_json.StorageJSON.load(str(file_path)) + + assert result is None + + +def test_storage_json_load_nonexistent_file() -> None: + """Test StorageJSON.load with non-existent file.""" + result = storage_json.StorageJSON.load("/nonexistent/file.json") + + assert result is None + + +def test_storage_json_equality() -> None: + """Test StorageJSON equality comparison.""" + storage1 = storage_json.StorageJSON( + storage_version=1, + name="test", + friendly_name="Test", + comment=None, + esphome_version="2024.1.0", + src_version=1, + address="test.local", + web_port=80, + target_platform="ESP32", + build_path="/build", + firmware_bin_path="/firmware.bin", + loaded_integrations={"wifi"}, + loaded_platforms=set(), + no_mdns=False, + ) + + storage2 = storage_json.StorageJSON( + storage_version=1, + name="test", + friendly_name="Test", + comment=None, + esphome_version="2024.1.0", + src_version=1, + address="test.local", + web_port=80, + target_platform="ESP32", + build_path="/build", + firmware_bin_path="/firmware.bin", + loaded_integrations={"wifi"}, + loaded_platforms=set(), + no_mdns=False, + ) + + storage3 = storage_json.StorageJSON( + storage_version=1, + name="different", # Different name + friendly_name="Test", + comment=None, + esphome_version="2024.1.0", + src_version=1, + address="test.local", + web_port=80, + target_platform="ESP32", + build_path="/build", + firmware_bin_path="/firmware.bin", + loaded_integrations={"wifi"}, + loaded_platforms=set(), + no_mdns=False, + ) + + assert storage1 == storage2 + assert storage1 != storage3 + assert storage1 != "not a storage object" + + +def test_esphome_storage_json_as_dict() -> None: + """Test EsphomeStorageJSON.as_dict returns correct dictionary.""" + storage = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret123", + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + result = storage.as_dict() + + assert result["storage_version"] == 1 + assert result["cookie_secret"] == "secret123" + assert result["last_update_check"] == "2024-01-15T10:30:00" + assert result["remote_version"] == "2024.1.1" + + +def test_esphome_storage_json_last_update_check_property() -> None: + """Test EsphomeStorageJSON.last_update_check property.""" + storage = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret", + last_update_check="2024-01-15T10:30:00", + remote_version=None, + ) + + # Test getter + result = storage.last_update_check + assert isinstance(result, datetime) + assert result.year == 2024 + assert result.month == 1 + assert result.day == 15 + assert result.hour == 10 + assert result.minute == 30 + + # Test setter + new_date = datetime(2024, 2, 20, 15, 45, 30) + storage.last_update_check = new_date + assert storage.last_update_check_str == "2024-02-20T15:45:30" + + +def test_esphome_storage_json_last_update_check_invalid() -> None: + """Test EsphomeStorageJSON.last_update_check with invalid date.""" + storage = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret", + last_update_check="invalid date", + remote_version=None, + ) + + result = storage.last_update_check + assert result is None + + +def test_esphome_storage_json_to_json() -> None: + """Test EsphomeStorageJSON.to_json returns valid JSON string.""" + storage = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="mysecret", + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + json_str = storage.to_json() + + # Should be valid JSON + parsed = json.loads(json_str) + assert parsed["cookie_secret"] == "mysecret" + assert parsed["storage_version"] == 1 + + # Should end with newline + assert json_str.endswith("\n") + + +def test_esphome_storage_json_save(tmp_path: Path) -> None: + """Test EsphomeStorageJSON.save writes file correctly.""" + storage = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret", + last_update_check=None, + remote_version=None, + ) + + save_path = tmp_path / "esphome.json" + + with patch("esphome.storage_json.write_file_if_changed") as mock_write: + storage.save(str(save_path)) + mock_write.assert_called_once_with(str(save_path), storage.to_json()) + + +def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None: + """Test EsphomeStorageJSON.load with valid JSON file.""" + storage_data = { + "storage_version": 1, + "cookie_secret": "loaded_secret", + "last_update_check": "2024-01-20T14:30:00", + "remote_version": "2024.1.2", + } + + file_path = tmp_path / "esphome.json" + file_path.write_text(json.dumps(storage_data)) + + result = storage_json.EsphomeStorageJSON.load(str(file_path)) + + assert result is not None + assert result.storage_version == 1 + assert result.cookie_secret == "loaded_secret" + assert result.last_update_check_str == "2024-01-20T14:30:00" + assert result.remote_version == "2024.1.2" + + +def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None: + """Test EsphomeStorageJSON.load with invalid JSON file.""" + file_path = tmp_path / "invalid.json" + file_path.write_text("not valid json{") + + result = storage_json.EsphomeStorageJSON.load(str(file_path)) + + assert result is None + + +def test_esphome_storage_json_load_nonexistent_file() -> None: + """Test EsphomeStorageJSON.load with non-existent file.""" + result = storage_json.EsphomeStorageJSON.load("/nonexistent/file.json") + + assert result is None + + +def test_esphome_storage_json_get_default() -> None: + """Test EsphomeStorageJSON.get_default creates default storage.""" + with patch("esphome.storage_json.os.urandom") as mock_urandom: + # Mock urandom to return predictable bytes + mock_urandom.return_value = b"test" * 16 # 64 bytes + + result = storage_json.EsphomeStorageJSON.get_default() + + assert result.storage_version == 1 + assert len(result.cookie_secret) == 128 # 64 bytes hex = 128 chars + assert result.last_update_check is None + assert result.remote_version is None + + +def test_esphome_storage_json_equality() -> None: + """Test EsphomeStorageJSON equality comparison.""" + storage1 = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret", + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + storage2 = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret", + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + storage3 = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="different", # Different secret + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + assert storage1 == storage2 + assert storage1 != storage3 + assert storage1 != "not a storage object" + + +def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None: + """Test loading storage with legacy esphomeyaml_version field.""" + storage_data = { + "storage_version": 1, + "name": "legacy_device", + "friendly_name": "Legacy Device", + "esphomeyaml_version": "1.14.0", # Legacy field name + "address": "legacy.local", + "esp_platform": "ESP8266", + } + + file_path = tmp_path / "legacy.json" + file_path.write_text(json.dumps(storage_data)) + + result = storage_json.StorageJSON.load(str(file_path)) + + assert result is not None + assert result.esphome_version == "1.14.0" # Should map to esphome_version