diff --git a/Doxyfile b/Doxyfile index d35c01b144..96faf3a1e0 100644 --- a/Doxyfile +++ b/Doxyfile @@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 2025.9.0b2 +PROJECT_NUMBER = 2025.9.0b3 # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/esphome/components/ade7880/ade7880.cpp b/esphome/components/ade7880/ade7880.cpp index 55f834bf86..fd560e0676 100644 --- a/esphome/components/ade7880/ade7880.cpp +++ b/esphome/components/ade7880/ade7880.cpp @@ -113,7 +113,7 @@ void ADE7880::update() { if (this->channel_a_ != nullptr) { auto *chan = this->channel_a_; this->update_sensor_from_s24zp_register16_(chan->current, AIRMS, [](float val) { return val / 100000.0f; }); - this->update_sensor_from_s24zp_register16_(chan->voltage, BVRMS, [](float val) { return val / 10000.0f; }); + this->update_sensor_from_s24zp_register16_(chan->voltage, AVRMS, [](float val) { return val / 10000.0f; }); this->update_sensor_from_s24zp_register16_(chan->active_power, AWATT, [](float val) { return val / 100.0f; }); this->update_sensor_from_s24zp_register16_(chan->apparent_power, AVA, [](float val) { return val / 100.0f; }); this->update_sensor_from_s16_register16_(chan->power_factor, APF, diff --git a/esphome/components/ethernet/__init__.py b/esphome/components/ethernet/__init__.py index a26238553c..151da7d0e5 100644 --- a/esphome/components/ethernet/__init__.py +++ b/esphome/components/ethernet/__init__.py @@ -77,6 +77,13 @@ ETHERNET_TYPES = { "DM9051": EthernetType.ETHERNET_TYPE_DM9051, } +# PHY types that need compile-time defines for conditional compilation +_PHY_TYPE_TO_DEFINE = { + "KSZ8081": "USE_ETHERNET_KSZ8081", + "KSZ8081RNA": "USE_ETHERNET_KSZ8081", + # Add other PHY types here only if they need conditional compilation +} + SPI_ETHERNET_TYPES = ["W5500", "DM9051"] SPI_ETHERNET_DEFAULT_POLLING_INTERVAL = TimePeriodMilliseconds(milliseconds=10) @@ -345,6 +352,10 @@ async def to_code(config): if CONF_MANUAL_IP in config: cg.add(var.set_manual_ip(manual_ip(config[CONF_MANUAL_IP]))) + # Add compile-time define for PHY types with specific code + if phy_define := _PHY_TYPE_TO_DEFINE.get(config[CONF_TYPE]): + cg.add_define(phy_define) + cg.add_define("USE_ETHERNET") # Disable WiFi when using Ethernet to save memory diff --git a/esphome/components/ethernet/ethernet_component.cpp b/esphome/components/ethernet/ethernet_component.cpp index a48fd27383..ff14d19427 100644 --- a/esphome/components/ethernet/ethernet_component.cpp +++ b/esphome/components/ethernet/ethernet_component.cpp @@ -229,10 +229,12 @@ void EthernetComponent::setup() { ESPHL_ERROR_CHECK(err, "ETH driver install error"); #ifndef USE_ETHERNET_SPI +#ifdef USE_ETHERNET_KSZ8081 if (this->type_ == ETHERNET_TYPE_KSZ8081RNA && this->clk_mode_ == EMAC_CLK_OUT) { // KSZ8081RNA default is incorrect. It expects a 25MHz clock instead of the 50MHz we provide. 
this->ksz8081_set_clock_reference_(mac); } +#endif // USE_ETHERNET_KSZ8081 for (const auto &phy_register : this->phy_registers_) { this->write_phy_register_(mac, phy_register); @@ -721,6 +723,7 @@ bool EthernetComponent::powerdown() { #ifndef USE_ETHERNET_SPI +#ifdef USE_ETHERNET_KSZ8081 constexpr uint8_t KSZ80XX_PC2R_REG_ADDR = 0x1F; void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) { @@ -749,6 +752,7 @@ void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) { ESP_LOGVV(TAG, "KSZ8081 PHY Control 2: %s", format_hex_pretty((u_int8_t *) &phy_control_2, 2).c_str()); } } +#endif // USE_ETHERNET_KSZ8081 void EthernetComponent::write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data) { esp_err_t err; diff --git a/esphome/components/ethernet/ethernet_component.h b/esphome/components/ethernet/ethernet_component.h index 3d2713ee5c..bbb9d7fb60 100644 --- a/esphome/components/ethernet/ethernet_component.h +++ b/esphome/components/ethernet/ethernet_component.h @@ -104,8 +104,10 @@ class EthernetComponent : public Component { void start_connect_(); void finish_connect_(); void dump_connect_params_(); +#ifdef USE_ETHERNET_KSZ8081 /// @brief Set `RMII Reference Clock Select` bit for KSZ8081. void ksz8081_set_clock_reference_(esp_eth_mac_t *mac); +#endif /// @brief Set arbitratry PHY registers from config. void write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data); diff --git a/esphome/components/mqtt/mqtt_client.cpp b/esphome/components/mqtt/mqtt_client.cpp index 7675280f1a..7ab6efd1a1 100644 --- a/esphome/components/mqtt/mqtt_client.cpp +++ b/esphome/components/mqtt/mqtt_client.cpp @@ -491,7 +491,7 @@ bool MQTTClientComponent::publish(const std::string &topic, const std::string &p bool MQTTClientComponent::publish(const std::string &topic, const char *payload, size_t payload_length, uint8_t qos, bool retain) { - return publish({.topic = topic, .payload = payload, .qos = qos, .retain = retain}); + return publish({.topic = topic, .payload = std::string(payload, payload_length), .qos = qos, .retain = retain}); } bool MQTTClientComponent::publish(const MQTTMessage &message) { diff --git a/esphome/components/select/select.cpp b/esphome/components/select/select.cpp index 37887da27c..beb72aa320 100644 --- a/esphome/components/select/select.cpp +++ b/esphome/components/select/select.cpp @@ -28,12 +28,12 @@ bool Select::has_option(const std::string &option) const { return this->index_of bool Select::has_index(size_t index) const { return index < this->size(); } size_t Select::size() const { - auto options = traits.get_options(); + const auto &options = traits.get_options(); return options.size(); } optional<size_t> Select::index_of(const std::string &option) const { - auto options = traits.get_options(); + const auto &options = traits.get_options(); auto it = std::find(options.begin(), options.end(), option); if (it == options.end()) { return {}; } @@ -51,7 +51,7 @@ optional<size_t> Select::active_index() const { optional<std::string> Select::at(size_t index) const { if (this->has_index(index)) { - auto options = traits.get_options(); + const auto &options = traits.get_options(); return options.at(index); } else { return {}; } diff --git a/esphome/components/select/select_call.cpp b/esphome/components/select/select_call.cpp index 85f755645c..a8272f8622 100644 --- a/esphome/components/select/select_call.cpp +++ b/esphome/components/select/select_call.cpp @@ -45,7 +45,7 @@ void SelectCall::perform() { auto *parent = this->parent_; const auto *name = parent->get_name().c_str(); const
auto &traits = parent->traits; - auto options = traits.get_options(); + const auto &options = traits.get_options(); if (this->operation_ == SELECT_OP_NONE) { ESP_LOGW(TAG, "'%s' - SelectCall performed without selecting an operation", name); diff --git a/esphome/const.py b/esphome/const.py index 03dc33df89..e23c919bce 100644 --- a/esphome/const.py +++ b/esphome/const.py @@ -4,7 +4,7 @@ from enum import Enum from esphome.enum import StrEnum -__version__ = "2025.9.0b2" +__version__ = "2025.9.0b3" ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_" VALID_SUBSTITUTIONS_CHARACTERS = ( diff --git a/esphome/core/defines.h b/esphome/core/defines.h index 9a7e090b83..6e8d5ed74c 100644 --- a/esphome/core/defines.h +++ b/esphome/core/defines.h @@ -175,6 +175,7 @@ #ifdef USE_ARDUINO #define USE_ARDUINO_VERSION_CODE VERSION_CODE(3, 2, 1) #define USE_ETHERNET +#define USE_ETHERNET_KSZ8081 #endif #ifdef USE_ESP_IDF diff --git a/esphome/dashboard/web_server.py b/esphome/dashboard/web_server.py index 294a180794..e6c5fd3d84 100644 --- a/esphome/dashboard/web_server.py +++ b/esphome/dashboard/web_server.py @@ -1038,12 +1038,9 @@ class ArchiveRequestHandler(BaseHandler): shutil.move(config_file, os.path.join(archive_path, configuration)) storage_json = StorageJSON.load(storage_path) - if storage_json is not None: + if storage_json is not None and storage_json.build_path: # Delete build folder (if exists) - name = storage_json.name - build_folder = os.path.join(settings.config_dir, name) - if build_folder is not None: - shutil.rmtree(build_folder, os.path.join(archive_path, name)) + shutil.rmtree(storage_json.build_path, ignore_errors=True) class UnArchiveRequestHandler(BaseHandler): diff --git a/tests/dashboard/test_web_server.py b/tests/dashboard/test_web_server.py index e206090ac0..1938617f20 100644 --- a/tests/dashboard/test_web_server.py +++ b/tests/dashboard/test_web_server.py @@ -589,7 +589,7 @@ async def test_archive_request_handler_post( mock_ext_storage_path: MagicMock, tmp_path: Path, ) -> None: - """Test ArchiveRequestHandler.post method.""" + """Test ArchiveRequestHandler.post method without storage_json.""" # Set up temp directories config_dir = Path(get_fixture_path("conf")) @@ -616,6 +616,97 @@ async def test_archive_request_handler_post( ).read_text() == "esphome:\n name: test_archive\n" +@pytest.mark.asyncio +async def test_archive_handler_with_build_folder( + dashboard: DashboardTestHelper, + mock_archive_storage_path: MagicMock, + mock_ext_storage_path: MagicMock, + mock_dashboard_settings: MagicMock, + mock_storage_json: MagicMock, + tmp_path: Path, +) -> None: + """Test ArchiveRequestHandler.post with storage_json and build folder.""" + config_dir = tmp_path / "config" + config_dir.mkdir() + archive_dir = tmp_path / "archive" + archive_dir.mkdir() + build_dir = tmp_path / "build" + build_dir.mkdir() + + configuration = "test_device.yaml" + test_config = config_dir / configuration + test_config.write_text("esphome:\n name: test_device\n") + + build_folder = build_dir / "test_device" + build_folder.mkdir() + (build_folder / "firmware.bin").write_text("binary content") + (build_folder / ".pioenvs").mkdir() + + mock_dashboard_settings.config_dir = str(config_dir) + mock_dashboard_settings.rel_path.return_value = str(test_config) + mock_archive_storage_path.return_value = str(archive_dir) + + mock_storage = MagicMock() + mock_storage.name = "test_device" + mock_storage.build_path = str(build_folder) + mock_storage_json.load.return_value = mock_storage + + response = await 
dashboard.fetch( + "/archive", + method="POST", + body=f"configuration={configuration}", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + assert response.code == 200 + + assert not test_config.exists() + assert (archive_dir / configuration).exists() + + assert not build_folder.exists() + assert not (archive_dir / "test_device").exists() + + +@pytest.mark.asyncio +async def test_archive_handler_no_build_folder( + dashboard: DashboardTestHelper, + mock_archive_storage_path: MagicMock, + mock_ext_storage_path: MagicMock, + mock_dashboard_settings: MagicMock, + mock_storage_json: MagicMock, + tmp_path: Path, +) -> None: + """Test ArchiveRequestHandler.post with storage_json but no build folder.""" + config_dir = tmp_path / "config" + config_dir.mkdir() + archive_dir = tmp_path / "archive" + archive_dir.mkdir() + + configuration = "test_device.yaml" + test_config = config_dir / configuration + test_config.write_text("esphome:\n name: test_device\n") + + mock_dashboard_settings.config_dir = str(config_dir) + mock_dashboard_settings.rel_path.return_value = str(test_config) + mock_archive_storage_path.return_value = str(archive_dir) + + mock_storage = MagicMock() + mock_storage.name = "test_device" + mock_storage.build_path = None + mock_storage_json.load.return_value = mock_storage + + response = await dashboard.fetch( + "/archive", + method="POST", + body=f"configuration={configuration}", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + assert response.code == 200 + + assert not test_config.exists() + assert (archive_dir / configuration).exists() + assert not (archive_dir / "test_device").exists() + + @pytest.mark.skipif(os.name == "nt", reason="Unix sockets are not supported on Windows") @pytest.mark.usefixtures("mock_trash_storage_path", "mock_archive_storage_path") def test_start_web_server_with_unix_socket(tmp_path: Path) -> None: diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index aac5a642f6..06d06d0506 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -9,8 +9,10 @@ not be part of a unit test suite. """ +from collections.abc import Generator from pathlib import Path import sys +from unittest.mock import Mock, patch import pytest @@ -36,3 +38,52 @@ def fixture_path() -> Path: Location of all fixture files. 
""" return here / "fixtures" + + +@pytest.fixture +def setup_core(tmp_path: Path) -> Path: + """Set up CORE with test paths.""" + CORE.config_path = str(tmp_path / "test.yaml") + return tmp_path + + +@pytest.fixture +def mock_write_file_if_changed() -> Generator[Mock, None, None]: + """Mock write_file_if_changed for storage_json.""" + with patch("esphome.storage_json.write_file_if_changed") as mock: + yield mock + + +@pytest.fixture +def mock_copy_file_if_changed() -> Generator[Mock, None, None]: + """Mock copy_file_if_changed for core.config.""" + with patch("esphome.core.config.copy_file_if_changed") as mock: + yield mock + + +@pytest.fixture +def mock_run_platformio_cli() -> Generator[Mock, None, None]: + """Mock run_platformio_cli for platformio_api.""" + with patch("esphome.platformio_api.run_platformio_cli") as mock: + yield mock + + +@pytest.fixture +def mock_run_platformio_cli_run() -> Generator[Mock, None, None]: + """Mock run_platformio_cli_run for platformio_api.""" + with patch("esphome.platformio_api.run_platformio_cli_run") as mock: + yield mock + + +@pytest.fixture +def mock_decode_pc() -> Generator[Mock, None, None]: + """Mock _decode_pc for platformio_api.""" + with patch("esphome.platformio_api._decode_pc") as mock: + yield mock + + +@pytest.fixture +def mock_run_external_command() -> Generator[Mock, None, None]: + """Mock run_external_command for platformio_api.""" + with patch("esphome.platformio_api.run_external_command") as mock: + yield mock diff --git a/tests/unit_tests/core/test_config.py b/tests/unit_tests/core/test_config.py index f5ba5221ed..46fe0148d8 100644 --- a/tests/unit_tests/core/test_config.py +++ b/tests/unit_tests/core/test_config.py @@ -1,15 +1,34 @@ """Unit tests for core config functionality including areas and devices.""" from collections.abc import Callable +import os from pathlib import Path +import types from typing import Any +from unittest.mock import MagicMock, Mock, patch import pytest from esphome import config_validation as cv, core -from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES -from esphome.core import config -from esphome.core.config import Area, validate_area_config +from esphome.const import ( + CONF_AREA, + CONF_AREAS, + CONF_BUILD_PATH, + CONF_DEVICES, + CONF_ESPHOME, + CONF_NAME, + CONF_NAME_ADD_MAC_SUFFIX, + KEY_CORE, +) +from esphome.core import CORE, config +from esphome.core.config import ( + Area, + preload_core_config, + valid_include, + valid_project_name, + validate_area_config, + validate_hostname, +) from .common import load_config_from_fixture @@ -245,3 +264,316 @@ def test_add_platform_defines_priority() -> None: f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than " f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)" ) + + +def test_valid_include_with_angle_brackets() -> None: + """Test valid_include accepts angle bracket includes.""" + assert valid_include("") == "" + + +def test_valid_include_with_valid_file(tmp_path: Path) -> None: + """Test valid_include accepts valid include files.""" + CORE.config_path = str(tmp_path / "test.yaml") + include_file = tmp_path / "include.h" + include_file.touch() + + assert valid_include(str(include_file)) == str(include_file) + + +def test_valid_include_with_valid_directory(tmp_path: Path) -> None: + """Test valid_include accepts valid directories.""" + CORE.config_path = str(tmp_path / "test.yaml") + include_dir = tmp_path / "includes" + include_dir.mkdir() + + assert 
valid_include(str(include_dir)) == str(include_dir) + + +def test_valid_include_invalid_extension(tmp_path: Path) -> None: + """Test valid_include rejects files with invalid extensions.""" + CORE.config_path = str(tmp_path / "test.yaml") + invalid_file = tmp_path / "file.txt" + invalid_file.touch() + + with pytest.raises(cv.Invalid, match="Include has invalid file extension"): + valid_include(str(invalid_file)) + + +def test_valid_project_name_valid() -> None: + """Test valid_project_name accepts valid project names.""" + assert valid_project_name("esphome.my_project") == "esphome.my_project" + + +def test_valid_project_name_no_namespace() -> None: + """Test valid_project_name rejects names without namespace.""" + with pytest.raises(cv.Invalid, match="project name needs to have a namespace"): + valid_project_name("my_project") + + +def test_valid_project_name_multiple_dots() -> None: + """Test valid_project_name rejects names with multiple dots.""" + with pytest.raises(cv.Invalid, match="project name needs to have a namespace"): + valid_project_name("esphome.my.project") + + +def test_validate_hostname_valid() -> None: + """Test validate_hostname accepts valid hostnames.""" + config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False} + assert validate_hostname(config) == config + + +def test_validate_hostname_too_long() -> None: + """Test validate_hostname rejects hostnames that are too long.""" + config = { + CONF_NAME: "a" * 32, # 32 chars, max is 31 + CONF_NAME_ADD_MAC_SUFFIX: False, + } + with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"): + validate_hostname(config) + + +def test_validate_hostname_too_long_with_mac_suffix() -> None: + """Test validate_hostname accounts for MAC suffix length.""" + config = { + CONF_NAME: "a" * 25, # 25 chars, max is 24 with MAC suffix + CONF_NAME_ADD_MAC_SUFFIX: True, + } + with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"): + validate_hostname(config) + + +def test_validate_hostname_with_underscore(caplog) -> None: + """Test validate_hostname warns about underscores.""" + config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False} + assert validate_hostname(config) == config + assert ( + "Using the '_' (underscore) character in the hostname is discouraged" + in caplog.text + ) + + +def test_preload_core_config_basic(setup_core: Path) -> None: + """Test preload_core_config sets basic CORE attributes.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + "esp32": {}, + } + result = {} + + platform = preload_core_config(config, result) + + assert CORE.name == "test_device" + assert platform == "esp32" + assert KEY_CORE in CORE.data + assert CONF_BUILD_PATH in config[CONF_ESPHOME] + # Verify default build path is "build/" + build_path = config[CONF_ESPHOME][CONF_BUILD_PATH] + assert build_path.endswith(os.path.join("build", "test_device")) + + +def test_preload_core_config_with_build_path(setup_core: Path) -> None: + """Test preload_core_config uses provided build path.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + CONF_BUILD_PATH: "/custom/build/path", + }, + "esp8266": {}, + } + result = {} + + platform = preload_core_config(config, result) + + assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path" + assert platform == "esp8266" + + +def test_preload_core_config_env_build_path(setup_core: Path) -> None: + """Test preload_core_config uses ESPHOME_BUILD_PATH env var.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + 
"rp2040": {}, + } + result = {} + + with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}): + platform = preload_core_config(config, result) + + assert CONF_BUILD_PATH in config[CONF_ESPHOME] + assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH] + # Verify it uses the env var path with device name appended + build_path = config[CONF_ESPHOME][CONF_BUILD_PATH] + expected_path = os.path.join("/env/build", "test_device") + assert build_path == expected_path or build_path == expected_path.replace( + "/", os.sep + ) + assert platform == "rp2040" + + +def test_preload_core_config_no_platform(setup_core: Path) -> None: + """Test preload_core_config raises when no platform is specified.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + } + result = {} + + # Mock _is_target_platform to avoid expensive component loading + with patch("esphome.core.config._is_target_platform") as mock_is_platform: + # Return True for known platforms + mock_is_platform.side_effect = lambda name: name in [ + "esp32", + "esp8266", + "rp2040", + ] + + with pytest.raises(cv.Invalid, match="Platform missing"): + preload_core_config(config, result) + + +def test_preload_core_config_multiple_platforms(setup_core: Path) -> None: + """Test preload_core_config raises when multiple platforms are specified.""" + config = { + CONF_ESPHOME: { + CONF_NAME: "test_device", + }, + "esp32": {}, + "esp8266": {}, + } + result = {} + + # Mock _is_target_platform to avoid expensive component loading + with patch("esphome.core.config._is_target_platform") as mock_is_platform: + # Return True for known platforms + mock_is_platform.side_effect = lambda name: name in [ + "esp32", + "esp8266", + "rp2040", + ] + + with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"): + preload_core_config(config, result) + + +def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None: + """Test include_file adds include statement for header files.""" + src_file = tmp_path / "source.h" + src_file.write_text("// Header content") + + CORE.build_path = str(tmp_path / "build") + + with patch("esphome.core.config.cg") as mock_cg: + # Mock RawStatement to capture the text + mock_raw_statement = MagicMock() + mock_raw_statement.text = "" + + def raw_statement_side_effect(text): + mock_raw_statement.text = text + return mock_raw_statement + + mock_cg.RawStatement.side_effect = raw_statement_side_effect + + config.include_file(str(src_file), "test.h") + + mock_copy_file_if_changed.assert_called_once() + mock_cg.add_global.assert_called_once() + # Check that include statement was added + assert '#include "test.h"' in mock_raw_statement.text + + +def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None: + """Test include_file does not add include for cpp files.""" + src_file = tmp_path / "source.cpp" + src_file.write_text("// CPP content") + + CORE.build_path = str(tmp_path / "build") + + with patch("esphome.core.config.cg") as mock_cg: + config.include_file(str(src_file), "test.cpp") + + mock_copy_file_if_changed.assert_called_once() + # Should not add include statement for .cpp files + mock_cg.add_global.assert_not_called() + + +def test_get_usable_cpu_count() -> None: + """Test get_usable_cpu_count returns CPU count.""" + count = config.get_usable_cpu_count() + assert isinstance(count, int) + assert count > 0 + + +def test_get_usable_cpu_count_with_process_cpu_count() -> None: + """Test get_usable_cpu_count uses process_cpu_count when available.""" + # Test 
with process_cpu_count (Python 3.13+) + # Create a mock os module with process_cpu_count + + mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4) + + with patch("esphome.core.config.os", mock_os): + # When process_cpu_count exists, it should be used + count = config.get_usable_cpu_count() + assert count == 8 + + # Test fallback to cpu_count when process_cpu_count not available + mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4) + + with patch("esphome.core.config.os", mock_os_no_process): + count = config.get_usable_cpu_count() + assert count == 4 + + +def test_list_target_platforms(tmp_path: Path) -> None: + """Test _list_target_platforms returns available platforms.""" + # Create mock components directory structure + components_dir = tmp_path / "components" + components_dir.mkdir() + + # Create platform and non-platform directories with __init__.py + platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"] + non_platforms = ["sensor"] + + for component in platforms + non_platforms: + component_dir = components_dir / component + component_dir.mkdir() + (component_dir / "__init__.py").touch() + + # Create a file (not a directory) + (components_dir / "README.md").touch() + + # Create a directory without __init__.py + (components_dir / "no_init").mkdir() + + # Mock Path(__file__).parents[1] to return our tmp_path + with patch("esphome.core.config.Path") as mock_path: + mock_file_path = MagicMock() + mock_file_path.parents = [MagicMock(), tmp_path] + mock_path.return_value = mock_file_path + + platforms = config._list_target_platforms() + + assert isinstance(platforms, list) + # Should include platform components + assert "esp32" in platforms + assert "esp8266" in platforms + assert "rp2040" in platforms + assert "libretiny" in platforms + assert "host" in platforms + # Should not include non-platform components + assert "sensor" not in platforms + assert "README.md" not in platforms + assert "no_init" not in platforms + + +def test_is_target_platform() -> None: + """Test _is_target_platform identifies valid platforms.""" + assert config._is_target_platform("esp32") is True + assert config._is_target_platform("esp8266") is True + assert config._is_target_platform("rp2040") is True + assert config._is_target_platform("invalid_platform") is False + assert config._is_target_platform("api") is False # Component but not platform diff --git a/tests/unit_tests/test_config_validation_paths.py b/tests/unit_tests/test_config_validation_paths.py new file mode 100644 index 0000000000..f8f038390e --- /dev/null +++ b/tests/unit_tests/test_config_validation_paths.py @@ -0,0 +1,187 @@ +"""Tests for config_validation.py path-related functions.""" + +from pathlib import Path + +import pytest +import voluptuous as vol + +from esphome import config_validation as cv + + +def test_directory_valid_path(setup_core: Path) -> None: + """Test directory validator with valid directory.""" + test_dir = setup_core / "test_directory" + test_dir.mkdir() + + result = cv.directory("test_directory") + + assert result == "test_directory" + + +def test_directory_absolute_path(setup_core: Path) -> None: + """Test directory validator with absolute path.""" + test_dir = setup_core / "test_directory" + test_dir.mkdir() + + result = cv.directory(str(test_dir)) + + assert result == str(test_dir) + + +def test_directory_nonexistent_path(setup_core: Path) -> None: + """Test directory validator raises error for non-existent directory.""" + with pytest.raises( + vol.Invalid, match="Could 
not find directory.*nonexistent_directory" + ): + cv.directory("nonexistent_directory") + + +def test_directory_file_instead_of_directory(setup_core: Path) -> None: + """Test directory validator raises error when path is a file.""" + test_file = setup_core / "test_file.txt" + test_file.write_text("content") + + with pytest.raises(vol.Invalid, match="is not a directory"): + cv.directory("test_file.txt") + + +def test_directory_with_parent_directory(setup_core: Path) -> None: + """Test directory validator with nested directory structure.""" + nested_dir = setup_core / "parent" / "child" / "grandchild" + nested_dir.mkdir(parents=True) + + result = cv.directory("parent/child/grandchild") + + assert result == "parent/child/grandchild" + + +def test_file_valid_path(setup_core: Path) -> None: + """Test file_ validator with valid file.""" + test_file = setup_core / "test_file.yaml" + test_file.write_text("test content") + + result = cv.file_("test_file.yaml") + + assert result == "test_file.yaml" + + +def test_file_absolute_path(setup_core: Path) -> None: + """Test file_ validator with absolute path.""" + test_file = setup_core / "test_file.yaml" + test_file.write_text("test content") + + result = cv.file_(str(test_file)) + + assert result == str(test_file) + + +def test_file_nonexistent_path(setup_core: Path) -> None: + """Test file_ validator raises error for non-existent file.""" + with pytest.raises(vol.Invalid, match="Could not find file.*nonexistent_file.yaml"): + cv.file_("nonexistent_file.yaml") + + +def test_file_directory_instead_of_file(setup_core: Path) -> None: + """Test file_ validator raises error when path is a directory.""" + test_dir = setup_core / "test_directory" + test_dir.mkdir() + + with pytest.raises(vol.Invalid, match="is not a file"): + cv.file_("test_directory") + + +def test_file_with_parent_directory(setup_core: Path) -> None: + """Test file_ validator with file in nested directory.""" + nested_dir = setup_core / "configs" / "sensors" + nested_dir.mkdir(parents=True) + test_file = nested_dir / "temperature.yaml" + test_file.write_text("sensor config") + + result = cv.file_("configs/sensors/temperature.yaml") + + assert result == "configs/sensors/temperature.yaml" + + +def test_directory_handles_trailing_slash(setup_core: Path) -> None: + """Test directory validator handles trailing slashes correctly.""" + test_dir = setup_core / "test_dir" + test_dir.mkdir() + + result = cv.directory("test_dir/") + assert result == "test_dir/" + + result = cv.directory("test_dir") + assert result == "test_dir" + + +def test_file_handles_various_extensions(setup_core: Path) -> None: + """Test file_ validator works with different file extensions.""" + yaml_file = setup_core / "config.yaml" + yaml_file.write_text("yaml content") + assert cv.file_("config.yaml") == "config.yaml" + + yml_file = setup_core / "config.yml" + yml_file.write_text("yml content") + assert cv.file_("config.yml") == "config.yml" + + txt_file = setup_core / "readme.txt" + txt_file.write_text("text content") + assert cv.file_("readme.txt") == "readme.txt" + + no_ext_file = setup_core / "LICENSE" + no_ext_file.write_text("license content") + assert cv.file_("LICENSE") == "LICENSE" + + +def test_directory_with_symlink(setup_core: Path) -> None: + """Test directory validator follows symlinks.""" + actual_dir = setup_core / "actual_directory" + actual_dir.mkdir() + + symlink_dir = setup_core / "symlink_directory" + symlink_dir.symlink_to(actual_dir) + + result = cv.directory("symlink_directory") + assert result == 
"symlink_directory" + + +def test_file_with_symlink(setup_core: Path) -> None: + """Test file_ validator follows symlinks.""" + actual_file = setup_core / "actual_file.txt" + actual_file.write_text("content") + + symlink_file = setup_core / "symlink_file.txt" + symlink_file.symlink_to(actual_file) + + result = cv.file_("symlink_file.txt") + assert result == "symlink_file.txt" + + +def test_directory_error_shows_full_path(setup_core: Path) -> None: + """Test directory validator error message includes full path.""" + with pytest.raises(vol.Invalid, match=".*missing_dir.*full path:.*"): + cv.directory("missing_dir") + + +def test_file_error_shows_full_path(setup_core: Path) -> None: + """Test file_ validator error message includes full path.""" + with pytest.raises(vol.Invalid, match=".*missing_file.yaml.*full path:.*"): + cv.file_("missing_file.yaml") + + +def test_directory_with_spaces_in_name(setup_core: Path) -> None: + """Test directory validator handles spaces in directory names.""" + dir_with_spaces = setup_core / "my test directory" + dir_with_spaces.mkdir() + + result = cv.directory("my test directory") + assert result == "my test directory" + + +def test_file_with_spaces_in_name(setup_core: Path) -> None: + """Test file_ validator handles spaces in file names.""" + file_with_spaces = setup_core / "my test file.yaml" + file_with_spaces.write_text("content") + + result = cv.file_("my test file.yaml") + assert result == "my test file.yaml" diff --git a/tests/unit_tests/test_external_files.py b/tests/unit_tests/test_external_files.py new file mode 100644 index 0000000000..3fa7de2f64 --- /dev/null +++ b/tests/unit_tests/test_external_files.py @@ -0,0 +1,196 @@ +"""Tests for external_files.py functions.""" + +from pathlib import Path +import time +from unittest.mock import MagicMock, patch + +import pytest +import requests + +from esphome import external_files +from esphome.config_validation import Invalid +from esphome.core import CORE, TimePeriod + + +def test_compute_local_file_dir(setup_core: Path) -> None: + """Test compute_local_file_dir creates and returns correct path.""" + domain = "font" + + result = external_files.compute_local_file_dir(domain) + + assert isinstance(result, Path) + assert result == Path(CORE.data_dir) / domain + assert result.exists() + assert result.is_dir() + + +def test_compute_local_file_dir_nested(setup_core: Path) -> None: + """Test compute_local_file_dir works with nested domains.""" + domain = "images/icons" + + result = external_files.compute_local_file_dir(domain) + + assert result == Path(CORE.data_dir) / "images" / "icons" + assert result.exists() + assert result.is_dir() + + +def test_is_file_recent_with_recent_file(setup_core: Path) -> None: + """Test is_file_recent returns True for recently created file.""" + test_file = setup_core / "recent.txt" + test_file.write_text("content") + + refresh = TimePeriod(seconds=3600) + + result = external_files.is_file_recent(str(test_file), refresh) + + assert result is True + + +def test_is_file_recent_with_old_file(setup_core: Path) -> None: + """Test is_file_recent returns False for old file.""" + test_file = setup_core / "old.txt" + test_file.write_text("content") + + old_time = time.time() - 7200 + + with patch("os.path.getctime", return_value=old_time): + refresh = TimePeriod(seconds=3600) + + result = external_files.is_file_recent(str(test_file), refresh) + + assert result is False + + +def test_is_file_recent_nonexistent_file(setup_core: Path) -> None: + """Test is_file_recent returns False for 
non-existent file.""" + test_file = setup_core / "nonexistent.txt" + refresh = TimePeriod(seconds=3600) + + result = external_files.is_file_recent(str(test_file), refresh) + + assert result is False + + +def test_is_file_recent_with_zero_refresh(setup_core: Path) -> None: + """Test is_file_recent with zero refresh period returns False.""" + test_file = setup_core / "test.txt" + test_file.write_text("content") + + # Mock getctime to return a time 10 seconds ago + with patch("os.path.getctime", return_value=time.time() - 10): + refresh = TimePeriod(seconds=0) + result = external_files.is_file_recent(str(test_file), refresh) + assert result is False + + +@patch("esphome.external_files.requests.head") +def test_has_remote_file_changed_not_modified( + mock_head: MagicMock, setup_core: Path +) -> None: + """Test has_remote_file_changed returns False when file not modified.""" + test_file = setup_core / "cached.txt" + test_file.write_text("cached content") + + mock_response = MagicMock() + mock_response.status_code = 304 + mock_head.return_value = mock_response + + url = "https://example.com/file.txt" + result = external_files.has_remote_file_changed(url, str(test_file)) + + assert result is False + mock_head.assert_called_once() + + call_args = mock_head.call_args + headers = call_args[1]["headers"] + assert external_files.IF_MODIFIED_SINCE in headers + assert external_files.CACHE_CONTROL in headers + + +@patch("esphome.external_files.requests.head") +def test_has_remote_file_changed_modified( + mock_head: MagicMock, setup_core: Path +) -> None: + """Test has_remote_file_changed returns True when file modified.""" + test_file = setup_core / "cached.txt" + test_file.write_text("cached content") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_head.return_value = mock_response + + url = "https://example.com/file.txt" + result = external_files.has_remote_file_changed(url, str(test_file)) + + assert result is True + + +def test_has_remote_file_changed_no_local_file(setup_core: Path) -> None: + """Test has_remote_file_changed returns True when local file doesn't exist.""" + test_file = setup_core / "nonexistent.txt" + + url = "https://example.com/file.txt" + result = external_files.has_remote_file_changed(url, str(test_file)) + + assert result is True + + +@patch("esphome.external_files.requests.head") +def test_has_remote_file_changed_network_error( + mock_head: MagicMock, setup_core: Path +) -> None: + """Test has_remote_file_changed handles network errors gracefully.""" + test_file = setup_core / "cached.txt" + test_file.write_text("cached content") + + mock_head.side_effect = requests.exceptions.RequestException("Network error") + + url = "https://example.com/file.txt" + + with pytest.raises(Invalid, match="Could not check if.*Network error"): + external_files.has_remote_file_changed(url, str(test_file)) + + +@patch("esphome.external_files.requests.head") +def test_has_remote_file_changed_timeout( + mock_head: MagicMock, setup_core: Path +) -> None: + """Test has_remote_file_changed respects timeout.""" + test_file = setup_core / "cached.txt" + test_file.write_text("cached content") + + mock_response = MagicMock() + mock_response.status_code = 304 + mock_head.return_value = mock_response + + url = "https://example.com/file.txt" + external_files.has_remote_file_changed(url, str(test_file)) + + call_args = mock_head.call_args + assert call_args[1]["timeout"] == external_files.NETWORK_TIMEOUT + + +def test_compute_local_file_dir_creates_parent_dirs(setup_core: Path) -> None: 
+ """Test compute_local_file_dir creates parent directories.""" + domain = "level1/level2/level3/level4" + + result = external_files.compute_local_file_dir(domain) + + assert result.exists() + assert result.is_dir() + assert result.parent.name == "level3" + assert result.parent.parent.name == "level2" + assert result.parent.parent.parent.name == "level1" + + +def test_is_file_recent_handles_float_seconds(setup_core: Path) -> None: + """Test is_file_recent works with float seconds in TimePeriod.""" + test_file = setup_core / "test.txt" + test_file.write_text("content") + + refresh = TimePeriod(seconds=3600.5) + + result = external_files.is_file_recent(str(test_file), refresh) + + assert result is True diff --git a/tests/unit_tests/test_platformio_api.py b/tests/unit_tests/test_platformio_api.py new file mode 100644 index 0000000000..7c7883d391 --- /dev/null +++ b/tests/unit_tests/test_platformio_api.py @@ -0,0 +1,636 @@ +"""Tests for platformio_api.py path functions.""" + +import json +import os +from pathlib import Path +import shutil +from types import SimpleNamespace +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from esphome import platformio_api +from esphome.core import CORE, EsphomeError + + +def test_idedata_firmware_elf_path(setup_core: Path) -> None: + """Test IDEData.firmware_elf_path returns correct path.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + raw_data = {"prog_path": "/path/to/firmware.elf"} + idedata = platformio_api.IDEData(raw_data) + + assert idedata.firmware_elf_path == "/path/to/firmware.elf" + + +def test_idedata_firmware_bin_path(setup_core: Path) -> None: + """Test IDEData.firmware_bin_path returns Path with .bin extension.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + prog_path = str(Path("/path/to/firmware.elf")) + raw_data = {"prog_path": prog_path} + idedata = platformio_api.IDEData(raw_data) + + result = idedata.firmware_bin_path + assert isinstance(result, str) + expected = str(Path("/path/to/firmware.bin")) + assert result == expected + assert result.endswith(".bin") + + +def test_idedata_firmware_bin_path_preserves_directory(setup_core: Path) -> None: + """Test firmware_bin_path preserves the directory structure.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + prog_path = str(Path("/complex/path/to/build/firmware.elf")) + raw_data = {"prog_path": prog_path} + idedata = platformio_api.IDEData(raw_data) + + result = idedata.firmware_bin_path + expected = str(Path("/complex/path/to/build/firmware.bin")) + assert result == expected + + +def test_idedata_extra_flash_images(setup_core: Path) -> None: + """Test IDEData.extra_flash_images returns list of FlashImage objects.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + raw_data = { + "prog_path": "/path/to/firmware.elf", + "extra": { + "flash_images": [ + {"path": "/path/to/bootloader.bin", "offset": "0x1000"}, + {"path": "/path/to/partition.bin", "offset": "0x8000"}, + ] + }, + } + idedata = platformio_api.IDEData(raw_data) + + images = idedata.extra_flash_images + assert len(images) == 2 + assert all(isinstance(img, platformio_api.FlashImage) for img in images) + assert images[0].path == "/path/to/bootloader.bin" + assert images[0].offset == "0x1000" + assert images[1].path == "/path/to/partition.bin" + assert images[1].offset == "0x8000" + + +def test_idedata_extra_flash_images_empty(setup_core: Path) -> None: + """Test extra_flash_images returns 
empty list when no extra images.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + raw_data = {"prog_path": "/path/to/firmware.elf", "extra": {"flash_images": []}} + idedata = platformio_api.IDEData(raw_data) + + images = idedata.extra_flash_images + assert images == [] + + +def test_idedata_cc_path(setup_core: Path) -> None: + """Test IDEData.cc_path returns compiler path.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + raw_data = { + "prog_path": "/path/to/firmware.elf", + "cc_path": "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc", + } + idedata = platformio_api.IDEData(raw_data) + + assert ( + idedata.cc_path + == "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc" + ) + + +def test_flash_image_dataclass() -> None: + """Test FlashImage dataclass stores path and offset correctly.""" + image = platformio_api.FlashImage(path="/path/to/image.bin", offset="0x10000") + + assert image.path == "/path/to/image.bin" + assert image.offset == "0x10000" + + +def test_load_idedata_returns_dict( + setup_core: Path, mock_run_platformio_cli_run +) -> None: + """Test _load_idedata returns parsed idedata dict when successful.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + + # Create required files + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.touch() + + idedata_path = setup_core / ".esphome" / "idedata" / "test.json" + idedata_path.parent.mkdir(parents=True, exist_ok=True) + idedata_path.write_text('{"prog_path": "/test/firmware.elf"}') + + mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}' + + config = {"name": "test"} + result = platformio_api._load_idedata(config) + + assert result is not None + assert isinstance(result, dict) + assert result["prog_path"] == "/test/firmware.elf" + + +def test_load_idedata_uses_cache_when_valid( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _load_idedata uses cached data when unchanged.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + + # Create platformio.ini + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + + # Create idedata cache file that's newer + idedata_path = setup_core / ".esphome" / "idedata" / "test.json" + idedata_path.parent.mkdir(parents=True, exist_ok=True) + idedata_path.write_text('{"prog_path": "/cached/firmware.elf"}') + + # Make idedata newer than platformio.ini + platformio_ini_mtime = platformio_ini.stat().st_mtime + os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1)) + + config = {"name": "test"} + result = platformio_api._load_idedata(config) + + # Should not call _run_idedata since cache is valid + mock_run_platformio_cli_run.assert_not_called() + + assert result["prog_path"] == "/cached/firmware.elf" + + +def test_load_idedata_regenerates_when_platformio_ini_newer( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _load_idedata regenerates when platformio.ini is newer.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + + # Create idedata cache file first + idedata_path = setup_core / ".esphome" / "idedata" / "test.json" + idedata_path.parent.mkdir(parents=True, exist_ok=True) + 
idedata_path.write_text('{"prog_path": "/old/firmware.elf"}') + + # Create platformio.ini that's newer + idedata_mtime = idedata_path.stat().st_mtime + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + # Make platformio.ini newer than idedata + os.utime(platformio_ini, (idedata_mtime + 1, idedata_mtime + 1)) + + # Mock platformio to return new data + new_data = {"prog_path": "/new/firmware.elf"} + mock_run_platformio_cli_run.return_value = json.dumps(new_data) + + config = {"name": "test"} + result = platformio_api._load_idedata(config) + + # Should call _run_idedata since platformio.ini is newer + mock_run_platformio_cli_run.assert_called_once() + + assert result["prog_path"] == "/new/firmware.elf" + + +def test_load_idedata_regenerates_on_corrupted_cache( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _load_idedata regenerates when cache file is corrupted.""" + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + + # Create platformio.ini + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + + # Create corrupted idedata cache file + idedata_path = setup_core / ".esphome" / "idedata" / "test.json" + idedata_path.parent.mkdir(parents=True, exist_ok=True) + idedata_path.write_text('{"prog_path": invalid json') + + # Make idedata newer so it would be used if valid + platformio_ini_mtime = platformio_ini.stat().st_mtime + os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1)) + + # Mock platformio to return new data + new_data = {"prog_path": "/new/firmware.elf"} + mock_run_platformio_cli_run.return_value = json.dumps(new_data) + + config = {"name": "test"} + result = platformio_api._load_idedata(config) + + # Should call _run_idedata since cache is corrupted + mock_run_platformio_cli_run.assert_called_once() + + assert result["prog_path"] == "/new/firmware.elf" + + +def test_run_idedata_parses_json_from_output( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _run_idedata extracts JSON from platformio output.""" + config = {"name": "test"} + + expected_data = { + "prog_path": "/path/to/firmware.elf", + "cc_path": "/path/to/gcc", + "extra": {"flash_images": []}, + } + + # Simulate platformio output with JSON embedded + mock_run_platformio_cli_run.return_value = ( + f"Some preamble\n{json.dumps(expected_data)}\nSome postamble" + ) + + result = platformio_api._run_idedata(config) + + assert result == expected_data + + +def test_run_idedata_raises_on_no_json( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _run_idedata raises EsphomeError when no JSON found.""" + config = {"name": "test"} + + mock_run_platformio_cli_run.return_value = "No JSON in this output" + + with pytest.raises(EsphomeError): + platformio_api._run_idedata(config) + + +def test_run_idedata_raises_on_invalid_json( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test _run_idedata raises on malformed JSON.""" + config = {"name": "test"} + mock_run_platformio_cli_run.return_value = '{"invalid": json"}' + + # The ValueError from json.loads is re-raised + with pytest.raises(ValueError): + platformio_api._run_idedata(config) + + +def test_run_platformio_cli_sets_environment_variables( + setup_core: Path, mock_run_external_command: Mock +) -> None: + """Test 
run_platformio_cli sets correct environment variables.""" + CORE.build_path = str(setup_core / "build" / "test") + + with patch.dict(os.environ, {}, clear=False): + mock_run_external_command.return_value = 0 + platformio_api.run_platformio_cli("test", "arg") + + # Check environment variables were set + assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true" + assert ( + setup_core / "build" / "test" + in Path(os.environ["PLATFORMIO_BUILD_DIR"]).parents + or Path(os.environ["PLATFORMIO_BUILD_DIR"]) == setup_core / "build" / "test" + ) + assert "PLATFORMIO_LIBDEPS_DIR" in os.environ + assert "PYTHONWARNINGS" in os.environ + + # Check command was called correctly + mock_run_external_command.assert_called_once() + args = mock_run_external_command.call_args[0] + assert "platformio" in args + assert "test" in args + assert "arg" in args + + +def test_run_platformio_cli_run_builds_command( + setup_core: Path, mock_run_platformio_cli: Mock +) -> None: + """Test run_platformio_cli_run builds correct command.""" + CORE.build_path = str(setup_core / "build" / "test") + mock_run_platformio_cli.return_value = 0 + + config = {"name": "test"} + platformio_api.run_platformio_cli_run(config, True, "extra", "args") + + mock_run_platformio_cli.assert_called_once_with( + "run", "-d", CORE.build_path, "-v", "extra", "args" + ) + + +def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None: + """Test run_compile with process limit.""" + from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME + + CORE.build_path = str(setup_core / "build" / "test") + config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}} + mock_run_platformio_cli_run.return_value = 0 + + platformio_api.run_compile(config, verbose=True) + + mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4") + + +def test_get_idedata_caches_result( + setup_core: Path, mock_run_platformio_cli_run: Mock +) -> None: + """Test get_idedata caches result in CORE.data.""" + from esphome.const import KEY_CORE + + CORE.build_path = str(setup_core / "build" / "test") + CORE.name = "test" + CORE.data[KEY_CORE] = {} + + # Create platformio.ini to avoid regeneration + platformio_ini = setup_core / "build" / "test" / "platformio.ini" + platformio_ini.parent.mkdir(parents=True, exist_ok=True) + platformio_ini.write_text("content") + + # Mock platformio to return data + idedata = {"prog_path": "/test/firmware.elf"} + mock_run_platformio_cli_run.return_value = json.dumps(idedata) + + config = {"name": "test"} + + # First call should load and cache + result1 = platformio_api.get_idedata(config) + mock_run_platformio_cli_run.assert_called_once() + + # Second call should use cache from CORE.data + result2 = platformio_api.get_idedata(config) + mock_run_platformio_cli_run.assert_called_once() # Still only called once + + assert result1 is result2 + assert isinstance(result1, platformio_api.IDEData) + assert result1.firmware_elf_path == "/test/firmware.elf" + + +def test_idedata_addr2line_path_windows(setup_core: Path) -> None: + """Test IDEData.addr2line_path on Windows.""" + raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"} + idedata = platformio_api.IDEData(raw_data) + + result = idedata.addr2line_path + assert result == "C:\\tools\\addr2line.exe" + + +def test_idedata_addr2line_path_unix(setup_core: Path) -> None: + """Test IDEData.addr2line_path on Unix.""" + raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"} + idedata = platformio_api.IDEData(raw_data) + + result = 
idedata.addr2line_path + assert result == "/usr/bin/addr2line" + + +def test_patch_structhash(setup_core: Path) -> None: + """Test patch_structhash monkey patches platformio functions.""" + # Create simple namespace objects to act as modules + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + mock_run = SimpleNamespace(cli=mock_cli, helpers=mock_helpers) + + # Mock platformio modules + with patch.dict( + "sys.modules", + { + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + "platformio.run": mock_run, + "platformio.project.helpers": MagicMock(), + "platformio.fs": MagicMock(), + "platformio": MagicMock(), + }, + ): + # Call patch_structhash + platformio_api.patch_structhash() + + # Verify both modules had clean_build_dir patched + # Check that clean_build_dir was set on both modules + assert hasattr(mock_cli, "clean_build_dir") + assert hasattr(mock_helpers, "clean_build_dir") + + # Verify they got the same function assigned + assert mock_cli.clean_build_dir is mock_helpers.clean_build_dir + + # Verify it's a real function (not a Mock) + assert callable(mock_cli.clean_build_dir) + assert mock_cli.clean_build_dir.__name__ == "patched_clean_build_dir" + + +def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None: + """Test patched_clean_build_dir removes build dir when platformio.ini is newer.""" + build_dir = setup_core / "build" + build_dir.mkdir() + platformio_ini = setup_core / "platformio.ini" + platformio_ini.write_text("config") + + # Make platformio.ini newer than build_dir + build_mtime = build_dir.stat().st_mtime + os.utime(platformio_ini, (build_mtime + 1, build_mtime + 1)) + + # Track if directory was removed + removed_paths: list[str] = [] + + def track_rmtree(path: str) -> None: + removed_paths.append(path) + shutil.rmtree(path) + + # Create mock modules that patch_structhash expects + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + mock_project_helpers = MagicMock() + mock_project_helpers.get_project_dir.return_value = str(setup_core) + mock_fs = SimpleNamespace(rmtree=track_rmtree) + + with patch.dict( + "sys.modules", + { + "platformio": SimpleNamespace(fs=mock_fs), + "platformio.fs": mock_fs, + "platformio.project.helpers": mock_project_helpers, + "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + }, + ): + # Call patch_structhash to install the patched function + platformio_api.patch_structhash() + + # Call the patched function + mock_helpers.clean_build_dir(str(build_dir), []) + + # Verify directory was removed and recreated + assert len(removed_paths) == 1 + assert removed_paths[0] == str(build_dir) + assert build_dir.exists() # makedirs recreated it + + +def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None: + """Test patched_clean_build_dir keeps build dir when it's up to date.""" + build_dir = setup_core / "build" + build_dir.mkdir() + test_file = build_dir / "test.txt" + test_file.write_text("test content") + + platformio_ini = setup_core / "platformio.ini" + platformio_ini.write_text("config") + + # Make build_dir newer than platformio.ini + ini_mtime = platformio_ini.stat().st_mtime + os.utime(build_dir, (ini_mtime + 1, ini_mtime + 1)) + + # Track if rmtree is called + removed_paths: list[str] = [] + + def track_rmtree(path: str) -> None: + removed_paths.append(path) + + # Create mock modules + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + 
mock_project_helpers = MagicMock() + mock_project_helpers.get_project_dir.return_value = str(setup_core) + mock_fs = SimpleNamespace(rmtree=track_rmtree) + + with patch.dict( + "sys.modules", + { + "platformio": SimpleNamespace(fs=mock_fs), + "platformio.fs": mock_fs, + "platformio.project.helpers": mock_project_helpers, + "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + }, + ): + # Call patch_structhash to install the patched function + platformio_api.patch_structhash() + + # Call the patched function + mock_helpers.clean_build_dir(str(build_dir), []) + + # Verify rmtree was NOT called + assert len(removed_paths) == 0 + + # Verify directory and file still exist + assert build_dir.exists() + assert test_file.exists() + assert test_file.read_text() == "test content" + + +def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None: + """Test patched_clean_build_dir creates build dir when it doesn't exist.""" + build_dir = setup_core / "build" + platformio_ini = setup_core / "platformio.ini" + platformio_ini.write_text("config") + + # Ensure build_dir doesn't exist + assert not build_dir.exists() + + # Track if rmtree is called + removed_paths: list[str] = [] + + def track_rmtree(path: str) -> None: + removed_paths.append(path) + + # Create mock modules + mock_cli = SimpleNamespace() + mock_helpers = SimpleNamespace() + mock_project_helpers = MagicMock() + mock_project_helpers.get_project_dir.return_value = str(setup_core) + mock_fs = SimpleNamespace(rmtree=track_rmtree) + + with patch.dict( + "sys.modules", + { + "platformio": SimpleNamespace(fs=mock_fs), + "platformio.fs": mock_fs, + "platformio.project.helpers": mock_project_helpers, + "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), + "platformio.run.cli": mock_cli, + "platformio.run.helpers": mock_helpers, + }, + ): + # Call patch_structhash to install the patched function + platformio_api.patch_structhash() + + # Call the patched function + mock_helpers.clean_build_dir(str(build_dir), []) + + # Verify rmtree was NOT called + assert len(removed_paths) == 0 + + # Verify directory was created + assert build_dir.exists() + + +def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None: + """Test process_stacktrace handles ESP8266 exceptions.""" + config = {"name": "test"} + + # Test exception type parsing + line = "Exception (28):" + backtrace_state = False + + result = platformio_api.process_stacktrace(config, line, backtrace_state) + + assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text + assert result is False + + +def test_process_stacktrace_esp8266_backtrace( + setup_core: Path, mock_decode_pc: Mock +) -> None: + """Test process_stacktrace handles ESP8266 multi-line backtrace.""" + config = {"name": "test"} + + # Start of backtrace + line1 = ">>>stack>>>" + state = platformio_api.process_stacktrace(config, line1, False) + assert state is True + + # Backtrace content with addresses + line2 = "40201234 40205678" + state = platformio_api.process_stacktrace(config, line2, state) + assert state is True + assert mock_decode_pc.call_count == 2 + + # End of backtrace + line3 = "<<<stack<<<" + state = platformio_api.process_stacktrace(config, line3, state) + assert state is False + + +def test_process_stacktrace_esp32_backtrace( + setup_core: Path, mock_decode_pc: Mock +) -> None: + """Test process_stacktrace handles ESP32 single-line backtrace.""" + config = {"name": "test"} + + line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678" + state = platformio_api.process_stacktrace(config, line, False) + + # Should decode both addresses + assert
+    assert mock_decode_pc.call_count == 2
+    mock_decode_pc.assert_any_call(config, "40081234")
+    mock_decode_pc.assert_any_call(config, "40085678")
+    assert state is False
+
+
+def test_process_stacktrace_bad_alloc(
+    setup_core: Path, mock_decode_pc: Mock, caplog
+) -> None:
+    """Test process_stacktrace handles bad alloc messages."""
+    config = {"name": "test"}
+
+    line = "last failed alloc call: 40201234(512)"
+    state = platformio_api.process_stacktrace(config, line, False)
+
+    assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
+    mock_decode_pc.assert_called_once_with(config, "40201234")
+    assert state is False
diff --git a/tests/unit_tests/test_storage_json.py b/tests/unit_tests/test_storage_json.py
new file mode 100644
index 0000000000..e1abe565b1
--- /dev/null
+++ b/tests/unit_tests/test_storage_json.py
@@ -0,0 +1,660 @@
+"""Tests for storage_json.py path functions."""
+
+from datetime import datetime
+import json
+from pathlib import Path
+import sys
+from unittest.mock import MagicMock, Mock, patch
+
+import pytest
+
+from esphome import storage_json
+from esphome.const import CONF_DISABLED, CONF_MDNS
+from esphome.core import CORE
+
+
+def test_storage_path(setup_core: Path) -> None:
+    """Test storage_path returns correct path for current config."""
+    CORE.config_path = str(setup_core / "my_device.yaml")
+
+    result = storage_json.storage_path()
+
+    data_dir = Path(CORE.data_dir)
+    expected = str(data_dir / "storage" / "my_device.yaml.json")
+    assert result == expected
+
+
+def test_ext_storage_path(setup_core: Path) -> None:
+    """Test ext_storage_path returns correct path for given filename."""
+    result = storage_json.ext_storage_path("other_device.yaml")
+
+    data_dir = Path(CORE.data_dir)
+    expected = str(data_dir / "storage" / "other_device.yaml.json")
+    assert result == expected
+
+
+def test_ext_storage_path_handles_various_extensions(setup_core: Path) -> None:
+    """Test ext_storage_path works with different file extensions."""
+    result_yml = storage_json.ext_storage_path("device.yml")
+    assert result_yml.endswith("device.yml.json")
+
+    result_no_ext = storage_json.ext_storage_path("device")
+    assert result_no_ext.endswith("device.json")
+
+    result_path = storage_json.ext_storage_path("my/device.yaml")
+    assert result_path.endswith("device.yaml.json")
+
+
+def test_esphome_storage_path(setup_core: Path) -> None:
+    """Test esphome_storage_path returns correct path."""
+    result = storage_json.esphome_storage_path()
+
+    data_dir = Path(CORE.data_dir)
+    expected = str(data_dir / "esphome.json")
+    assert result == expected
+
+
+def test_ignored_devices_storage_path(setup_core: Path) -> None:
+    """Test ignored_devices_storage_path returns correct path."""
+    result = storage_json.ignored_devices_storage_path()
+
+    data_dir = Path(CORE.data_dir)
+    expected = str(data_dir / "ignored-devices.json")
+    assert result == expected
+
+
+def test_trash_storage_path(setup_core: Path) -> None:
+    """Test trash_storage_path returns correct path."""
+    CORE.config_path = str(setup_core / "configs" / "device.yaml")
+
+    result = storage_json.trash_storage_path()
+
+    expected = str(setup_core / "configs" / "trash")
+    assert result == expected
+
+
+def test_archive_storage_path(setup_core: Path) -> None:
+    """Test archive_storage_path returns correct path."""
+    CORE.config_path = str(setup_core / "configs" / "device.yaml")
+
+    result = storage_json.archive_storage_path()
+
+    expected = str(setup_core / "configs" / "archive")
+    assert result == expected
+
+
+def test_storage_path_with_subdirectory(setup_core: Path) -> None:
+    """Test storage paths work correctly when config is in subdirectory."""
+    subdir = setup_core / "configs" / "basement"
+    subdir.mkdir(parents=True, exist_ok=True)
+    CORE.config_path = str(subdir / "sensor.yaml")
+
+    result = storage_json.storage_path()
+
+    data_dir = Path(CORE.data_dir)
+    expected = str(data_dir / "storage" / "sensor.yaml.json")
+    assert result == expected
+
+
+def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None:
+    """Test StorageJSON firmware_bin_path property."""
+    storage = storage_json.StorageJSON(
+        storage_version=1,
+        name="test_device",
+        friendly_name="Test Device",
+        comment=None,
+        esphome_version="2024.1.0",
+        src_version=None,
+        address="192.168.1.100",
+        web_port=80,
+        target_platform="ESP32",
+        build_path="build/test_device",
+        firmware_bin_path="/path/to/firmware.bin",
+        loaded_integrations={"wifi", "api"},
+        loaded_platforms=set(),
+        no_mdns=False,
+    )
+
+    assert storage.firmware_bin_path == "/path/to/firmware.bin"
+
+
+def test_storage_json_save_creates_directory(
+    setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock
+) -> None:
+    """Test StorageJSON.save creates storage directory if it doesn't exist."""
+    storage_dir = tmp_path / "new_data" / "storage"
+    storage_file = storage_dir / "test.json"
+
+    assert not storage_dir.exists()
+
+    storage = storage_json.StorageJSON(
+        storage_version=1,
+        name="test",
+        friendly_name="Test",
+        comment=None,
+        esphome_version="2024.1.0",
+        src_version=None,
+        address="test.local",
+        web_port=None,
+        target_platform="ESP8266",
+        build_path=None,
+        firmware_bin_path=None,
+        loaded_integrations=set(),
+        loaded_platforms=set(),
+        no_mdns=False,
+    )
+
+    storage.save(str(storage_file))
+    mock_write_file_if_changed.assert_called_once()
+    call_args = mock_write_file_if_changed.call_args[0]
+    assert call_args[0] == str(storage_file)
+
+
+def test_storage_json_from_wizard(setup_core: Path) -> None:
+    """Test StorageJSON.from_wizard creates correct storage object."""
+    storage = storage_json.StorageJSON.from_wizard(
+        name="my_device",
+        friendly_name="My Device",
+        address="my_device.local",
+        platform="ESP32",
+    )
+
+    assert storage.name == "my_device"
+    assert storage.friendly_name == "My Device"
+    assert storage.address == "my_device.local"
+    assert storage.target_platform == "ESP32"
+    assert storage.build_path is None
+    assert storage.firmware_bin_path is None
+
+
+@pytest.mark.skipif(sys.platform == "win32", reason="HA addons don't run on Windows")
+@patch("esphome.core.is_ha_addon")
+def test_storage_paths_with_ha_addon(mock_is_ha_addon: Mock, tmp_path: Path) -> None:
+    """Test storage paths when running as Home Assistant addon."""
+    mock_is_ha_addon.return_value = True
+
+    CORE.config_path = str(tmp_path / "test.yaml")
+
+    result = storage_json.storage_path()
+    # When is_ha_addon is True, CORE.data_dir returns "/data"
+    # This is the standard mount point for HA addon containers
+    expected = str(Path("/data") / "storage" / "test.yaml.json")
+    assert result == expected
+
+    result = storage_json.esphome_storage_path()
+    expected = str(Path("/data") / "esphome.json")
+    assert result == expected
+
+
+def test_storage_json_as_dict() -> None:
+    """Test StorageJSON.as_dict returns correct dictionary."""
+    storage = storage_json.StorageJSON(
+        storage_version=1,
+        name="test_device",
+        friendly_name="Test Device",
+        comment="Test comment",
+        esphome_version="2024.1.0",
+        src_version=1,
+        address="192.168.1.100",
+        web_port=80,
+        target_platform="ESP32",
+        build_path="/path/to/build",
+        firmware_bin_path="/path/to/firmware.bin",
+        loaded_integrations={"wifi", "api", "ota"},
+        loaded_platforms={"sensor", "binary_sensor"},
+        no_mdns=True,
+        framework="arduino",
+        core_platform="esp32",
+    )
+
+    result = storage.as_dict()
+
+    assert result["storage_version"] == 1
+    assert result["name"] == "test_device"
+    assert result["friendly_name"] == "Test Device"
+    assert result["comment"] == "Test comment"
+    assert result["esphome_version"] == "2024.1.0"
+    assert result["src_version"] == 1
+    assert result["address"] == "192.168.1.100"
+    assert result["web_port"] == 80
+    assert result["esp_platform"] == "ESP32"
+    assert result["build_path"] == "/path/to/build"
+    assert result["firmware_bin_path"] == "/path/to/firmware.bin"
+    assert "api" in result["loaded_integrations"]
+    assert "wifi" in result["loaded_integrations"]
+    assert "ota" in result["loaded_integrations"]
+    assert result["loaded_integrations"] == sorted(
+        ["wifi", "api", "ota"]
+    )  # Should be sorted
+    assert "sensor" in result["loaded_platforms"]
+    assert result["loaded_platforms"] == sorted(
+        ["sensor", "binary_sensor"]
+    )  # Should be sorted
+    assert result["no_mdns"] is True
+    assert result["framework"] == "arduino"
+    assert result["core_platform"] == "esp32"
+
+
+def test_storage_json_to_json() -> None:
+    """Test StorageJSON.to_json returns valid JSON string."""
+    storage = storage_json.StorageJSON(
+        storage_version=1,
+        name="test",
+        friendly_name="Test",
+        comment=None,
+        esphome_version="2024.1.0",
+        src_version=None,
+        address="test.local",
+        web_port=None,
+        target_platform="ESP8266",
+        build_path=None,
+        firmware_bin_path=None,
+        loaded_integrations=set(),
+        loaded_platforms=set(),
+        no_mdns=False,
+    )
+
+    json_str = storage.to_json()
+
+    # Should be valid JSON
+    parsed = json.loads(json_str)
+    assert parsed["name"] == "test"
+    assert parsed["storage_version"] == 1
+
+    # Should end with newline
+    assert json_str.endswith("\n")
+
+
+def test_storage_json_save(tmp_path: Path) -> None:
+    """Test StorageJSON.save writes file correctly."""
+    storage = storage_json.StorageJSON(
+        storage_version=1,
+        name="test",
+        friendly_name="Test",
+        comment=None,
+        esphome_version="2024.1.0",
+        src_version=None,
+        address="test.local",
+        web_port=None,
+        target_platform="ESP32",
+        build_path=None,
+        firmware_bin_path=None,
+        loaded_integrations=set(),
+        loaded_platforms=set(),
+        no_mdns=False,
+    )
+
+    save_path = tmp_path / "test.json"
+
+    with patch("esphome.storage_json.write_file_if_changed") as mock_write:
+        storage.save(str(save_path))
+        mock_write.assert_called_once_with(str(save_path), storage.to_json())
+
+
+def test_storage_json_from_esphome_core(setup_core: Path) -> None:
+    """Test StorageJSON.from_esphome_core creates correct storage object."""
+    # Mock CORE object
+    mock_core = MagicMock()
+    mock_core.name = "my_device"
+    mock_core.friendly_name = "My Device"
+    mock_core.comment = "A test device"
+    mock_core.address = "192.168.1.50"
+    mock_core.web_port = 8080
+    mock_core.target_platform = "esp32"
+    mock_core.is_esp32 = True
+    mock_core.build_path = "/build/my_device"
+    mock_core.firmware_bin = "/build/my_device/firmware.bin"
+    mock_core.loaded_integrations = {"wifi", "api"}
+    mock_core.loaded_platforms = {"sensor"}
+    mock_core.config = {CONF_MDNS: {CONF_DISABLED: True}}
+    mock_core.target_framework = "esp-idf"
+
+    with patch("esphome.components.esp32.get_esp32_variant") as mock_variant:
+        mock_variant.return_value = "ESP32-C3"
+
+        result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
+
+        assert result.name == "my_device"
+        assert result.friendly_name == "My Device"
+        assert result.comment == "A test device"
+        assert result.address == "192.168.1.50"
+        assert result.web_port == 8080
+        assert result.target_platform == "ESP32-C3"
+        assert result.build_path == "/build/my_device"
+        assert result.firmware_bin_path == "/build/my_device/firmware.bin"
+        assert result.loaded_integrations == {"wifi", "api"}
+        assert result.loaded_platforms == {"sensor"}
+        assert result.no_mdns is True
+        assert result.framework == "esp-idf"
+        assert result.core_platform == "esp32"
+
+
+def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None:
+    """Test from_esphome_core with mDNS enabled."""
+    mock_core = MagicMock()
+    mock_core.name = "test"
+    mock_core.friendly_name = "Test"
+    mock_core.comment = None
+    mock_core.address = "test.local"
+    mock_core.web_port = None
+    mock_core.target_platform = "esp8266"
+    mock_core.is_esp32 = False
+    mock_core.build_path = "/build"
+    mock_core.firmware_bin = "/build/firmware.bin"
+    mock_core.loaded_integrations = set()
+    mock_core.loaded_platforms = set()
+    mock_core.config = {}  # No MDNS config means enabled
+    mock_core.target_framework = "arduino"
+
+    result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
+
+    assert result.no_mdns is False
+
+
+def test_storage_json_load_valid_file(tmp_path: Path) -> None:
+    """Test StorageJSON.load with valid JSON file."""
+    storage_data = {
+        "storage_version": 1,
+        "name": "loaded_device",
+        "friendly_name": "Loaded Device",
+        "comment": "Loaded from file",
+        "esphome_version": "2024.1.0",
+        "src_version": 2,
+        "address": "10.0.0.1",
+        "web_port": 8080,
+        "esp_platform": "ESP32",
+        "build_path": "/loaded/build",
+        "firmware_bin_path": "/loaded/firmware.bin",
+        "loaded_integrations": ["wifi", "api"],
+        "loaded_platforms": ["sensor"],
+        "no_mdns": True,
+        "framework": "arduino",
+        "core_platform": "esp32",
+    }
+
+    file_path = tmp_path / "storage.json"
+    file_path.write_text(json.dumps(storage_data))
+
+    result = storage_json.StorageJSON.load(str(file_path))
+
+    assert result is not None
+    assert result.name == "loaded_device"
+    assert result.friendly_name == "Loaded Device"
+    assert result.comment == "Loaded from file"
+    assert result.esphome_version == "2024.1.0"
+    assert result.src_version == 2
+    assert result.address == "10.0.0.1"
+    assert result.web_port == 8080
+    assert result.target_platform == "ESP32"
+    assert result.build_path == "/loaded/build"
+    assert result.firmware_bin_path == "/loaded/firmware.bin"
+    assert result.loaded_integrations == {"wifi", "api"}
+    assert result.loaded_platforms == {"sensor"}
+    assert result.no_mdns is True
+    assert result.framework == "arduino"
+    assert result.core_platform == "esp32"
+
+
+def test_storage_json_load_invalid_file(tmp_path: Path) -> None:
+    """Test StorageJSON.load with invalid JSON file."""
+    file_path = tmp_path / "invalid.json"
+    file_path.write_text("not valid json{")
+
+    result = storage_json.StorageJSON.load(str(file_path))
+
+    assert result is None
+
+
+def test_storage_json_load_nonexistent_file() -> None:
+    """Test StorageJSON.load with non-existent file."""
+    result = storage_json.StorageJSON.load("/nonexistent/file.json")
+
+    assert result is None
+
+
+def test_storage_json_equality() -> None:
+    """Test StorageJSON equality comparison."""
+    storage1 = storage_json.StorageJSON(
+        storage_version=1,
+        name="test",
+        friendly_name="Test",
+        comment=None,
+        esphome_version="2024.1.0",
+        src_version=1,
+        address="test.local",
+        web_port=80,
+        target_platform="ESP32",
+        build_path="/build",
+        firmware_bin_path="/firmware.bin",
+        loaded_integrations={"wifi"},
+        loaded_platforms=set(),
+        no_mdns=False,
+    )
+
+    storage2 = storage_json.StorageJSON(
+        storage_version=1,
+        name="test",
+        friendly_name="Test",
+        comment=None,
+        esphome_version="2024.1.0",
+        src_version=1,
+        address="test.local",
+        web_port=80,
+        target_platform="ESP32",
+        build_path="/build",
+        firmware_bin_path="/firmware.bin",
+        loaded_integrations={"wifi"},
+        loaded_platforms=set(),
+        no_mdns=False,
+    )
+
+    storage3 = storage_json.StorageJSON(
+        storage_version=1,
+        name="different",  # Different name
+        friendly_name="Test",
+        comment=None,
+        esphome_version="2024.1.0",
+        src_version=1,
+        address="test.local",
+        web_port=80,
+        target_platform="ESP32",
+        build_path="/build",
+        firmware_bin_path="/firmware.bin",
+        loaded_integrations={"wifi"},
+        loaded_platforms=set(),
+        no_mdns=False,
+    )
+
+    assert storage1 == storage2
+    assert storage1 != storage3
+    assert storage1 != "not a storage object"
+
+
+def test_esphome_storage_json_as_dict() -> None:
+    """Test EsphomeStorageJSON.as_dict returns correct dictionary."""
+    storage = storage_json.EsphomeStorageJSON(
+        storage_version=1,
+        cookie_secret="secret123",
+        last_update_check="2024-01-15T10:30:00",
+        remote_version="2024.1.1",
+    )
+
+    result = storage.as_dict()
+
+    assert result["storage_version"] == 1
+    assert result["cookie_secret"] == "secret123"
+    assert result["last_update_check"] == "2024-01-15T10:30:00"
+    assert result["remote_version"] == "2024.1.1"
+
+
+def test_esphome_storage_json_last_update_check_property() -> None:
+    """Test EsphomeStorageJSON.last_update_check property."""
+    storage = storage_json.EsphomeStorageJSON(
+        storage_version=1,
+        cookie_secret="secret",
+        last_update_check="2024-01-15T10:30:00",
+        remote_version=None,
+    )
+
+    # Test getter
+    result = storage.last_update_check
+    assert isinstance(result, datetime)
+    assert result.year == 2024
+    assert result.month == 1
+    assert result.day == 15
+    assert result.hour == 10
+    assert result.minute == 30
+
+    # Test setter
+    new_date = datetime(2024, 2, 20, 15, 45, 30)
+    storage.last_update_check = new_date
+    assert storage.last_update_check_str == "2024-02-20T15:45:30"
+
+
+def test_esphome_storage_json_last_update_check_invalid() -> None:
+    """Test EsphomeStorageJSON.last_update_check with invalid date."""
+    storage = storage_json.EsphomeStorageJSON(
+        storage_version=1,
+        cookie_secret="secret",
+        last_update_check="invalid date",
+        remote_version=None,
+    )
+
+    result = storage.last_update_check
+    assert result is None
+
+
+def test_esphome_storage_json_to_json() -> None:
+    """Test EsphomeStorageJSON.to_json returns valid JSON string."""
+    storage = storage_json.EsphomeStorageJSON(
+        storage_version=1,
+        cookie_secret="mysecret",
+        last_update_check="2024-01-15T10:30:00",
+        remote_version="2024.1.1",
+    )
+
+    json_str = storage.to_json()
+
+    # Should be valid JSON
+    parsed = json.loads(json_str)
+    assert parsed["cookie_secret"] == "mysecret"
+    assert parsed["storage_version"] == 1
+
+    # Should end with newline
+    assert json_str.endswith("\n")
+
+
+def test_esphome_storage_json_save(tmp_path: Path) -> None:
+    """Test EsphomeStorageJSON.save writes file correctly."""
+    storage = storage_json.EsphomeStorageJSON(
+        storage_version=1,
+        cookie_secret="secret",
+        last_update_check=None,
+        remote_version=None,
+    )
+
+    save_path = tmp_path / "esphome.json"
+
patch("esphome.storage_json.write_file_if_changed") as mock_write: + storage.save(str(save_path)) + mock_write.assert_called_once_with(str(save_path), storage.to_json()) + + +def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None: + """Test EsphomeStorageJSON.load with valid JSON file.""" + storage_data = { + "storage_version": 1, + "cookie_secret": "loaded_secret", + "last_update_check": "2024-01-20T14:30:00", + "remote_version": "2024.1.2", + } + + file_path = tmp_path / "esphome.json" + file_path.write_text(json.dumps(storage_data)) + + result = storage_json.EsphomeStorageJSON.load(str(file_path)) + + assert result is not None + assert result.storage_version == 1 + assert result.cookie_secret == "loaded_secret" + assert result.last_update_check_str == "2024-01-20T14:30:00" + assert result.remote_version == "2024.1.2" + + +def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None: + """Test EsphomeStorageJSON.load with invalid JSON file.""" + file_path = tmp_path / "invalid.json" + file_path.write_text("not valid json{") + + result = storage_json.EsphomeStorageJSON.load(str(file_path)) + + assert result is None + + +def test_esphome_storage_json_load_nonexistent_file() -> None: + """Test EsphomeStorageJSON.load with non-existent file.""" + result = storage_json.EsphomeStorageJSON.load("/nonexistent/file.json") + + assert result is None + + +def test_esphome_storage_json_get_default() -> None: + """Test EsphomeStorageJSON.get_default creates default storage.""" + with patch("esphome.storage_json.os.urandom") as mock_urandom: + # Mock urandom to return predictable bytes + mock_urandom.return_value = b"test" * 16 # 64 bytes + + result = storage_json.EsphomeStorageJSON.get_default() + + assert result.storage_version == 1 + assert len(result.cookie_secret) == 128 # 64 bytes hex = 128 chars + assert result.last_update_check is None + assert result.remote_version is None + + +def test_esphome_storage_json_equality() -> None: + """Test EsphomeStorageJSON equality comparison.""" + storage1 = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret", + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + storage2 = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="secret", + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + storage3 = storage_json.EsphomeStorageJSON( + storage_version=1, + cookie_secret="different", # Different secret + last_update_check="2024-01-15T10:30:00", + remote_version="2024.1.1", + ) + + assert storage1 == storage2 + assert storage1 != storage3 + assert storage1 != "not a storage object" + + +def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None: + """Test loading storage with legacy esphomeyaml_version field.""" + storage_data = { + "storage_version": 1, + "name": "legacy_device", + "friendly_name": "Legacy Device", + "esphomeyaml_version": "1.14.0", # Legacy field name + "address": "legacy.local", + "esp_platform": "ESP8266", + } + + file_path = tmp_path / "legacy.json" + file_path.write_text(json.dumps(storage_data)) + + result = storage_json.StorageJSON.load(str(file_path)) + + assert result is not None + assert result.esphome_version == "1.14.0" # Should map to esphome_version