mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-31 07:03:55 +00:00 
			
		
		
		
	Add additional test coverage ahead of Path conversion (#10700)
This commit is contained in:
		
				
					committed by
					
						 Jesse Hills
						Jesse Hills
					
				
			
			
				
	
			
			
			
						parent
						
							4b3a997a8e
						
					
				
				
					commit
					8890071360
				
			| @@ -9,8 +9,10 @@ not be part of a unit test suite. | ||||
|  | ||||
| """ | ||||
|  | ||||
| from collections.abc import Generator | ||||
| from pathlib import Path | ||||
| import sys | ||||
| from unittest.mock import Mock, patch | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| @@ -43,3 +45,45 @@ def setup_core(tmp_path: Path) -> Path: | ||||
|     """Set up CORE with test paths.""" | ||||
|     CORE.config_path = str(tmp_path / "test.yaml") | ||||
|     return tmp_path | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_write_file_if_changed() -> Generator[Mock, None, None]: | ||||
|     """Mock write_file_if_changed for storage_json.""" | ||||
|     with patch("esphome.storage_json.write_file_if_changed") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_copy_file_if_changed() -> Generator[Mock, None, None]: | ||||
|     """Mock copy_file_if_changed for core.config.""" | ||||
|     with patch("esphome.core.config.copy_file_if_changed") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_run_platformio_cli() -> Generator[Mock, None, None]: | ||||
|     """Mock run_platformio_cli for platformio_api.""" | ||||
|     with patch("esphome.platformio_api.run_platformio_cli") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_run_platformio_cli_run() -> Generator[Mock, None, None]: | ||||
|     """Mock run_platformio_cli_run for platformio_api.""" | ||||
|     with patch("esphome.platformio_api.run_platformio_cli_run") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_decode_pc() -> Generator[Mock, None, None]: | ||||
|     """Mock _decode_pc for platformio_api.""" | ||||
|     with patch("esphome.platformio_api._decode_pc") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_run_external_command() -> Generator[Mock, None, None]: | ||||
|     """Mock run_external_command for platformio_api.""" | ||||
|     with patch("esphome.platformio_api.run_external_command") as mock: | ||||
|         yield mock | ||||
|   | ||||
| @@ -1,15 +1,34 @@ | ||||
| """Unit tests for core config functionality including areas and devices.""" | ||||
|  | ||||
| from collections.abc import Callable | ||||
| import os | ||||
| from pathlib import Path | ||||
| import types | ||||
| from typing import Any | ||||
| from unittest.mock import MagicMock, Mock, patch | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from esphome import config_validation as cv, core | ||||
| from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES | ||||
| from esphome.core import config | ||||
| from esphome.core.config import Area, validate_area_config | ||||
| from esphome.const import ( | ||||
|     CONF_AREA, | ||||
|     CONF_AREAS, | ||||
|     CONF_BUILD_PATH, | ||||
|     CONF_DEVICES, | ||||
|     CONF_ESPHOME, | ||||
|     CONF_NAME, | ||||
|     CONF_NAME_ADD_MAC_SUFFIX, | ||||
|     KEY_CORE, | ||||
| ) | ||||
| from esphome.core import CORE, config | ||||
| from esphome.core.config import ( | ||||
|     Area, | ||||
|     preload_core_config, | ||||
|     valid_include, | ||||
|     valid_project_name, | ||||
|     validate_area_config, | ||||
|     validate_hostname, | ||||
| ) | ||||
|  | ||||
| from .common import load_config_from_fixture | ||||
|  | ||||
| @@ -245,3 +264,307 @@ def test_add_platform_defines_priority() -> None: | ||||
|         f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than " | ||||
|         f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)" | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def test_valid_include_with_angle_brackets() -> None: | ||||
|     """Test valid_include accepts angle bracket includes.""" | ||||
|     assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>" | ||||
|  | ||||
|  | ||||
| def test_valid_include_with_valid_file(tmp_path: Path) -> None: | ||||
|     """Test valid_include accepts valid include files.""" | ||||
|     CORE.config_path = str(tmp_path / "test.yaml") | ||||
|     include_file = tmp_path / "include.h" | ||||
|     include_file.touch() | ||||
|  | ||||
|     assert valid_include(str(include_file)) == str(include_file) | ||||
|  | ||||
|  | ||||
| def test_valid_include_with_valid_directory(tmp_path: Path) -> None: | ||||
|     """Test valid_include accepts valid directories.""" | ||||
|     CORE.config_path = str(tmp_path / "test.yaml") | ||||
|     include_dir = tmp_path / "includes" | ||||
|     include_dir.mkdir() | ||||
|  | ||||
|     assert valid_include(str(include_dir)) == str(include_dir) | ||||
|  | ||||
|  | ||||
| def test_valid_include_invalid_extension(tmp_path: Path) -> None: | ||||
|     """Test valid_include rejects files with invalid extensions.""" | ||||
|     CORE.config_path = str(tmp_path / "test.yaml") | ||||
|     invalid_file = tmp_path / "file.txt" | ||||
|     invalid_file.touch() | ||||
|  | ||||
|     with pytest.raises(cv.Invalid, match="Include has invalid file extension"): | ||||
|         valid_include(str(invalid_file)) | ||||
|  | ||||
|  | ||||
| def test_valid_project_name_valid() -> None: | ||||
|     """Test valid_project_name accepts valid project names.""" | ||||
|     assert valid_project_name("esphome.my_project") == "esphome.my_project" | ||||
|  | ||||
|  | ||||
| def test_valid_project_name_no_namespace() -> None: | ||||
|     """Test valid_project_name rejects names without namespace.""" | ||||
|     with pytest.raises(cv.Invalid, match="project name needs to have a namespace"): | ||||
|         valid_project_name("my_project") | ||||
|  | ||||
|  | ||||
| def test_valid_project_name_multiple_dots() -> None: | ||||
|     """Test valid_project_name rejects names with multiple dots.""" | ||||
|     with pytest.raises(cv.Invalid, match="project name needs to have a namespace"): | ||||
|         valid_project_name("esphome.my.project") | ||||
|  | ||||
|  | ||||
| def test_validate_hostname_valid() -> None: | ||||
|     """Test validate_hostname accepts valid hostnames.""" | ||||
|     config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False} | ||||
|     assert validate_hostname(config) == config | ||||
|  | ||||
|  | ||||
| def test_validate_hostname_too_long() -> None: | ||||
|     """Test validate_hostname rejects hostnames that are too long.""" | ||||
|     config = { | ||||
|         CONF_NAME: "a" * 32,  # 32 chars, max is 31 | ||||
|         CONF_NAME_ADD_MAC_SUFFIX: False, | ||||
|     } | ||||
|     with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"): | ||||
|         validate_hostname(config) | ||||
|  | ||||
|  | ||||
| def test_validate_hostname_too_long_with_mac_suffix() -> None: | ||||
|     """Test validate_hostname accounts for MAC suffix length.""" | ||||
|     config = { | ||||
|         CONF_NAME: "a" * 25,  # 25 chars, max is 24 with MAC suffix | ||||
|         CONF_NAME_ADD_MAC_SUFFIX: True, | ||||
|     } | ||||
|     with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"): | ||||
|         validate_hostname(config) | ||||
|  | ||||
|  | ||||
| def test_validate_hostname_with_underscore(caplog) -> None: | ||||
|     """Test validate_hostname warns about underscores.""" | ||||
|     config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False} | ||||
|     assert validate_hostname(config) == config | ||||
|     assert ( | ||||
|         "Using the '_' (underscore) character in the hostname is discouraged" | ||||
|         in caplog.text | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def test_preload_core_config_basic(setup_core: Path) -> None: | ||||
|     """Test preload_core_config sets basic CORE attributes.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: { | ||||
|             CONF_NAME: "test_device", | ||||
|         }, | ||||
|         "esp32": {}, | ||||
|     } | ||||
|     result = {} | ||||
|  | ||||
|     platform = preload_core_config(config, result) | ||||
|  | ||||
|     assert CORE.name == "test_device" | ||||
|     assert platform == "esp32" | ||||
|     assert KEY_CORE in CORE.data | ||||
|     assert CONF_BUILD_PATH in config[CONF_ESPHOME] | ||||
|  | ||||
|  | ||||
| def test_preload_core_config_with_build_path(setup_core: Path) -> None: | ||||
|     """Test preload_core_config uses provided build path.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: { | ||||
|             CONF_NAME: "test_device", | ||||
|             CONF_BUILD_PATH: "/custom/build/path", | ||||
|         }, | ||||
|         "esp8266": {}, | ||||
|     } | ||||
|     result = {} | ||||
|  | ||||
|     platform = preload_core_config(config, result) | ||||
|  | ||||
|     assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path" | ||||
|     assert platform == "esp8266" | ||||
|  | ||||
|  | ||||
| def test_preload_core_config_env_build_path(setup_core: Path) -> None: | ||||
|     """Test preload_core_config uses ESPHOME_BUILD_PATH env var.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: { | ||||
|             CONF_NAME: "test_device", | ||||
|         }, | ||||
|         "rp2040": {}, | ||||
|     } | ||||
|     result = {} | ||||
|  | ||||
|     with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}): | ||||
|         platform = preload_core_config(config, result) | ||||
|  | ||||
|     assert CONF_BUILD_PATH in config[CONF_ESPHOME] | ||||
|     assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH] | ||||
|     assert platform == "rp2040" | ||||
|  | ||||
|  | ||||
| def test_preload_core_config_no_platform(setup_core: Path) -> None: | ||||
|     """Test preload_core_config raises when no platform is specified.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: { | ||||
|             CONF_NAME: "test_device", | ||||
|         }, | ||||
|     } | ||||
|     result = {} | ||||
|  | ||||
|     # Mock _is_target_platform to avoid expensive component loading | ||||
|     with patch("esphome.core.config._is_target_platform") as mock_is_platform: | ||||
|         # Return True for known platforms | ||||
|         mock_is_platform.side_effect = lambda name: name in [ | ||||
|             "esp32", | ||||
|             "esp8266", | ||||
|             "rp2040", | ||||
|         ] | ||||
|  | ||||
|         with pytest.raises(cv.Invalid, match="Platform missing"): | ||||
|             preload_core_config(config, result) | ||||
|  | ||||
|  | ||||
| def test_preload_core_config_multiple_platforms(setup_core: Path) -> None: | ||||
|     """Test preload_core_config raises when multiple platforms are specified.""" | ||||
|     config = { | ||||
|         CONF_ESPHOME: { | ||||
|             CONF_NAME: "test_device", | ||||
|         }, | ||||
|         "esp32": {}, | ||||
|         "esp8266": {}, | ||||
|     } | ||||
|     result = {} | ||||
|  | ||||
|     # Mock _is_target_platform to avoid expensive component loading | ||||
|     with patch("esphome.core.config._is_target_platform") as mock_is_platform: | ||||
|         # Return True for known platforms | ||||
|         mock_is_platform.side_effect = lambda name: name in [ | ||||
|             "esp32", | ||||
|             "esp8266", | ||||
|             "rp2040", | ||||
|         ] | ||||
|  | ||||
|         with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"): | ||||
|             preload_core_config(config, result) | ||||
|  | ||||
|  | ||||
| def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None: | ||||
|     """Test include_file adds include statement for header files.""" | ||||
|     src_file = tmp_path / "source.h" | ||||
|     src_file.write_text("// Header content") | ||||
|  | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|  | ||||
|     with patch("esphome.core.config.cg") as mock_cg: | ||||
|         # Mock RawStatement to capture the text | ||||
|         mock_raw_statement = MagicMock() | ||||
|         mock_raw_statement.text = "" | ||||
|  | ||||
|         def raw_statement_side_effect(text): | ||||
|             mock_raw_statement.text = text | ||||
|             return mock_raw_statement | ||||
|  | ||||
|         mock_cg.RawStatement.side_effect = raw_statement_side_effect | ||||
|  | ||||
|         config.include_file(str(src_file), "test.h") | ||||
|  | ||||
|         mock_copy_file_if_changed.assert_called_once() | ||||
|         mock_cg.add_global.assert_called_once() | ||||
|         # Check that include statement was added | ||||
|         assert '#include "test.h"' in mock_raw_statement.text | ||||
|  | ||||
|  | ||||
| def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None: | ||||
|     """Test include_file does not add include for cpp files.""" | ||||
|     src_file = tmp_path / "source.cpp" | ||||
|     src_file.write_text("// CPP content") | ||||
|  | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|  | ||||
|     with patch("esphome.core.config.cg") as mock_cg: | ||||
|         config.include_file(str(src_file), "test.cpp") | ||||
|  | ||||
|         mock_copy_file_if_changed.assert_called_once() | ||||
|         # Should not add include statement for .cpp files | ||||
|         mock_cg.add_global.assert_not_called() | ||||
|  | ||||
|  | ||||
| def test_get_usable_cpu_count() -> None: | ||||
|     """Test get_usable_cpu_count returns CPU count.""" | ||||
|     count = config.get_usable_cpu_count() | ||||
|     assert isinstance(count, int) | ||||
|     assert count > 0 | ||||
|  | ||||
|  | ||||
| def test_get_usable_cpu_count_with_process_cpu_count() -> None: | ||||
|     """Test get_usable_cpu_count uses process_cpu_count when available.""" | ||||
|     # Test with process_cpu_count (Python 3.13+) | ||||
|     # Create a mock os module with process_cpu_count | ||||
|  | ||||
|     mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4) | ||||
|  | ||||
|     with patch("esphome.core.config.os", mock_os): | ||||
|         # When process_cpu_count exists, it should be used | ||||
|         count = config.get_usable_cpu_count() | ||||
|         assert count == 8 | ||||
|  | ||||
|     # Test fallback to cpu_count when process_cpu_count not available | ||||
|     mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4) | ||||
|  | ||||
|     with patch("esphome.core.config.os", mock_os_no_process): | ||||
|         count = config.get_usable_cpu_count() | ||||
|         assert count == 4 | ||||
|  | ||||
|  | ||||
| def test_list_target_platforms(tmp_path: Path) -> None: | ||||
|     """Test _list_target_platforms returns available platforms.""" | ||||
|     # Create mock components directory structure | ||||
|     components_dir = tmp_path / "components" | ||||
|     components_dir.mkdir() | ||||
|  | ||||
|     # Create platform and non-platform directories with __init__.py | ||||
|     platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"] | ||||
|     non_platforms = ["sensor"] | ||||
|  | ||||
|     for component in platforms + non_platforms: | ||||
|         component_dir = components_dir / component | ||||
|         component_dir.mkdir() | ||||
|         (component_dir / "__init__.py").touch() | ||||
|  | ||||
|     # Create a file (not a directory) | ||||
|     (components_dir / "README.md").touch() | ||||
|  | ||||
|     # Create a directory without __init__.py | ||||
|     (components_dir / "no_init").mkdir() | ||||
|  | ||||
|     # Mock Path(__file__).parents[1] to return our tmp_path | ||||
|     with patch("esphome.core.config.Path") as mock_path: | ||||
|         mock_file_path = MagicMock() | ||||
|         mock_file_path.parents = [MagicMock(), tmp_path] | ||||
|         mock_path.return_value = mock_file_path | ||||
|  | ||||
|         platforms = config._list_target_platforms() | ||||
|  | ||||
|     assert isinstance(platforms, list) | ||||
|     # Should include platform components | ||||
|     assert "esp32" in platforms | ||||
|     assert "esp8266" in platforms | ||||
|     assert "rp2040" in platforms | ||||
|     assert "libretiny" in platforms | ||||
|     assert "host" in platforms | ||||
|     # Should not include non-platform components | ||||
|     assert "sensor" not in platforms | ||||
|     assert "README.md" not in platforms | ||||
|     assert "no_init" not in platforms | ||||
|  | ||||
|  | ||||
| def test_is_target_platform() -> None: | ||||
|     """Test _is_target_platform identifies valid platforms.""" | ||||
|     assert config._is_target_platform("esp32") is True | ||||
|     assert config._is_target_platform("esp8266") is True | ||||
|     assert config._is_target_platform("rp2040") is True | ||||
|     assert config._is_target_platform("invalid_platform") is False | ||||
|     assert config._is_target_platform("api") is False  # Component but not platform | ||||
|   | ||||
| @@ -1,10 +1,16 @@ | ||||
| """Tests for platformio_api.py path functions.""" | ||||
|  | ||||
| import json | ||||
| import os | ||||
| from pathlib import Path | ||||
| from unittest.mock import patch | ||||
| import shutil | ||||
| from types import SimpleNamespace | ||||
| from unittest.mock import MagicMock, Mock, patch | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from esphome import platformio_api | ||||
| from esphome.core import CORE | ||||
| from esphome.core import CORE, EsphomeError | ||||
|  | ||||
|  | ||||
| def test_idedata_firmware_elf_path(setup_core: Path) -> None: | ||||
| @@ -104,7 +110,9 @@ def test_flash_image_dataclass() -> None: | ||||
|     assert image.offset == "0x10000" | ||||
|  | ||||
|  | ||||
| def test_load_idedata_returns_dict(setup_core: Path) -> None: | ||||
| def test_load_idedata_returns_dict( | ||||
|     setup_core: Path, mock_run_platformio_cli_run | ||||
| ) -> None: | ||||
|     """Test _load_idedata returns parsed idedata dict when successful.""" | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|     CORE.name = "test" | ||||
| @@ -118,12 +126,511 @@ def test_load_idedata_returns_dict(setup_core: Path) -> None: | ||||
|     idedata_path.parent.mkdir(parents=True, exist_ok=True) | ||||
|     idedata_path.write_text('{"prog_path": "/test/firmware.elf"}') | ||||
|  | ||||
|     with patch("esphome.platformio_api.run_platformio_cli_run") as mock_run: | ||||
|         mock_run.return_value = '{"prog_path": "/test/firmware.elf"}' | ||||
|     mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}' | ||||
|  | ||||
|         config = {"name": "test"} | ||||
|         result = platformio_api._load_idedata(config) | ||||
|     config = {"name": "test"} | ||||
|     result = platformio_api._load_idedata(config) | ||||
|  | ||||
|     assert result is not None | ||||
|     assert isinstance(result, dict) | ||||
|     assert result["prog_path"] == "/test/firmware.elf" | ||||
|  | ||||
|  | ||||
| def test_load_idedata_uses_cache_when_valid( | ||||
|     setup_core: Path, mock_run_platformio_cli_run: Mock | ||||
| ) -> None: | ||||
|     """Test _load_idedata uses cached data when unchanged.""" | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|     CORE.name = "test" | ||||
|  | ||||
|     # Create platformio.ini | ||||
|     platformio_ini = setup_core / "build" / "test" / "platformio.ini" | ||||
|     platformio_ini.parent.mkdir(parents=True, exist_ok=True) | ||||
|     platformio_ini.write_text("content") | ||||
|  | ||||
|     # Create idedata cache file that's newer | ||||
|     idedata_path = setup_core / ".esphome" / "idedata" / "test.json" | ||||
|     idedata_path.parent.mkdir(parents=True, exist_ok=True) | ||||
|     idedata_path.write_text('{"prog_path": "/cached/firmware.elf"}') | ||||
|  | ||||
|     # Make idedata newer than platformio.ini | ||||
|     platformio_ini_mtime = platformio_ini.stat().st_mtime | ||||
|     os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1)) | ||||
|  | ||||
|     config = {"name": "test"} | ||||
|     result = platformio_api._load_idedata(config) | ||||
|  | ||||
|     # Should not call _run_idedata since cache is valid | ||||
|     mock_run_platformio_cli_run.assert_not_called() | ||||
|  | ||||
|     assert result["prog_path"] == "/cached/firmware.elf" | ||||
|  | ||||
|  | ||||
| def test_load_idedata_regenerates_when_platformio_ini_newer( | ||||
|     setup_core: Path, mock_run_platformio_cli_run: Mock | ||||
| ) -> None: | ||||
|     """Test _load_idedata regenerates when platformio.ini is newer.""" | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|     CORE.name = "test" | ||||
|  | ||||
|     # Create idedata cache file first | ||||
|     idedata_path = setup_core / ".esphome" / "idedata" / "test.json" | ||||
|     idedata_path.parent.mkdir(parents=True, exist_ok=True) | ||||
|     idedata_path.write_text('{"prog_path": "/old/firmware.elf"}') | ||||
|  | ||||
|     # Create platformio.ini that's newer | ||||
|     idedata_mtime = idedata_path.stat().st_mtime | ||||
|     platformio_ini = setup_core / "build" / "test" / "platformio.ini" | ||||
|     platformio_ini.parent.mkdir(parents=True, exist_ok=True) | ||||
|     platformio_ini.write_text("content") | ||||
|     # Make platformio.ini newer than idedata | ||||
|     os.utime(platformio_ini, (idedata_mtime + 1, idedata_mtime + 1)) | ||||
|  | ||||
|     # Mock platformio to return new data | ||||
|     new_data = {"prog_path": "/new/firmware.elf"} | ||||
|     mock_run_platformio_cli_run.return_value = json.dumps(new_data) | ||||
|  | ||||
|     config = {"name": "test"} | ||||
|     result = platformio_api._load_idedata(config) | ||||
|  | ||||
|     # Should call _run_idedata since platformio.ini is newer | ||||
|     mock_run_platformio_cli_run.assert_called_once() | ||||
|  | ||||
|     assert result["prog_path"] == "/new/firmware.elf" | ||||
|  | ||||
|  | ||||
| def test_load_idedata_regenerates_on_corrupted_cache( | ||||
|     setup_core: Path, mock_run_platformio_cli_run: Mock | ||||
| ) -> None: | ||||
|     """Test _load_idedata regenerates when cache file is corrupted.""" | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|     CORE.name = "test" | ||||
|  | ||||
|     # Create platformio.ini | ||||
|     platformio_ini = setup_core / "build" / "test" / "platformio.ini" | ||||
|     platformio_ini.parent.mkdir(parents=True, exist_ok=True) | ||||
|     platformio_ini.write_text("content") | ||||
|  | ||||
|     # Create corrupted idedata cache file | ||||
|     idedata_path = setup_core / ".esphome" / "idedata" / "test.json" | ||||
|     idedata_path.parent.mkdir(parents=True, exist_ok=True) | ||||
|     idedata_path.write_text('{"prog_path": invalid json') | ||||
|  | ||||
|     # Make idedata newer so it would be used if valid | ||||
|     platformio_ini_mtime = platformio_ini.stat().st_mtime | ||||
|     os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1)) | ||||
|  | ||||
|     # Mock platformio to return new data | ||||
|     new_data = {"prog_path": "/new/firmware.elf"} | ||||
|     mock_run_platformio_cli_run.return_value = json.dumps(new_data) | ||||
|  | ||||
|     config = {"name": "test"} | ||||
|     result = platformio_api._load_idedata(config) | ||||
|  | ||||
|     # Should call _run_idedata since cache is corrupted | ||||
|     mock_run_platformio_cli_run.assert_called_once() | ||||
|  | ||||
|     assert result["prog_path"] == "/new/firmware.elf" | ||||
|  | ||||
|  | ||||
| def test_run_idedata_parses_json_from_output( | ||||
|     setup_core: Path, mock_run_platformio_cli_run: Mock | ||||
| ) -> None: | ||||
|     """Test _run_idedata extracts JSON from platformio output.""" | ||||
|     config = {"name": "test"} | ||||
|  | ||||
|     expected_data = { | ||||
|         "prog_path": "/path/to/firmware.elf", | ||||
|         "cc_path": "/path/to/gcc", | ||||
|         "extra": {"flash_images": []}, | ||||
|     } | ||||
|  | ||||
|     # Simulate platformio output with JSON embedded | ||||
|     mock_run_platformio_cli_run.return_value = ( | ||||
|         f"Some preamble\n{json.dumps(expected_data)}\nSome postamble" | ||||
|     ) | ||||
|  | ||||
|     result = platformio_api._run_idedata(config) | ||||
|  | ||||
|     assert result == expected_data | ||||
|  | ||||
|  | ||||
| def test_run_idedata_raises_on_no_json( | ||||
|     setup_core: Path, mock_run_platformio_cli_run: Mock | ||||
| ) -> None: | ||||
|     """Test _run_idedata raises EsphomeError when no JSON found.""" | ||||
|     config = {"name": "test"} | ||||
|  | ||||
|     mock_run_platformio_cli_run.return_value = "No JSON in this output" | ||||
|  | ||||
|     with pytest.raises(EsphomeError): | ||||
|         platformio_api._run_idedata(config) | ||||
|  | ||||
|  | ||||
| def test_run_idedata_raises_on_invalid_json( | ||||
|     setup_core: Path, mock_run_platformio_cli_run: Mock | ||||
| ) -> None: | ||||
|     """Test _run_idedata raises on malformed JSON.""" | ||||
|     config = {"name": "test"} | ||||
|     mock_run_platformio_cli_run.return_value = '{"invalid": json"}' | ||||
|  | ||||
|     # The ValueError from json.loads is re-raised | ||||
|     with pytest.raises(ValueError): | ||||
|         platformio_api._run_idedata(config) | ||||
|  | ||||
|  | ||||
| def test_run_platformio_cli_sets_environment_variables( | ||||
|     setup_core: Path, mock_run_external_command: Mock | ||||
| ) -> None: | ||||
|     """Test run_platformio_cli sets correct environment variables.""" | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|  | ||||
|     with patch.dict(os.environ, {}, clear=False): | ||||
|         mock_run_external_command.return_value = 0 | ||||
|         platformio_api.run_platformio_cli("test", "arg") | ||||
|  | ||||
|         # Check environment variables were set | ||||
|         assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true" | ||||
|         assert ( | ||||
|             setup_core / "build" / "test" | ||||
|             in Path(os.environ["PLATFORMIO_BUILD_DIR"]).parents | ||||
|             or Path(os.environ["PLATFORMIO_BUILD_DIR"]) == setup_core / "build" / "test" | ||||
|         ) | ||||
|         assert "PLATFORMIO_LIBDEPS_DIR" in os.environ | ||||
|         assert "PYTHONWARNINGS" in os.environ | ||||
|  | ||||
|         # Check command was called correctly | ||||
|         mock_run_external_command.assert_called_once() | ||||
|         args = mock_run_external_command.call_args[0] | ||||
|         assert "platformio" in args | ||||
|         assert "test" in args | ||||
|         assert "arg" in args | ||||
|  | ||||
|  | ||||
| def test_run_platformio_cli_run_builds_command( | ||||
|     setup_core: Path, mock_run_platformio_cli: Mock | ||||
| ) -> None: | ||||
|     """Test run_platformio_cli_run builds correct command.""" | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|     mock_run_platformio_cli.return_value = 0 | ||||
|  | ||||
|     config = {"name": "test"} | ||||
|     platformio_api.run_platformio_cli_run(config, True, "extra", "args") | ||||
|  | ||||
|     mock_run_platformio_cli.assert_called_once_with( | ||||
|         "run", "-d", CORE.build_path, "-v", "extra", "args" | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None: | ||||
|     """Test run_compile with process limit.""" | ||||
|     from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME | ||||
|  | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|     config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}} | ||||
|     mock_run_platformio_cli_run.return_value = 0 | ||||
|  | ||||
|     platformio_api.run_compile(config, verbose=True) | ||||
|  | ||||
|     mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4") | ||||
|  | ||||
|  | ||||
| def test_get_idedata_caches_result( | ||||
|     setup_core: Path, mock_run_platformio_cli_run: Mock | ||||
| ) -> None: | ||||
|     """Test get_idedata caches result in CORE.data.""" | ||||
|     from esphome.const import KEY_CORE | ||||
|  | ||||
|     CORE.build_path = str(setup_core / "build" / "test") | ||||
|     CORE.name = "test" | ||||
|     CORE.data[KEY_CORE] = {} | ||||
|  | ||||
|     # Create platformio.ini to avoid regeneration | ||||
|     platformio_ini = setup_core / "build" / "test" / "platformio.ini" | ||||
|     platformio_ini.parent.mkdir(parents=True, exist_ok=True) | ||||
|     platformio_ini.write_text("content") | ||||
|  | ||||
|     # Mock platformio to return data | ||||
|     idedata = {"prog_path": "/test/firmware.elf"} | ||||
|     mock_run_platformio_cli_run.return_value = json.dumps(idedata) | ||||
|  | ||||
|     config = {"name": "test"} | ||||
|  | ||||
|     # First call should load and cache | ||||
|     result1 = platformio_api.get_idedata(config) | ||||
|     mock_run_platformio_cli_run.assert_called_once() | ||||
|  | ||||
|     # Second call should use cache from CORE.data | ||||
|     result2 = platformio_api.get_idedata(config) | ||||
|     mock_run_platformio_cli_run.assert_called_once()  # Still only called once | ||||
|  | ||||
|     assert result1 is result2 | ||||
|     assert isinstance(result1, platformio_api.IDEData) | ||||
|     assert result1.firmware_elf_path == "/test/firmware.elf" | ||||
|  | ||||
|  | ||||
| def test_idedata_addr2line_path_windows(setup_core: Path) -> None: | ||||
|     """Test IDEData.addr2line_path on Windows.""" | ||||
|     raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"} | ||||
|     idedata = platformio_api.IDEData(raw_data) | ||||
|  | ||||
|     result = idedata.addr2line_path | ||||
|     assert result == "C:\\tools\\addr2line.exe" | ||||
|  | ||||
|  | ||||
| def test_idedata_addr2line_path_unix(setup_core: Path) -> None: | ||||
|     """Test IDEData.addr2line_path on Unix.""" | ||||
|     raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"} | ||||
|     idedata = platformio_api.IDEData(raw_data) | ||||
|  | ||||
|     result = idedata.addr2line_path | ||||
|     assert result == "/usr/bin/addr2line" | ||||
|  | ||||
|  | ||||
| def test_patch_structhash(setup_core: Path) -> None: | ||||
|     """Test patch_structhash monkey patches platformio functions.""" | ||||
|     # Create simple namespace objects to act as modules | ||||
|     mock_cli = SimpleNamespace() | ||||
|     mock_helpers = SimpleNamespace() | ||||
|     mock_run = SimpleNamespace(cli=mock_cli, helpers=mock_helpers) | ||||
|  | ||||
|     # Mock platformio modules | ||||
|     with patch.dict( | ||||
|         "sys.modules", | ||||
|         { | ||||
|             "platformio.run.cli": mock_cli, | ||||
|             "platformio.run.helpers": mock_helpers, | ||||
|             "platformio.run": mock_run, | ||||
|             "platformio.project.helpers": MagicMock(), | ||||
|             "platformio.fs": MagicMock(), | ||||
|             "platformio": MagicMock(), | ||||
|         }, | ||||
|     ): | ||||
|         # Call patch_structhash | ||||
|         platformio_api.patch_structhash() | ||||
|  | ||||
|         # Verify both modules had clean_build_dir patched | ||||
|         # Check that clean_build_dir was set on both modules | ||||
|         assert hasattr(mock_cli, "clean_build_dir") | ||||
|         assert hasattr(mock_helpers, "clean_build_dir") | ||||
|  | ||||
|         # Verify they got the same function assigned | ||||
|         assert mock_cli.clean_build_dir is mock_helpers.clean_build_dir | ||||
|  | ||||
|         # Verify it's a real function (not a Mock) | ||||
|         assert callable(mock_cli.clean_build_dir) | ||||
|         assert mock_cli.clean_build_dir.__name__ == "patched_clean_build_dir" | ||||
|  | ||||
|  | ||||
| def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None: | ||||
|     """Test patched_clean_build_dir removes build dir when platformio.ini is newer.""" | ||||
|     build_dir = setup_core / "build" | ||||
|     build_dir.mkdir() | ||||
|     platformio_ini = setup_core / "platformio.ini" | ||||
|     platformio_ini.write_text("config") | ||||
|  | ||||
|     # Make platformio.ini newer than build_dir | ||||
|     build_mtime = build_dir.stat().st_mtime | ||||
|     os.utime(platformio_ini, (build_mtime + 1, build_mtime + 1)) | ||||
|  | ||||
|     # Track if directory was removed | ||||
|     removed_paths: list[str] = [] | ||||
|  | ||||
|     def track_rmtree(path: str) -> None: | ||||
|         removed_paths.append(path) | ||||
|         shutil.rmtree(path) | ||||
|  | ||||
|     # Create mock modules that patch_structhash expects | ||||
|     mock_cli = SimpleNamespace() | ||||
|     mock_helpers = SimpleNamespace() | ||||
|     mock_project_helpers = MagicMock() | ||||
|     mock_project_helpers.get_project_dir.return_value = str(setup_core) | ||||
|     mock_fs = SimpleNamespace(rmtree=track_rmtree) | ||||
|  | ||||
|     with patch.dict( | ||||
|         "sys.modules", | ||||
|         { | ||||
|             "platformio": SimpleNamespace(fs=mock_fs), | ||||
|             "platformio.fs": mock_fs, | ||||
|             "platformio.project.helpers": mock_project_helpers, | ||||
|             "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), | ||||
|             "platformio.run.cli": mock_cli, | ||||
|             "platformio.run.helpers": mock_helpers, | ||||
|         }, | ||||
|     ): | ||||
|         # Call patch_structhash to install the patched function | ||||
|         platformio_api.patch_structhash() | ||||
|  | ||||
|         # Call the patched function | ||||
|         mock_helpers.clean_build_dir(str(build_dir), []) | ||||
|  | ||||
|         # Verify directory was removed and recreated | ||||
|         assert len(removed_paths) == 1 | ||||
|         assert removed_paths[0] == str(build_dir) | ||||
|         assert build_dir.exists()  # makedirs recreated it | ||||
|  | ||||
|  | ||||
| def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None: | ||||
|     """Test patched_clean_build_dir keeps build dir when it's up to date.""" | ||||
|     build_dir = setup_core / "build" | ||||
|     build_dir.mkdir() | ||||
|     test_file = build_dir / "test.txt" | ||||
|     test_file.write_text("test content") | ||||
|  | ||||
|     platformio_ini = setup_core / "platformio.ini" | ||||
|     platformio_ini.write_text("config") | ||||
|  | ||||
|     # Make build_dir newer than platformio.ini | ||||
|     ini_mtime = platformio_ini.stat().st_mtime | ||||
|     os.utime(build_dir, (ini_mtime + 1, ini_mtime + 1)) | ||||
|  | ||||
|     # Track if rmtree is called | ||||
|     removed_paths: list[str] = [] | ||||
|  | ||||
|     def track_rmtree(path: str) -> None: | ||||
|         removed_paths.append(path) | ||||
|  | ||||
|     # Create mock modules | ||||
|     mock_cli = SimpleNamespace() | ||||
|     mock_helpers = SimpleNamespace() | ||||
|     mock_project_helpers = MagicMock() | ||||
|     mock_project_helpers.get_project_dir.return_value = str(setup_core) | ||||
|     mock_fs = SimpleNamespace(rmtree=track_rmtree) | ||||
|  | ||||
|     with patch.dict( | ||||
|         "sys.modules", | ||||
|         { | ||||
|             "platformio": SimpleNamespace(fs=mock_fs), | ||||
|             "platformio.fs": mock_fs, | ||||
|             "platformio.project.helpers": mock_project_helpers, | ||||
|             "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), | ||||
|             "platformio.run.cli": mock_cli, | ||||
|             "platformio.run.helpers": mock_helpers, | ||||
|         }, | ||||
|     ): | ||||
|         # Call patch_structhash to install the patched function | ||||
|         platformio_api.patch_structhash() | ||||
|  | ||||
|         # Call the patched function | ||||
|         mock_helpers.clean_build_dir(str(build_dir), []) | ||||
|  | ||||
|         # Verify rmtree was NOT called | ||||
|         assert len(removed_paths) == 0 | ||||
|  | ||||
|         # Verify directory and file still exist | ||||
|         assert build_dir.exists() | ||||
|         assert test_file.exists() | ||||
|         assert test_file.read_text() == "test content" | ||||
|  | ||||
|  | ||||
| def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None: | ||||
|     """Test patched_clean_build_dir creates build dir when it doesn't exist.""" | ||||
|     build_dir = setup_core / "build" | ||||
|     platformio_ini = setup_core / "platformio.ini" | ||||
|     platformio_ini.write_text("config") | ||||
|  | ||||
|     # Ensure build_dir doesn't exist | ||||
|     assert not build_dir.exists() | ||||
|  | ||||
|     # Track if rmtree is called | ||||
|     removed_paths: list[str] = [] | ||||
|  | ||||
|     def track_rmtree(path: str) -> None: | ||||
|         removed_paths.append(path) | ||||
|  | ||||
|     # Create mock modules | ||||
|     mock_cli = SimpleNamespace() | ||||
|     mock_helpers = SimpleNamespace() | ||||
|     mock_project_helpers = MagicMock() | ||||
|     mock_project_helpers.get_project_dir.return_value = str(setup_core) | ||||
|     mock_fs = SimpleNamespace(rmtree=track_rmtree) | ||||
|  | ||||
|     with patch.dict( | ||||
|         "sys.modules", | ||||
|         { | ||||
|             "platformio": SimpleNamespace(fs=mock_fs), | ||||
|             "platformio.fs": mock_fs, | ||||
|             "platformio.project.helpers": mock_project_helpers, | ||||
|             "platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers), | ||||
|             "platformio.run.cli": mock_cli, | ||||
|             "platformio.run.helpers": mock_helpers, | ||||
|         }, | ||||
|     ): | ||||
|         # Call patch_structhash to install the patched function | ||||
|         platformio_api.patch_structhash() | ||||
|  | ||||
|         # Call the patched function | ||||
|         mock_helpers.clean_build_dir(str(build_dir), []) | ||||
|  | ||||
|         # Verify rmtree was NOT called | ||||
|         assert len(removed_paths) == 0 | ||||
|  | ||||
|         # Verify directory was created | ||||
|         assert build_dir.exists() | ||||
|  | ||||
|  | ||||
| def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None: | ||||
|     """Test process_stacktrace handles ESP8266 exceptions.""" | ||||
|     config = {"name": "test"} | ||||
|  | ||||
|     # Test exception type parsing | ||||
|     line = "Exception (28):" | ||||
|     backtrace_state = False | ||||
|  | ||||
|     result = platformio_api.process_stacktrace(config, line, backtrace_state) | ||||
|  | ||||
|     assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text | ||||
|     assert result is False | ||||
|  | ||||
|  | ||||
| def test_process_stacktrace_esp8266_backtrace( | ||||
|     setup_core: Path, mock_decode_pc: Mock | ||||
| ) -> None: | ||||
|     """Test process_stacktrace handles ESP8266 multi-line backtrace.""" | ||||
|     config = {"name": "test"} | ||||
|  | ||||
|     # Start of backtrace | ||||
|     line1 = ">>>stack>>>" | ||||
|     state = platformio_api.process_stacktrace(config, line1, False) | ||||
|     assert state is True | ||||
|  | ||||
|     # Backtrace content with addresses | ||||
|     line2 = "40201234 40205678" | ||||
|     state = platformio_api.process_stacktrace(config, line2, state) | ||||
|     assert state is True | ||||
|     assert mock_decode_pc.call_count == 2 | ||||
|  | ||||
|     # End of backtrace | ||||
|     line3 = "<<<stack<<<" | ||||
|     state = platformio_api.process_stacktrace(config, line3, state) | ||||
|     assert state is False | ||||
|  | ||||
|  | ||||
| def test_process_stacktrace_esp32_backtrace( | ||||
|     setup_core: Path, mock_decode_pc: Mock | ||||
| ) -> None: | ||||
|     """Test process_stacktrace handles ESP32 single-line backtrace.""" | ||||
|     config = {"name": "test"} | ||||
|  | ||||
|     line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678" | ||||
|     state = platformio_api.process_stacktrace(config, line, False) | ||||
|  | ||||
|     # Should decode both addresses | ||||
|     assert mock_decode_pc.call_count == 2 | ||||
|     mock_decode_pc.assert_any_call(config, "40081234") | ||||
|     mock_decode_pc.assert_any_call(config, "40085678") | ||||
|     assert state is False | ||||
|  | ||||
|  | ||||
| def test_process_stacktrace_bad_alloc( | ||||
|     setup_core: Path, mock_decode_pc: Mock, caplog | ||||
| ) -> None: | ||||
|     """Test process_stacktrace handles bad alloc messages.""" | ||||
|     config = {"name": "test"} | ||||
|  | ||||
|     line = "last failed alloc call: 40201234(512)" | ||||
|     state = platformio_api.process_stacktrace(config, line, False) | ||||
|  | ||||
|     assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text | ||||
|     mock_decode_pc.assert_called_once_with(config, "40201234") | ||||
|     assert state is False | ||||
|   | ||||
| @@ -1,12 +1,15 @@ | ||||
| """Tests for storage_json.py path functions.""" | ||||
|  | ||||
| from datetime import datetime | ||||
| import json | ||||
| from pathlib import Path | ||||
| import sys | ||||
| from unittest.mock import patch | ||||
| from unittest.mock import MagicMock, Mock, patch | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from esphome import storage_json | ||||
| from esphome.const import CONF_DISABLED, CONF_MDNS | ||||
| from esphome.core import CORE | ||||
|  | ||||
|  | ||||
| @@ -115,7 +118,9 @@ def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None: | ||||
|     assert storage.firmware_bin_path == "/path/to/firmware.bin" | ||||
|  | ||||
|  | ||||
| def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) -> None: | ||||
| def test_storage_json_save_creates_directory( | ||||
|     setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock | ||||
| ) -> None: | ||||
|     """Test StorageJSON.save creates storage directory if it doesn't exist.""" | ||||
|     storage_dir = tmp_path / "new_data" / "storage" | ||||
|     storage_file = storage_dir / "test.json" | ||||
| @@ -139,11 +144,10 @@ def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) - | ||||
|         no_mdns=False, | ||||
|     ) | ||||
|  | ||||
|     with patch("esphome.storage_json.write_file_if_changed") as mock_write: | ||||
|         storage.save(str(storage_file)) | ||||
|         mock_write.assert_called_once() | ||||
|         call_args = mock_write.call_args[0] | ||||
|         assert call_args[0] == str(storage_file) | ||||
|     storage.save(str(storage_file)) | ||||
|     mock_write_file_if_changed.assert_called_once() | ||||
|     call_args = mock_write_file_if_changed.call_args[0] | ||||
|     assert call_args[0] == str(storage_file) | ||||
|  | ||||
|  | ||||
| def test_storage_json_from_wizard(setup_core: Path) -> None: | ||||
| @@ -180,3 +184,477 @@ def test_storage_paths_with_ha_addon(mock_is_ha_addon: bool, tmp_path: Path) -> | ||||
|     result = storage_json.esphome_storage_path() | ||||
|     expected = str(Path("/data") / "esphome.json") | ||||
|     assert result == expected | ||||
|  | ||||
|  | ||||
| def test_storage_json_as_dict() -> None: | ||||
|     """Test StorageJSON.as_dict returns correct dictionary.""" | ||||
|     storage = storage_json.StorageJSON( | ||||
|         storage_version=1, | ||||
|         name="test_device", | ||||
|         friendly_name="Test Device", | ||||
|         comment="Test comment", | ||||
|         esphome_version="2024.1.0", | ||||
|         src_version=1, | ||||
|         address="192.168.1.100", | ||||
|         web_port=80, | ||||
|         target_platform="ESP32", | ||||
|         build_path="/path/to/build", | ||||
|         firmware_bin_path="/path/to/firmware.bin", | ||||
|         loaded_integrations={"wifi", "api", "ota"}, | ||||
|         loaded_platforms={"sensor", "binary_sensor"}, | ||||
|         no_mdns=True, | ||||
|         framework="arduino", | ||||
|         core_platform="esp32", | ||||
|     ) | ||||
|  | ||||
|     result = storage.as_dict() | ||||
|  | ||||
|     assert result["storage_version"] == 1 | ||||
|     assert result["name"] == "test_device" | ||||
|     assert result["friendly_name"] == "Test Device" | ||||
|     assert result["comment"] == "Test comment" | ||||
|     assert result["esphome_version"] == "2024.1.0" | ||||
|     assert result["src_version"] == 1 | ||||
|     assert result["address"] == "192.168.1.100" | ||||
|     assert result["web_port"] == 80 | ||||
|     assert result["esp_platform"] == "ESP32" | ||||
|     assert result["build_path"] == "/path/to/build" | ||||
|     assert result["firmware_bin_path"] == "/path/to/firmware.bin" | ||||
|     assert "api" in result["loaded_integrations"] | ||||
|     assert "wifi" in result["loaded_integrations"] | ||||
|     assert "ota" in result["loaded_integrations"] | ||||
|     assert result["loaded_integrations"] == sorted( | ||||
|         ["wifi", "api", "ota"] | ||||
|     )  # Should be sorted | ||||
|     assert "sensor" in result["loaded_platforms"] | ||||
|     assert result["loaded_platforms"] == sorted( | ||||
|         ["sensor", "binary_sensor"] | ||||
|     )  # Should be sorted | ||||
|     assert result["no_mdns"] is True | ||||
|     assert result["framework"] == "arduino" | ||||
|     assert result["core_platform"] == "esp32" | ||||
|  | ||||
|  | ||||
| def test_storage_json_to_json() -> None: | ||||
|     """Test StorageJSON.to_json returns valid JSON string.""" | ||||
|     storage = storage_json.StorageJSON( | ||||
|         storage_version=1, | ||||
|         name="test", | ||||
|         friendly_name="Test", | ||||
|         comment=None, | ||||
|         esphome_version="2024.1.0", | ||||
|         src_version=None, | ||||
|         address="test.local", | ||||
|         web_port=None, | ||||
|         target_platform="ESP8266", | ||||
|         build_path=None, | ||||
|         firmware_bin_path=None, | ||||
|         loaded_integrations=set(), | ||||
|         loaded_platforms=set(), | ||||
|         no_mdns=False, | ||||
|     ) | ||||
|  | ||||
|     json_str = storage.to_json() | ||||
|  | ||||
|     # Should be valid JSON | ||||
|     parsed = json.loads(json_str) | ||||
|     assert parsed["name"] == "test" | ||||
|     assert parsed["storage_version"] == 1 | ||||
|  | ||||
|     # Should end with newline | ||||
|     assert json_str.endswith("\n") | ||||
|  | ||||
|  | ||||
| def test_storage_json_save(tmp_path: Path) -> None: | ||||
|     """Test StorageJSON.save writes file correctly.""" | ||||
|     storage = storage_json.StorageJSON( | ||||
|         storage_version=1, | ||||
|         name="test", | ||||
|         friendly_name="Test", | ||||
|         comment=None, | ||||
|         esphome_version="2024.1.0", | ||||
|         src_version=None, | ||||
|         address="test.local", | ||||
|         web_port=None, | ||||
|         target_platform="ESP32", | ||||
|         build_path=None, | ||||
|         firmware_bin_path=None, | ||||
|         loaded_integrations=set(), | ||||
|         loaded_platforms=set(), | ||||
|         no_mdns=False, | ||||
|     ) | ||||
|  | ||||
|     save_path = tmp_path / "test.json" | ||||
|  | ||||
|     with patch("esphome.storage_json.write_file_if_changed") as mock_write: | ||||
|         storage.save(str(save_path)) | ||||
|         mock_write.assert_called_once_with(str(save_path), storage.to_json()) | ||||
|  | ||||
|  | ||||
| def test_storage_json_from_esphome_core(setup_core: Path) -> None: | ||||
|     """Test StorageJSON.from_esphome_core creates correct storage object.""" | ||||
|     # Mock CORE object | ||||
|     mock_core = MagicMock() | ||||
|     mock_core.name = "my_device" | ||||
|     mock_core.friendly_name = "My Device" | ||||
|     mock_core.comment = "A test device" | ||||
|     mock_core.address = "192.168.1.50" | ||||
|     mock_core.web_port = 8080 | ||||
|     mock_core.target_platform = "esp32" | ||||
|     mock_core.is_esp32 = True | ||||
|     mock_core.build_path = "/build/my_device" | ||||
|     mock_core.firmware_bin = "/build/my_device/firmware.bin" | ||||
|     mock_core.loaded_integrations = {"wifi", "api"} | ||||
|     mock_core.loaded_platforms = {"sensor"} | ||||
|     mock_core.config = {CONF_MDNS: {CONF_DISABLED: True}} | ||||
|     mock_core.target_framework = "esp-idf" | ||||
|  | ||||
|     with patch("esphome.components.esp32.get_esp32_variant") as mock_variant: | ||||
|         mock_variant.return_value = "ESP32-C3" | ||||
|  | ||||
|         result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None) | ||||
|  | ||||
|     assert result.name == "my_device" | ||||
|     assert result.friendly_name == "My Device" | ||||
|     assert result.comment == "A test device" | ||||
|     assert result.address == "192.168.1.50" | ||||
|     assert result.web_port == 8080 | ||||
|     assert result.target_platform == "ESP32-C3" | ||||
|     assert result.build_path == "/build/my_device" | ||||
|     assert result.firmware_bin_path == "/build/my_device/firmware.bin" | ||||
|     assert result.loaded_integrations == {"wifi", "api"} | ||||
|     assert result.loaded_platforms == {"sensor"} | ||||
|     assert result.no_mdns is True | ||||
|     assert result.framework == "esp-idf" | ||||
|     assert result.core_platform == "esp32" | ||||
|  | ||||
|  | ||||
| def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None: | ||||
|     """Test from_esphome_core with mDNS enabled.""" | ||||
|     mock_core = MagicMock() | ||||
|     mock_core.name = "test" | ||||
|     mock_core.friendly_name = "Test" | ||||
|     mock_core.comment = None | ||||
|     mock_core.address = "test.local" | ||||
|     mock_core.web_port = None | ||||
|     mock_core.target_platform = "esp8266" | ||||
|     mock_core.is_esp32 = False | ||||
|     mock_core.build_path = "/build" | ||||
|     mock_core.firmware_bin = "/build/firmware.bin" | ||||
|     mock_core.loaded_integrations = set() | ||||
|     mock_core.loaded_platforms = set() | ||||
|     mock_core.config = {}  # No MDNS config means enabled | ||||
|     mock_core.target_framework = "arduino" | ||||
|  | ||||
|     result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None) | ||||
|  | ||||
|     assert result.no_mdns is False | ||||
|  | ||||
|  | ||||
| def test_storage_json_load_valid_file(tmp_path: Path) -> None: | ||||
|     """Test StorageJSON.load with valid JSON file.""" | ||||
|     storage_data = { | ||||
|         "storage_version": 1, | ||||
|         "name": "loaded_device", | ||||
|         "friendly_name": "Loaded Device", | ||||
|         "comment": "Loaded from file", | ||||
|         "esphome_version": "2024.1.0", | ||||
|         "src_version": 2, | ||||
|         "address": "10.0.0.1", | ||||
|         "web_port": 8080, | ||||
|         "esp_platform": "ESP32", | ||||
|         "build_path": "/loaded/build", | ||||
|         "firmware_bin_path": "/loaded/firmware.bin", | ||||
|         "loaded_integrations": ["wifi", "api"], | ||||
|         "loaded_platforms": ["sensor"], | ||||
|         "no_mdns": True, | ||||
|         "framework": "arduino", | ||||
|         "core_platform": "esp32", | ||||
|     } | ||||
|  | ||||
|     file_path = tmp_path / "storage.json" | ||||
|     file_path.write_text(json.dumps(storage_data)) | ||||
|  | ||||
|     result = storage_json.StorageJSON.load(str(file_path)) | ||||
|  | ||||
|     assert result is not None | ||||
|     assert result.name == "loaded_device" | ||||
|     assert result.friendly_name == "Loaded Device" | ||||
|     assert result.comment == "Loaded from file" | ||||
|     assert result.esphome_version == "2024.1.0" | ||||
|     assert result.src_version == 2 | ||||
|     assert result.address == "10.0.0.1" | ||||
|     assert result.web_port == 8080 | ||||
|     assert result.target_platform == "ESP32" | ||||
|     assert result.build_path == "/loaded/build" | ||||
|     assert result.firmware_bin_path == "/loaded/firmware.bin" | ||||
|     assert result.loaded_integrations == {"wifi", "api"} | ||||
|     assert result.loaded_platforms == {"sensor"} | ||||
|     assert result.no_mdns is True | ||||
|     assert result.framework == "arduino" | ||||
|     assert result.core_platform == "esp32" | ||||
|  | ||||
|  | ||||
| def test_storage_json_load_invalid_file(tmp_path: Path) -> None: | ||||
|     """Test StorageJSON.load with invalid JSON file.""" | ||||
|     file_path = tmp_path / "invalid.json" | ||||
|     file_path.write_text("not valid json{") | ||||
|  | ||||
|     result = storage_json.StorageJSON.load(str(file_path)) | ||||
|  | ||||
|     assert result is None | ||||
|  | ||||
|  | ||||
| def test_storage_json_load_nonexistent_file() -> None: | ||||
|     """Test StorageJSON.load with non-existent file.""" | ||||
|     result = storage_json.StorageJSON.load("/nonexistent/file.json") | ||||
|  | ||||
|     assert result is None | ||||
|  | ||||
|  | ||||
| def test_storage_json_equality() -> None: | ||||
|     """Test StorageJSON equality comparison.""" | ||||
|     storage1 = storage_json.StorageJSON( | ||||
|         storage_version=1, | ||||
|         name="test", | ||||
|         friendly_name="Test", | ||||
|         comment=None, | ||||
|         esphome_version="2024.1.0", | ||||
|         src_version=1, | ||||
|         address="test.local", | ||||
|         web_port=80, | ||||
|         target_platform="ESP32", | ||||
|         build_path="/build", | ||||
|         firmware_bin_path="/firmware.bin", | ||||
|         loaded_integrations={"wifi"}, | ||||
|         loaded_platforms=set(), | ||||
|         no_mdns=False, | ||||
|     ) | ||||
|  | ||||
|     storage2 = storage_json.StorageJSON( | ||||
|         storage_version=1, | ||||
|         name="test", | ||||
|         friendly_name="Test", | ||||
|         comment=None, | ||||
|         esphome_version="2024.1.0", | ||||
|         src_version=1, | ||||
|         address="test.local", | ||||
|         web_port=80, | ||||
|         target_platform="ESP32", | ||||
|         build_path="/build", | ||||
|         firmware_bin_path="/firmware.bin", | ||||
|         loaded_integrations={"wifi"}, | ||||
|         loaded_platforms=set(), | ||||
|         no_mdns=False, | ||||
|     ) | ||||
|  | ||||
|     storage3 = storage_json.StorageJSON( | ||||
|         storage_version=1, | ||||
|         name="different",  # Different name | ||||
|         friendly_name="Test", | ||||
|         comment=None, | ||||
|         esphome_version="2024.1.0", | ||||
|         src_version=1, | ||||
|         address="test.local", | ||||
|         web_port=80, | ||||
|         target_platform="ESP32", | ||||
|         build_path="/build", | ||||
|         firmware_bin_path="/firmware.bin", | ||||
|         loaded_integrations={"wifi"}, | ||||
|         loaded_platforms=set(), | ||||
|         no_mdns=False, | ||||
|     ) | ||||
|  | ||||
|     assert storage1 == storage2 | ||||
|     assert storage1 != storage3 | ||||
|     assert storage1 != "not a storage object" | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_as_dict() -> None: | ||||
|     """Test EsphomeStorageJSON.as_dict returns correct dictionary.""" | ||||
|     storage = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="secret123", | ||||
|         last_update_check="2024-01-15T10:30:00", | ||||
|         remote_version="2024.1.1", | ||||
|     ) | ||||
|  | ||||
|     result = storage.as_dict() | ||||
|  | ||||
|     assert result["storage_version"] == 1 | ||||
|     assert result["cookie_secret"] == "secret123" | ||||
|     assert result["last_update_check"] == "2024-01-15T10:30:00" | ||||
|     assert result["remote_version"] == "2024.1.1" | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_last_update_check_property() -> None: | ||||
|     """Test EsphomeStorageJSON.last_update_check property.""" | ||||
|     storage = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="secret", | ||||
|         last_update_check="2024-01-15T10:30:00", | ||||
|         remote_version=None, | ||||
|     ) | ||||
|  | ||||
|     # Test getter | ||||
|     result = storage.last_update_check | ||||
|     assert isinstance(result, datetime) | ||||
|     assert result.year == 2024 | ||||
|     assert result.month == 1 | ||||
|     assert result.day == 15 | ||||
|     assert result.hour == 10 | ||||
|     assert result.minute == 30 | ||||
|  | ||||
|     # Test setter | ||||
|     new_date = datetime(2024, 2, 20, 15, 45, 30) | ||||
|     storage.last_update_check = new_date | ||||
|     assert storage.last_update_check_str == "2024-02-20T15:45:30" | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_last_update_check_invalid() -> None: | ||||
|     """Test EsphomeStorageJSON.last_update_check with invalid date.""" | ||||
|     storage = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="secret", | ||||
|         last_update_check="invalid date", | ||||
|         remote_version=None, | ||||
|     ) | ||||
|  | ||||
|     result = storage.last_update_check | ||||
|     assert result is None | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_to_json() -> None: | ||||
|     """Test EsphomeStorageJSON.to_json returns valid JSON string.""" | ||||
|     storage = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="mysecret", | ||||
|         last_update_check="2024-01-15T10:30:00", | ||||
|         remote_version="2024.1.1", | ||||
|     ) | ||||
|  | ||||
|     json_str = storage.to_json() | ||||
|  | ||||
|     # Should be valid JSON | ||||
|     parsed = json.loads(json_str) | ||||
|     assert parsed["cookie_secret"] == "mysecret" | ||||
|     assert parsed["storage_version"] == 1 | ||||
|  | ||||
|     # Should end with newline | ||||
|     assert json_str.endswith("\n") | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_save(tmp_path: Path) -> None: | ||||
|     """Test EsphomeStorageJSON.save writes file correctly.""" | ||||
|     storage = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="secret", | ||||
|         last_update_check=None, | ||||
|         remote_version=None, | ||||
|     ) | ||||
|  | ||||
|     save_path = tmp_path / "esphome.json" | ||||
|  | ||||
|     with patch("esphome.storage_json.write_file_if_changed") as mock_write: | ||||
|         storage.save(str(save_path)) | ||||
|         mock_write.assert_called_once_with(str(save_path), storage.to_json()) | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None: | ||||
|     """Test EsphomeStorageJSON.load with valid JSON file.""" | ||||
|     storage_data = { | ||||
|         "storage_version": 1, | ||||
|         "cookie_secret": "loaded_secret", | ||||
|         "last_update_check": "2024-01-20T14:30:00", | ||||
|         "remote_version": "2024.1.2", | ||||
|     } | ||||
|  | ||||
|     file_path = tmp_path / "esphome.json" | ||||
|     file_path.write_text(json.dumps(storage_data)) | ||||
|  | ||||
|     result = storage_json.EsphomeStorageJSON.load(str(file_path)) | ||||
|  | ||||
|     assert result is not None | ||||
|     assert result.storage_version == 1 | ||||
|     assert result.cookie_secret == "loaded_secret" | ||||
|     assert result.last_update_check_str == "2024-01-20T14:30:00" | ||||
|     assert result.remote_version == "2024.1.2" | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None: | ||||
|     """Test EsphomeStorageJSON.load with invalid JSON file.""" | ||||
|     file_path = tmp_path / "invalid.json" | ||||
|     file_path.write_text("not valid json{") | ||||
|  | ||||
|     result = storage_json.EsphomeStorageJSON.load(str(file_path)) | ||||
|  | ||||
|     assert result is None | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_load_nonexistent_file() -> None: | ||||
|     """Test EsphomeStorageJSON.load with non-existent file.""" | ||||
|     result = storage_json.EsphomeStorageJSON.load("/nonexistent/file.json") | ||||
|  | ||||
|     assert result is None | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_get_default() -> None: | ||||
|     """Test EsphomeStorageJSON.get_default creates default storage.""" | ||||
|     with patch("esphome.storage_json.os.urandom") as mock_urandom: | ||||
|         # Mock urandom to return predictable bytes | ||||
|         mock_urandom.return_value = b"test" * 16  # 64 bytes | ||||
|  | ||||
|         result = storage_json.EsphomeStorageJSON.get_default() | ||||
|  | ||||
|     assert result.storage_version == 1 | ||||
|     assert len(result.cookie_secret) == 128  # 64 bytes hex = 128 chars | ||||
|     assert result.last_update_check is None | ||||
|     assert result.remote_version is None | ||||
|  | ||||
|  | ||||
| def test_esphome_storage_json_equality() -> None: | ||||
|     """Test EsphomeStorageJSON equality comparison.""" | ||||
|     storage1 = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="secret", | ||||
|         last_update_check="2024-01-15T10:30:00", | ||||
|         remote_version="2024.1.1", | ||||
|     ) | ||||
|  | ||||
|     storage2 = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="secret", | ||||
|         last_update_check="2024-01-15T10:30:00", | ||||
|         remote_version="2024.1.1", | ||||
|     ) | ||||
|  | ||||
|     storage3 = storage_json.EsphomeStorageJSON( | ||||
|         storage_version=1, | ||||
|         cookie_secret="different",  # Different secret | ||||
|         last_update_check="2024-01-15T10:30:00", | ||||
|         remote_version="2024.1.1", | ||||
|     ) | ||||
|  | ||||
|     assert storage1 == storage2 | ||||
|     assert storage1 != storage3 | ||||
|     assert storage1 != "not a storage object" | ||||
|  | ||||
|  | ||||
| def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None: | ||||
|     """Test loading storage with legacy esphomeyaml_version field.""" | ||||
|     storage_data = { | ||||
|         "storage_version": 1, | ||||
|         "name": "legacy_device", | ||||
|         "friendly_name": "Legacy Device", | ||||
|         "esphomeyaml_version": "1.14.0",  # Legacy field name | ||||
|         "address": "legacy.local", | ||||
|         "esp_platform": "ESP8266", | ||||
|     } | ||||
|  | ||||
|     file_path = tmp_path / "legacy.json" | ||||
|     file_path.write_text(json.dumps(storage_data)) | ||||
|  | ||||
|     result = storage_json.StorageJSON.load(str(file_path)) | ||||
|  | ||||
|     assert result is not None | ||||
|     assert result.esphome_version == "1.14.0"  # Should map to esphome_version | ||||
|   | ||||
		Reference in New Issue
	
	Block a user