mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-31 07:03:55 +00:00 
			
		
		
		
	Merge branch 'dev' into dashboard_dns_lookup_delay
This commit is contained in:
		
							
								
								
									
										188
									
								
								tests/unit_tests/build_gen/test_platformio.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										188
									
								
								tests/unit_tests/build_gen/test_platformio.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,188 @@ | ||||
| """Tests for esphome.build_gen.platformio module.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| from collections.abc import Generator | ||||
| from pathlib import Path | ||||
| from unittest.mock import MagicMock, patch | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from esphome.build_gen import platformio | ||||
| from esphome.core import CORE | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_update_storage_json() -> Generator[MagicMock]: | ||||
|     """Mock update_storage_json for all tests.""" | ||||
|     with patch("esphome.build_gen.platformio.update_storage_json") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_write_file_if_changed() -> Generator[MagicMock]: | ||||
|     """Mock write_file_if_changed for tests.""" | ||||
|     with patch("esphome.build_gen.platformio.write_file_if_changed") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
| def test_write_ini_creates_new_file( | ||||
|     tmp_path: Path, mock_update_storage_json: MagicMock | ||||
| ) -> None: | ||||
|     """Test write_ini creates a new platformio.ini file.""" | ||||
|     CORE.build_path = str(tmp_path) | ||||
|  | ||||
|     content = """ | ||||
| [env:test] | ||||
| platform = espressif32 | ||||
| board = esp32dev | ||||
| framework = arduino | ||||
| """ | ||||
|  | ||||
|     platformio.write_ini(content) | ||||
|  | ||||
|     ini_file = tmp_path / "platformio.ini" | ||||
|     assert ini_file.exists() | ||||
|  | ||||
|     file_content = ini_file.read_text() | ||||
|     assert content in file_content | ||||
|     assert platformio.INI_AUTO_GENERATE_BEGIN in file_content | ||||
|     assert platformio.INI_AUTO_GENERATE_END in file_content | ||||
|  | ||||
|  | ||||
| def test_write_ini_updates_existing_file( | ||||
|     tmp_path: Path, mock_update_storage_json: MagicMock | ||||
| ) -> None: | ||||
|     """Test write_ini updates existing platformio.ini file.""" | ||||
|     CORE.build_path = str(tmp_path) | ||||
|  | ||||
|     # Create existing file with custom content | ||||
|     ini_file = tmp_path / "platformio.ini" | ||||
|     existing_content = f""" | ||||
| ; Custom header | ||||
| [platformio] | ||||
| default_envs = test | ||||
|  | ||||
| {platformio.INI_AUTO_GENERATE_BEGIN} | ||||
| ; Old auto-generated content | ||||
| [env:old] | ||||
| platform = old | ||||
| {platformio.INI_AUTO_GENERATE_END} | ||||
|  | ||||
| ; Custom footer | ||||
| """ | ||||
|     ini_file.write_text(existing_content) | ||||
|  | ||||
|     # New content to write | ||||
|     new_content = """ | ||||
| [env:test] | ||||
| platform = espressif32 | ||||
| board = esp32dev | ||||
| framework = arduino | ||||
| """ | ||||
|  | ||||
|     platformio.write_ini(new_content) | ||||
|  | ||||
|     file_content = ini_file.read_text() | ||||
|  | ||||
|     # Check that custom parts are preserved | ||||
|     assert "; Custom header" in file_content | ||||
|     assert "[platformio]" in file_content | ||||
|     assert "default_envs = test" in file_content | ||||
|     assert "; Custom footer" in file_content | ||||
|  | ||||
|     # Check that new content replaced old auto-generated content | ||||
|     assert new_content in file_content | ||||
|     assert "[env:old]" not in file_content | ||||
|     assert "platform = old" not in file_content | ||||
|  | ||||
|  | ||||
| def test_write_ini_preserves_custom_sections( | ||||
|     tmp_path: Path, mock_update_storage_json: MagicMock | ||||
| ) -> None: | ||||
|     """Test write_ini preserves custom sections outside auto-generate markers.""" | ||||
|     CORE.build_path = str(tmp_path) | ||||
|  | ||||
|     # Create existing file with multiple custom sections | ||||
|     ini_file = tmp_path / "platformio.ini" | ||||
|     existing_content = f""" | ||||
| [platformio] | ||||
| src_dir = . | ||||
| include_dir = . | ||||
|  | ||||
| [common] | ||||
| lib_deps = | ||||
|     Wire | ||||
|     SPI | ||||
|  | ||||
| {platformio.INI_AUTO_GENERATE_BEGIN} | ||||
| [env:old] | ||||
| platform = old | ||||
| {platformio.INI_AUTO_GENERATE_END} | ||||
|  | ||||
| [env:custom] | ||||
| upload_speed = 921600 | ||||
| monitor_speed = 115200 | ||||
| """ | ||||
|     ini_file.write_text(existing_content) | ||||
|  | ||||
|     new_content = "[env:auto]\nplatform = new" | ||||
|  | ||||
|     platformio.write_ini(new_content) | ||||
|  | ||||
|     file_content = ini_file.read_text() | ||||
|  | ||||
|     # All custom sections should be preserved | ||||
|     assert "[platformio]" in file_content | ||||
|     assert "src_dir = ." in file_content | ||||
|     assert "[common]" in file_content | ||||
|     assert "lib_deps" in file_content | ||||
|     assert "[env:custom]" in file_content | ||||
|     assert "upload_speed = 921600" in file_content | ||||
|  | ||||
|     # New auto-generated content should replace old | ||||
|     assert "[env:auto]" in file_content | ||||
|     assert "platform = new" in file_content | ||||
|     assert "[env:old]" not in file_content | ||||
|  | ||||
|  | ||||
| def test_write_ini_no_change_when_content_same( | ||||
|     tmp_path: Path, | ||||
|     mock_update_storage_json: MagicMock, | ||||
|     mock_write_file_if_changed: MagicMock, | ||||
| ) -> None: | ||||
|     """Test write_ini doesn't rewrite file when content is unchanged.""" | ||||
|     CORE.build_path = str(tmp_path) | ||||
|  | ||||
|     content = "[env:test]\nplatform = esp32" | ||||
|     full_content = ( | ||||
|         f"{platformio.INI_BASE_FORMAT[0]}" | ||||
|         f"{platformio.INI_AUTO_GENERATE_BEGIN}\n" | ||||
|         f"{content}" | ||||
|         f"{platformio.INI_AUTO_GENERATE_END}" | ||||
|         f"{platformio.INI_BASE_FORMAT[1]}" | ||||
|     ) | ||||
|  | ||||
|     ini_file = tmp_path / "platformio.ini" | ||||
|     ini_file.write_text(full_content) | ||||
|  | ||||
|     mock_write_file_if_changed.return_value = False  # Indicate no change | ||||
|     platformio.write_ini(content) | ||||
|  | ||||
|     # write_file_if_changed should be called with the same content | ||||
|     mock_write_file_if_changed.assert_called_once() | ||||
|     call_args = mock_write_file_if_changed.call_args[0] | ||||
|     assert call_args[0] == str(ini_file) | ||||
|     assert content in call_args[1] | ||||
|  | ||||
|  | ||||
| def test_write_ini_calls_update_storage_json( | ||||
|     tmp_path: Path, mock_update_storage_json: MagicMock | ||||
| ) -> None: | ||||
|     """Test write_ini calls update_storage_json.""" | ||||
|     CORE.build_path = str(tmp_path) | ||||
|  | ||||
|     content = "[env:test]\nplatform = esp32" | ||||
|  | ||||
|     platformio.write_ini(content) | ||||
|     mock_update_storage_json.assert_called_once() | ||||
| @@ -35,6 +35,22 @@ from .common import load_config_from_fixture | ||||
| FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" / "core" / "config" | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def mock_cg_with_include_capture() -> tuple[Mock, list[str]]: | ||||
|     """Mock code generation with include capture.""" | ||||
|     includes_added: list[str] = [] | ||||
|  | ||||
|     with patch("esphome.core.config.cg") as mock_cg: | ||||
|         mock_raw_statement = MagicMock() | ||||
|  | ||||
|         def capture_include(text: str) -> MagicMock: | ||||
|             includes_added.append(text) | ||||
|             return mock_raw_statement | ||||
|  | ||||
|         mock_cg.RawStatement.side_effect = capture_include | ||||
|         yield mock_cg, includes_added | ||||
|  | ||||
|  | ||||
| def test_validate_area_config_with_string() -> None: | ||||
|     """Test that string area config is converted to structured format.""" | ||||
|     result = validate_area_config("Living Room") | ||||
| @@ -568,3 +584,262 @@ def test_is_target_platform() -> None: | ||||
|     assert config._is_target_platform("rp2040") is True | ||||
|     assert config._is_target_platform("invalid_platform") is False | ||||
|     assert config._is_target_platform("api") is False  # Component but not platform | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_add_includes_with_single_file( | ||||
|     tmp_path: Path, | ||||
|     mock_copy_file_if_changed: Mock, | ||||
|     mock_cg_with_include_capture: tuple[Mock, list[str]], | ||||
| ) -> None: | ||||
|     """Test add_includes copies a single header file to build directory.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create include file | ||||
|     include_file = tmp_path / "my_header.h" | ||||
|     include_file.write_text("#define MY_CONSTANT 42") | ||||
|  | ||||
|     mock_cg, includes_added = mock_cg_with_include_capture | ||||
|  | ||||
|     await config.add_includes([str(include_file)]) | ||||
|  | ||||
|     # Verify copy_file_if_changed was called to copy the file | ||||
|     # Note: add_includes adds files to a src/ subdirectory | ||||
|     mock_copy_file_if_changed.assert_called_once_with( | ||||
|         str(include_file), str(Path(CORE.build_path) / "src" / "my_header.h") | ||||
|     ) | ||||
|  | ||||
|     # Verify include statement was added | ||||
|     assert any('#include "my_header.h"' in inc for inc in includes_added) | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| @pytest.mark.skipif(os.name == "nt", reason="Unix-specific test") | ||||
| async def test_add_includes_with_directory_unix( | ||||
|     tmp_path: Path, | ||||
|     mock_copy_file_if_changed: Mock, | ||||
|     mock_cg_with_include_capture: tuple[Mock, list[str]], | ||||
| ) -> None: | ||||
|     """Test add_includes copies all files from a directory on Unix.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create include directory with files | ||||
|     include_dir = tmp_path / "includes" | ||||
|     include_dir.mkdir() | ||||
|     (include_dir / "header1.h").write_text("#define HEADER1") | ||||
|     (include_dir / "header2.hpp").write_text("#define HEADER2") | ||||
|     (include_dir / "source.cpp").write_text("// Implementation") | ||||
|     (include_dir / "README.md").write_text( | ||||
|         "# Documentation" | ||||
|     )  # Should be copied but not included | ||||
|  | ||||
|     # Create subdirectory with files | ||||
|     subdir = include_dir / "subdir" | ||||
|     subdir.mkdir() | ||||
|     (subdir / "nested.h").write_text("#define NESTED") | ||||
|  | ||||
|     mock_cg, includes_added = mock_cg_with_include_capture | ||||
|  | ||||
|     await config.add_includes([str(include_dir)]) | ||||
|  | ||||
|     # Verify copy_file_if_changed was called for all files | ||||
|     assert mock_copy_file_if_changed.call_count == 5  # 4 code files + 1 README | ||||
|  | ||||
|     # Verify include statements were added for valid extensions | ||||
|     include_strings = " ".join(includes_added) | ||||
|     assert "includes/header1.h" in include_strings | ||||
|     assert "includes/header2.hpp" in include_strings | ||||
|     assert "includes/subdir/nested.h" in include_strings | ||||
|     # CPP files are copied but not included | ||||
|     assert "source.cpp" not in include_strings or "#include" not in include_strings | ||||
|     # README.md should not have an include statement | ||||
|     assert "README.md" not in include_strings | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") | ||||
| async def test_add_includes_with_directory_windows( | ||||
|     tmp_path: Path, | ||||
|     mock_copy_file_if_changed: Mock, | ||||
|     mock_cg_with_include_capture: tuple[Mock, list[str]], | ||||
| ) -> None: | ||||
|     """Test add_includes copies all files from a directory on Windows.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create include directory with files | ||||
|     include_dir = tmp_path / "includes" | ||||
|     include_dir.mkdir() | ||||
|     (include_dir / "header1.h").write_text("#define HEADER1") | ||||
|     (include_dir / "header2.hpp").write_text("#define HEADER2") | ||||
|     (include_dir / "source.cpp").write_text("// Implementation") | ||||
|     (include_dir / "README.md").write_text( | ||||
|         "# Documentation" | ||||
|     )  # Should be copied but not included | ||||
|  | ||||
|     # Create subdirectory with files | ||||
|     subdir = include_dir / "subdir" | ||||
|     subdir.mkdir() | ||||
|     (subdir / "nested.h").write_text("#define NESTED") | ||||
|  | ||||
|     mock_cg, includes_added = mock_cg_with_include_capture | ||||
|  | ||||
|     await config.add_includes([str(include_dir)]) | ||||
|  | ||||
|     # Verify copy_file_if_changed was called for all files | ||||
|     assert mock_copy_file_if_changed.call_count == 5  # 4 code files + 1 README | ||||
|  | ||||
|     # Verify include statements were added for valid extensions | ||||
|     include_strings = " ".join(includes_added) | ||||
|     assert "includes\\header1.h" in include_strings | ||||
|     assert "includes\\header2.hpp" in include_strings | ||||
|     assert "includes\\subdir\\nested.h" in include_strings | ||||
|     # CPP files are copied but not included | ||||
|     assert "source.cpp" not in include_strings or "#include" not in include_strings | ||||
|     # README.md should not have an include statement | ||||
|     assert "README.md" not in include_strings | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_add_includes_with_multiple_sources( | ||||
|     tmp_path: Path, mock_copy_file_if_changed: Mock | ||||
| ) -> None: | ||||
|     """Test add_includes with multiple files and directories.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create various include sources | ||||
|     single_file = tmp_path / "single.h" | ||||
|     single_file.write_text("#define SINGLE") | ||||
|  | ||||
|     dir1 = tmp_path / "dir1" | ||||
|     dir1.mkdir() | ||||
|     (dir1 / "file1.h").write_text("#define FILE1") | ||||
|  | ||||
|     dir2 = tmp_path / "dir2" | ||||
|     dir2.mkdir() | ||||
|     (dir2 / "file2.cpp").write_text("// File2") | ||||
|  | ||||
|     with patch("esphome.core.config.cg"): | ||||
|         await config.add_includes([str(single_file), str(dir1), str(dir2)]) | ||||
|  | ||||
|     # Verify copy_file_if_changed was called for all files | ||||
|     assert mock_copy_file_if_changed.call_count == 3  # 3 files total | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_add_includes_empty_directory( | ||||
|     tmp_path: Path, mock_copy_file_if_changed: Mock | ||||
| ) -> None: | ||||
|     """Test add_includes with an empty directory doesn't fail.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create empty directory | ||||
|     empty_dir = tmp_path / "empty" | ||||
|     empty_dir.mkdir() | ||||
|  | ||||
|     with patch("esphome.core.config.cg"): | ||||
|         # Should not raise any errors | ||||
|         await config.add_includes([str(empty_dir)]) | ||||
|  | ||||
|     # No files to copy from empty directory | ||||
|     mock_copy_file_if_changed.assert_not_called() | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| @pytest.mark.skipif(os.name == "nt", reason="Unix-specific test") | ||||
| async def test_add_includes_preserves_directory_structure_unix( | ||||
|     tmp_path: Path, mock_copy_file_if_changed: Mock | ||||
| ) -> None: | ||||
|     """Test that add_includes preserves relative directory structure on Unix.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create nested directory structure | ||||
|     lib_dir = tmp_path / "lib" | ||||
|     lib_dir.mkdir() | ||||
|  | ||||
|     src_dir = lib_dir / "src" | ||||
|     src_dir.mkdir() | ||||
|     (src_dir / "core.h").write_text("#define CORE") | ||||
|  | ||||
|     utils_dir = lib_dir / "utils" | ||||
|     utils_dir.mkdir() | ||||
|     (utils_dir / "helper.h").write_text("#define HELPER") | ||||
|  | ||||
|     with patch("esphome.core.config.cg"): | ||||
|         await config.add_includes([str(lib_dir)]) | ||||
|  | ||||
|     # Verify copy_file_if_changed was called with correct paths | ||||
|     calls = mock_copy_file_if_changed.call_args_list | ||||
|     dest_paths = [call[0][1] for call in calls] | ||||
|  | ||||
|     # Check that relative paths are preserved | ||||
|     assert any("lib/src/core.h" in path for path in dest_paths) | ||||
|     assert any("lib/utils/helper.h" in path for path in dest_paths) | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") | ||||
| async def test_add_includes_preserves_directory_structure_windows( | ||||
|     tmp_path: Path, mock_copy_file_if_changed: Mock | ||||
| ) -> None: | ||||
|     """Test that add_includes preserves relative directory structure on Windows.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create nested directory structure | ||||
|     lib_dir = tmp_path / "lib" | ||||
|     lib_dir.mkdir() | ||||
|  | ||||
|     src_dir = lib_dir / "src" | ||||
|     src_dir.mkdir() | ||||
|     (src_dir / "core.h").write_text("#define CORE") | ||||
|  | ||||
|     utils_dir = lib_dir / "utils" | ||||
|     utils_dir.mkdir() | ||||
|     (utils_dir / "helper.h").write_text("#define HELPER") | ||||
|  | ||||
|     with patch("esphome.core.config.cg"): | ||||
|         await config.add_includes([str(lib_dir)]) | ||||
|  | ||||
|     # Verify copy_file_if_changed was called with correct paths | ||||
|     calls = mock_copy_file_if_changed.call_args_list | ||||
|     dest_paths = [call[0][1] for call in calls] | ||||
|  | ||||
|     # Check that relative paths are preserved | ||||
|     assert any("lib\\src\\core.h" in path for path in dest_paths) | ||||
|     assert any("lib\\utils\\helper.h" in path for path in dest_paths) | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_add_includes_overwrites_existing_files( | ||||
|     tmp_path: Path, mock_copy_file_if_changed: Mock | ||||
| ) -> None: | ||||
|     """Test that add_includes overwrites existing files in build directory.""" | ||||
|     CORE.config_path = str(tmp_path / "config.yaml") | ||||
|     CORE.build_path = str(tmp_path / "build") | ||||
|     os.makedirs(CORE.build_path, exist_ok=True) | ||||
|  | ||||
|     # Create include file | ||||
|     include_file = tmp_path / "header.h" | ||||
|     include_file.write_text("#define NEW_VALUE 42") | ||||
|  | ||||
|     with patch("esphome.core.config.cg"): | ||||
|         await config.add_includes([str(include_file)]) | ||||
|  | ||||
|     # Verify copy_file_if_changed was called (it handles overwriting) | ||||
|     # Note: add_includes adds files to a src/ subdirectory | ||||
|     mock_copy_file_if_changed.assert_called_once_with( | ||||
|         str(include_file), str(Path(CORE.build_path) / "src" / "header.h") | ||||
|     ) | ||||
|   | ||||
| @@ -1,3 +1,6 @@ | ||||
| import os | ||||
| from unittest.mock import patch | ||||
|  | ||||
| from hypothesis import given | ||||
| import pytest | ||||
| from strategies import mac_addr_strings | ||||
| @@ -577,3 +580,83 @@ class TestEsphomeCore: | ||||
|  | ||||
|         assert target.is_esp32 is False | ||||
|         assert target.is_esp8266 is True | ||||
|  | ||||
|     @pytest.mark.skipif(os.name == "nt", reason="Unix-specific test") | ||||
|     def test_data_dir_default_unix(self, target): | ||||
|         """Test data_dir returns .esphome in config directory by default on Unix.""" | ||||
|         target.config_path = "/home/user/config.yaml" | ||||
|         assert target.data_dir == "/home/user/.esphome" | ||||
|  | ||||
|     @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") | ||||
|     def test_data_dir_default_windows(self, target): | ||||
|         """Test data_dir returns .esphome in config directory by default on Windows.""" | ||||
|         target.config_path = "D:\\home\\user\\config.yaml" | ||||
|         assert target.data_dir == "D:\\home\\user\\.esphome" | ||||
|  | ||||
|     def test_data_dir_ha_addon(self, target): | ||||
|         """Test data_dir returns /data when running as Home Assistant addon.""" | ||||
|         target.config_path = "/config/test.yaml" | ||||
|  | ||||
|         with patch.dict(os.environ, {"ESPHOME_IS_HA_ADDON": "true"}): | ||||
|             assert target.data_dir == "/data" | ||||
|  | ||||
|     def test_data_dir_env_override(self, target): | ||||
|         """Test data_dir uses ESPHOME_DATA_DIR environment variable when set.""" | ||||
|         target.config_path = "/home/user/config.yaml" | ||||
|  | ||||
|         with patch.dict(os.environ, {"ESPHOME_DATA_DIR": "/custom/data/path"}): | ||||
|             assert target.data_dir == "/custom/data/path" | ||||
|  | ||||
|     @pytest.mark.skipif(os.name == "nt", reason="Unix-specific test") | ||||
|     def test_data_dir_priority_unix(self, target): | ||||
|         """Test data_dir priority on Unix: HA addon > env var > default.""" | ||||
|         target.config_path = "/config/test.yaml" | ||||
|         expected_default = "/config/.esphome" | ||||
|  | ||||
|         # Test HA addon takes priority over env var | ||||
|         with patch.dict( | ||||
|             os.environ, | ||||
|             {"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"}, | ||||
|         ): | ||||
|             assert target.data_dir == "/data" | ||||
|  | ||||
|         # Test env var is used when not HA addon | ||||
|         with patch.dict( | ||||
|             os.environ, | ||||
|             {"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"}, | ||||
|         ): | ||||
|             assert target.data_dir == "/custom/path" | ||||
|  | ||||
|         # Test default when neither is set | ||||
|         with patch.dict(os.environ, {}, clear=True): | ||||
|             # Ensure these env vars are not set | ||||
|             os.environ.pop("ESPHOME_IS_HA_ADDON", None) | ||||
|             os.environ.pop("ESPHOME_DATA_DIR", None) | ||||
|             assert target.data_dir == expected_default | ||||
|  | ||||
|     @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") | ||||
|     def test_data_dir_priority_windows(self, target): | ||||
|         """Test data_dir priority on Windows: HA addon > env var > default.""" | ||||
|         target.config_path = "D:\\config\\test.yaml" | ||||
|         expected_default = "D:\\config\\.esphome" | ||||
|  | ||||
|         # Test HA addon takes priority over env var | ||||
|         with patch.dict( | ||||
|             os.environ, | ||||
|             {"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"}, | ||||
|         ): | ||||
|             assert target.data_dir == "/data" | ||||
|  | ||||
|         # Test env var is used when not HA addon | ||||
|         with patch.dict( | ||||
|             os.environ, | ||||
|             {"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"}, | ||||
|         ): | ||||
|             assert target.data_dir == "/custom/path" | ||||
|  | ||||
|         # Test default when neither is set | ||||
|         with patch.dict(os.environ, {}, clear=True): | ||||
|             # Ensure these env vars are not set | ||||
|             os.environ.pop("ESPHOME_IS_HA_ADDON", None) | ||||
|             os.environ.pop("ESPHOME_DATA_DIR", None) | ||||
|             assert target.data_dir == expected_default | ||||
|   | ||||
| @@ -1,5 +1,8 @@ | ||||
| import logging | ||||
| import os | ||||
| from pathlib import Path | ||||
| import socket | ||||
| import stat | ||||
| from unittest.mock import patch | ||||
|  | ||||
| from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr | ||||
| @@ -555,6 +558,239 @@ def test_addr_preference_ipv6_link_local_with_scope() -> None: | ||||
|     assert helpers.addr_preference_(addr_info) == 1  # Has scope, so it's usable | ||||
|  | ||||
|  | ||||
| def test_mkdir_p(tmp_path: Path) -> None: | ||||
|     """Test mkdir_p creates directories recursively.""" | ||||
|     # Test creating nested directories | ||||
|     nested_path = tmp_path / "level1" / "level2" / "level3" | ||||
|     helpers.mkdir_p(nested_path) | ||||
|     assert nested_path.exists() | ||||
|     assert nested_path.is_dir() | ||||
|  | ||||
|     # Test that mkdir_p is idempotent (doesn't fail if directory exists) | ||||
|     helpers.mkdir_p(nested_path) | ||||
|     assert nested_path.exists() | ||||
|  | ||||
|     # Test with empty path (should do nothing) | ||||
|     helpers.mkdir_p("") | ||||
|  | ||||
|     # Test with existing directory | ||||
|     existing_dir = tmp_path / "existing" | ||||
|     existing_dir.mkdir() | ||||
|     helpers.mkdir_p(existing_dir) | ||||
|     assert existing_dir.exists() | ||||
|  | ||||
|  | ||||
| def test_mkdir_p_file_exists_error(tmp_path: Path) -> None: | ||||
|     """Test mkdir_p raises error when path is a file.""" | ||||
|     # Create a file | ||||
|     file_path = tmp_path / "test_file.txt" | ||||
|     file_path.write_text("test content") | ||||
|  | ||||
|     # Try to create directory with same name as existing file | ||||
|     with pytest.raises(EsphomeError, match=r"Error creating directories"): | ||||
|         helpers.mkdir_p(file_path) | ||||
|  | ||||
|  | ||||
| def test_mkdir_p_with_existing_file_raises_error(tmp_path: Path) -> None: | ||||
|     """Test mkdir_p raises error when trying to create dir over existing file.""" | ||||
|     # Create a file where we want to create a directory | ||||
|     file_path = tmp_path / "existing_file" | ||||
|     file_path.write_text("content") | ||||
|  | ||||
|     # Try to create a directory with a path that goes through the file | ||||
|     dir_path = file_path / "subdir" | ||||
|  | ||||
|     with pytest.raises(EsphomeError, match=r"Error creating directories"): | ||||
|         helpers.mkdir_p(dir_path) | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name == "nt", reason="Unix-specific test") | ||||
| def test_read_file_unix(tmp_path: Path) -> None: | ||||
|     """Test read_file reads file content correctly on Unix.""" | ||||
|     # Test reading regular file | ||||
|     test_file = tmp_path / "test.txt" | ||||
|     expected_content = "Test content\nLine 2\n" | ||||
|     test_file.write_text(expected_content) | ||||
|  | ||||
|     content = helpers.read_file(test_file) | ||||
|     assert content == expected_content | ||||
|  | ||||
|     # Test reading file with UTF-8 characters | ||||
|     utf8_file = tmp_path / "utf8.txt" | ||||
|     utf8_content = "Hello 世界 🌍" | ||||
|     utf8_file.write_text(utf8_content, encoding="utf-8") | ||||
|  | ||||
|     content = helpers.read_file(utf8_file) | ||||
|     assert content == utf8_content | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") | ||||
| def test_read_file_windows(tmp_path: Path) -> None: | ||||
|     """Test read_file reads file content correctly on Windows.""" | ||||
|     # Test reading regular file | ||||
|     test_file = tmp_path / "test.txt" | ||||
|     expected_content = "Test content\nLine 2\n" | ||||
|     test_file.write_text(expected_content) | ||||
|  | ||||
|     content = helpers.read_file(test_file) | ||||
|     # On Windows, text mode reading converts \n to \r\n | ||||
|     assert content == expected_content.replace("\n", "\r\n") | ||||
|  | ||||
|     # Test reading file with UTF-8 characters | ||||
|     utf8_file = tmp_path / "utf8.txt" | ||||
|     utf8_content = "Hello 世界 🌍" | ||||
|     utf8_file.write_text(utf8_content, encoding="utf-8") | ||||
|  | ||||
|     content = helpers.read_file(utf8_file) | ||||
|     assert content == utf8_content | ||||
|  | ||||
|  | ||||
| def test_read_file_not_found() -> None: | ||||
|     """Test read_file raises error for non-existent file.""" | ||||
|     with pytest.raises(EsphomeError, match=r"Error reading file"): | ||||
|         helpers.read_file("/nonexistent/file.txt") | ||||
|  | ||||
|  | ||||
| def test_read_file_unicode_decode_error(tmp_path: Path) -> None: | ||||
|     """Test read_file raises error for invalid UTF-8.""" | ||||
|     test_file = tmp_path / "invalid.txt" | ||||
|     # Write invalid UTF-8 bytes | ||||
|     test_file.write_bytes(b"\xff\xfe") | ||||
|  | ||||
|     with pytest.raises(EsphomeError, match=r"Error reading file"): | ||||
|         helpers.read_file(test_file) | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name == "nt", reason="Unix-specific test") | ||||
| def test_write_file_unix(tmp_path: Path) -> None: | ||||
|     """Test write_file writes content correctly on Unix.""" | ||||
|     # Test writing string content | ||||
|     test_file = tmp_path / "test.txt" | ||||
|     content = "Test content\nLine 2" | ||||
|     helpers.write_file(test_file, content) | ||||
|  | ||||
|     assert test_file.read_text() == content | ||||
|     # Check file permissions | ||||
|     assert oct(test_file.stat().st_mode)[-3:] == "644" | ||||
|  | ||||
|     # Test overwriting existing file | ||||
|     new_content = "New content" | ||||
|     helpers.write_file(test_file, new_content) | ||||
|     assert test_file.read_text() == new_content | ||||
|  | ||||
|     # Test writing to nested directories (should create them) | ||||
|     nested_file = tmp_path / "dir1" / "dir2" / "file.txt" | ||||
|     helpers.write_file(nested_file, content) | ||||
|     assert nested_file.read_text() == content | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") | ||||
| def test_write_file_windows(tmp_path: Path) -> None: | ||||
|     """Test write_file writes content correctly on Windows.""" | ||||
|     # Test writing string content | ||||
|     test_file = tmp_path / "test.txt" | ||||
|     content = "Test content\nLine 2" | ||||
|     helpers.write_file(test_file, content) | ||||
|  | ||||
|     assert test_file.read_text() == content | ||||
|     # Windows doesn't have Unix-style 644 permissions | ||||
|  | ||||
|     # Test overwriting existing file | ||||
|     new_content = "New content" | ||||
|     helpers.write_file(test_file, new_content) | ||||
|     assert test_file.read_text() == new_content | ||||
|  | ||||
|     # Test writing to nested directories (should create them) | ||||
|     nested_file = tmp_path / "dir1" / "dir2" / "file.txt" | ||||
|     helpers.write_file(nested_file, content) | ||||
|     assert nested_file.read_text() == content | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test") | ||||
| def test_write_file_to_non_writable_directory_unix(tmp_path: Path) -> None: | ||||
|     """Test write_file raises error when directory is not writable on Unix.""" | ||||
|     # Create a directory and make it read-only | ||||
|     read_only_dir = tmp_path / "readonly" | ||||
|     read_only_dir.mkdir() | ||||
|     test_file = read_only_dir / "test.txt" | ||||
|  | ||||
|     # Make directory read-only (no write permission) | ||||
|     read_only_dir.chmod(0o555) | ||||
|  | ||||
|     try: | ||||
|         with pytest.raises(EsphomeError, match=r"Could not write file"): | ||||
|             helpers.write_file(test_file, "content") | ||||
|     finally: | ||||
|         # Restore write permissions for cleanup | ||||
|         read_only_dir.chmod(0o755) | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") | ||||
| def test_write_file_to_non_writable_directory_windows(tmp_path: Path) -> None: | ||||
|     """Test write_file error handling on Windows.""" | ||||
|     # Windows handles permissions differently - test a different error case | ||||
|     # Try to write to a file path that contains an existing file as a directory component | ||||
|     existing_file = tmp_path / "file.txt" | ||||
|     existing_file.write_text("content") | ||||
|  | ||||
|     # Try to write to a path that treats the file as a directory | ||||
|     invalid_path = existing_file / "subdir" / "test.txt" | ||||
|  | ||||
|     with pytest.raises(EsphomeError, match=r"Could not write file"): | ||||
|         helpers.write_file(invalid_path, "content") | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test") | ||||
| def test_write_file_with_permission_bits_unix(tmp_path: Path) -> None: | ||||
|     """Test that write_file sets correct permissions on Unix.""" | ||||
|     test_file = tmp_path / "test.txt" | ||||
|     helpers.write_file(test_file, "content") | ||||
|  | ||||
|     # Check that file has 644 permissions | ||||
|     file_mode = test_file.stat().st_mode | ||||
|     assert stat.S_IMODE(file_mode) == 0o644 | ||||
|  | ||||
|  | ||||
| @pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test") | ||||
| def test_copy_file_if_changed_permission_recovery_unix(tmp_path: Path) -> None: | ||||
|     """Test copy_file_if_changed handles permission errors correctly on Unix.""" | ||||
|     # Test with read-only destination file | ||||
|     src = tmp_path / "source.txt" | ||||
|     dst = tmp_path / "dest.txt" | ||||
|     src.write_text("new content") | ||||
|     dst.write_text("old content") | ||||
|     dst.chmod(0o444)  # Make destination read-only | ||||
|  | ||||
|     try: | ||||
|         # Should handle permission error by deleting and retrying | ||||
|         helpers.copy_file_if_changed(src, dst) | ||||
|         assert dst.read_text() == "new content" | ||||
|     finally: | ||||
|         # Restore write permissions for cleanup | ||||
|         if dst.exists(): | ||||
|             dst.chmod(0o644) | ||||
|  | ||||
|  | ||||
| def test_copy_file_if_changed_creates_directories(tmp_path: Path) -> None: | ||||
|     """Test copy_file_if_changed creates missing directories.""" | ||||
|     src = tmp_path / "source.txt" | ||||
|     dst = tmp_path / "subdir" / "nested" / "dest.txt" | ||||
|     src.write_text("content") | ||||
|  | ||||
|     helpers.copy_file_if_changed(src, dst) | ||||
|     assert dst.exists() | ||||
|     assert dst.read_text() == "content" | ||||
|  | ||||
|  | ||||
| def test_copy_file_if_changed_nonexistent_source(tmp_path: Path) -> None: | ||||
|     """Test copy_file_if_changed with non-existent source.""" | ||||
|     src = tmp_path / "nonexistent.txt" | ||||
|     dst = tmp_path / "dest.txt" | ||||
|  | ||||
|     with pytest.raises(EsphomeError, match=r"Error copying file"): | ||||
|         helpers.copy_file_if_changed(src, dst) | ||||
|  | ||||
|  | ||||
| def test_resolve_ip_address_sorting() -> None: | ||||
|     """Test that results are sorted by preference.""" | ||||
|     # Create multiple address infos with different preferences | ||||
|   | ||||
| @@ -1,5 +1,7 @@ | ||||
| """Tests for esphome.util module.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| from pathlib import Path | ||||
|  | ||||
| import pytest | ||||
| @@ -308,3 +310,85 @@ def test_filter_yaml_files_case_sensitive() -> None: | ||||
|     assert "/path/to/config.YAML" not in result | ||||
|     assert "/path/to/config.YML" not in result | ||||
|     assert "/path/to/config.Yaml" not in result | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|     ("input_str", "expected"), | ||||
|     [ | ||||
|         # Empty string | ||||
|         ("", "''"), | ||||
|         # Simple strings that don't need quoting | ||||
|         ("hello", "hello"), | ||||
|         ("test123", "test123"), | ||||
|         ("file.txt", "file.txt"), | ||||
|         ("/path/to/file", "/path/to/file"), | ||||
|         ("user@host", "user@host"), | ||||
|         ("value:123", "value:123"), | ||||
|         ("item,list", "item,list"), | ||||
|         ("path-with-dash", "path-with-dash"), | ||||
|         # Strings that need quoting | ||||
|         ("hello world", "'hello world'"), | ||||
|         ("test\ttab", "'test\ttab'"), | ||||
|         ("line\nbreak", "'line\nbreak'"), | ||||
|         ("semicolon;here", "'semicolon;here'"), | ||||
|         ("pipe|symbol", "'pipe|symbol'"), | ||||
|         ("redirect>file", "'redirect>file'"), | ||||
|         ("redirect<file", "'redirect<file'"), | ||||
|         ("background&", "'background&'"), | ||||
|         ("dollar$sign", "'dollar$sign'"), | ||||
|         ("backtick`cmd", "'backtick`cmd'"), | ||||
|         ('double"quote', "'double\"quote'"), | ||||
|         ("backslash\\path", "'backslash\\path'"), | ||||
|         ("question?mark", "'question?mark'"), | ||||
|         ("asterisk*wild", "'asterisk*wild'"), | ||||
|         ("bracket[test]", "'bracket[test]'"), | ||||
|         ("paren(test)", "'paren(test)'"), | ||||
|         ("curly{brace}", "'curly{brace}'"), | ||||
|         # Single quotes in string (special escaping) | ||||
|         ("it's", "'it'\"'\"'s'"), | ||||
|         ("don't", "'don'\"'\"'t'"), | ||||
|         ("'quoted'", "''\"'\"'quoted'\"'\"''"), | ||||
|         # Complex combinations | ||||
|         ("test 'with' quotes", "'test '\"'\"'with'\"'\"' quotes'"), | ||||
|         ("path/to/file's.txt", "'path/to/file'\"'\"'s.txt'"), | ||||
|     ], | ||||
| ) | ||||
| def test_shlex_quote(input_str: str, expected: str) -> None: | ||||
|     """Test shlex_quote properly escapes shell arguments.""" | ||||
|     assert util.shlex_quote(input_str) == expected | ||||
|  | ||||
|  | ||||
| def test_shlex_quote_safe_characters() -> None: | ||||
|     """Test that safe characters are not quoted.""" | ||||
|     # These characters are considered safe and shouldn't be quoted | ||||
|     safe_chars = ( | ||||
|         "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@%+=:,./-_" | ||||
|     ) | ||||
|     for char in safe_chars: | ||||
|         assert util.shlex_quote(char) == char | ||||
|         assert util.shlex_quote(f"test{char}test") == f"test{char}test" | ||||
|  | ||||
|  | ||||
| def test_shlex_quote_unsafe_characters() -> None: | ||||
|     """Test that unsafe characters trigger quoting.""" | ||||
|     # These characters should trigger quoting | ||||
|     unsafe_chars = ' \t\n;|>&<$`"\\?*[](){}!#~^' | ||||
|     for char in unsafe_chars: | ||||
|         result = util.shlex_quote(f"test{char}test") | ||||
|         assert result.startswith("'") | ||||
|         assert result.endswith("'") | ||||
|  | ||||
|  | ||||
| def test_shlex_quote_edge_cases() -> None: | ||||
|     """Test edge cases for shlex_quote.""" | ||||
|     # Multiple single quotes | ||||
|     assert util.shlex_quote("'''") == "''\"'\"''\"'\"''\"'\"''" | ||||
|  | ||||
|     # Mixed quotes | ||||
|     assert util.shlex_quote('"\'"') == "'\"'\"'\"'\"'" | ||||
|  | ||||
|     # Only whitespace | ||||
|     assert util.shlex_quote(" ") == "' '" | ||||
|     assert util.shlex_quote("\t") == "'\t'" | ||||
|     assert util.shlex_quote("\n") == "'\n'" | ||||
|     assert util.shlex_quote("   ") == "'   '" | ||||
|   | ||||
		Reference in New Issue
	
	Block a user