mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-30 22:53:59 +00:00 
			
		
		
		
	Merge branch 'dev' into sha256_ota
This commit is contained in:
		| @@ -6,6 +6,7 @@ from collections.abc import Callable, Generator | ||||
| from pathlib import Path | ||||
| import sys | ||||
| from typing import Any | ||||
| from unittest import mock | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| @@ -17,6 +18,7 @@ from esphome.const import ( | ||||
|     PlatformFramework, | ||||
| ) | ||||
| from esphome.types import ConfigType | ||||
| from esphome.util import OrderedDict | ||||
|  | ||||
| # Add package root to python path | ||||
| here = Path(__file__).parent | ||||
| @@ -135,3 +137,29 @@ def generate_main() -> Generator[Callable[[str | Path], str]]: | ||||
|         return CORE.cpp_main_section | ||||
|  | ||||
|     yield generator | ||||
|  | ||||
|  | ||||
@pytest.fixture
def mock_clone_or_update() -> Generator[Any]:
    """Patch ``esphome.git.clone_or_update`` and yield the patched mock.

    Defaults to returning a ``(Path, None)`` tuple, mirroring the real
    function's ``(repo_dir, revert_callable)`` return contract.
    """
    with mock.patch("esphome.git.clone_or_update") as patched:
        patched.return_value = (Path("/tmp/test"), None)
        yield patched
|  | ||||
|  | ||||
@pytest.fixture
def mock_load_yaml() -> Generator[Any]:
    """Patch ``esphome.yaml_util.load_yaml`` and yield the patched mock.

    Defaults to returning a minimal parsed document with an empty
    ``sensor`` list, so callers get a valid-looking config.
    """
    with mock.patch("esphome.yaml_util.load_yaml") as patched:
        patched.return_value = OrderedDict({"sensor": []})
        yield patched
|  | ||||
|  | ||||
@pytest.fixture
def mock_install_meta_finder() -> Generator[Any]:
    """Patch ``esphome.loader.install_meta_finder`` and yield the patched mock."""
    with mock.patch("esphome.loader.install_meta_finder") as patched:
        yield patched
|   | ||||
							
								
								
									
										134
									
								
								tests/component_tests/external_components/test_init.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										134
									
								
								tests/component_tests/external_components/test_init.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,134 @@ | ||||
| """Tests for the external_components skip_update functionality.""" | ||||
|  | ||||
| from pathlib import Path | ||||
| from typing import Any | ||||
| from unittest.mock import MagicMock | ||||
|  | ||||
| from esphome.components.external_components import do_external_components_pass | ||||
| from esphome.const import ( | ||||
|     CONF_EXTERNAL_COMPONENTS, | ||||
|     CONF_REFRESH, | ||||
|     CONF_SOURCE, | ||||
|     CONF_URL, | ||||
|     TYPE_GIT, | ||||
| ) | ||||
|  | ||||
|  | ||||
def test_external_components_skip_update_true(
    tmp_path: Path, mock_clone_or_update: MagicMock, mock_install_meta_finder: MagicMock
) -> None:
    """Test that external components don't update when skip_update=True."""
    from esphome import git

    # Lay out a minimal components tree for the pass to discover.
    component_dir = tmp_path / "components" / "test_component"
    component_dir.mkdir(parents=True)
    (component_dir / "__init__.py").write_text("# Test component")

    # The mocked clone hands back our temporary checkout.
    mock_clone_or_update.return_value = (tmp_path, None)

    config: dict[str, Any] = {
        CONF_EXTERNAL_COMPONENTS: [
            {
                CONF_SOURCE: {
                    "type": TYPE_GIT,
                    CONF_URL: "https://github.com/test/components",
                },
                CONF_REFRESH: "1d",
                "components": "all",
            }
        ]
    }

    do_external_components_pass(config, skip_update=True)

    # skip_update=True must translate into a NEVER_REFRESH clone request.
    mock_clone_or_update.assert_called_once()
    assert mock_clone_or_update.call_args.kwargs["refresh"] == git.NEVER_REFRESH
|  | ||||
|  | ||||
def test_external_components_skip_update_false(
    tmp_path: Path, mock_clone_or_update: MagicMock, mock_install_meta_finder: MagicMock
) -> None:
    """Test that external components update when skip_update=False."""
    from esphome.core import TimePeriodSeconds

    # Lay out a minimal components tree for the pass to discover.
    component_dir = tmp_path / "components" / "test_component"
    component_dir.mkdir(parents=True)
    (component_dir / "__init__.py").write_text("# Test component")

    # The mocked clone hands back our temporary checkout.
    mock_clone_or_update.return_value = (tmp_path, None)

    config: dict[str, Any] = {
        CONF_EXTERNAL_COMPONENTS: [
            {
                CONF_SOURCE: {
                    "type": TYPE_GIT,
                    CONF_URL: "https://github.com/test/components",
                },
                CONF_REFRESH: "1d",
                "components": "all",
            }
        ]
    }

    do_external_components_pass(config, skip_update=False)

    # The configured "1d" refresh interval must be forwarded unchanged.
    mock_clone_or_update.assert_called_once()
    assert mock_clone_or_update.call_args.kwargs["refresh"] == TimePeriodSeconds(days=1)
|  | ||||
|  | ||||
def test_external_components_default_no_skip(
    tmp_path: Path, mock_clone_or_update: MagicMock, mock_install_meta_finder: MagicMock
) -> None:
    """Test that external components update by default when skip_update not specified."""
    from esphome.core import TimePeriodSeconds

    # Lay out a minimal components tree for the pass to discover.
    component_dir = tmp_path / "components" / "test_component"
    component_dir.mkdir(parents=True)
    (component_dir / "__init__.py").write_text("# Test component")

    # The mocked clone hands back our temporary checkout.
    mock_clone_or_update.return_value = (tmp_path, None)

    config: dict[str, Any] = {
        CONF_EXTERNAL_COMPONENTS: [
            {
                CONF_SOURCE: {
                    "type": TYPE_GIT,
                    CONF_URL: "https://github.com/test/components",
                },
                CONF_REFRESH: "1d",
                "components": "all",
            }
        ]
    }

    # No skip_update argument: the default must behave like skip_update=False.
    do_external_components_pass(config)

    mock_clone_or_update.assert_called_once()
    assert mock_clone_or_update.call_args.kwargs["refresh"] == TimePeriodSeconds(days=1)
							
								
								
									
										114
									
								
								tests/component_tests/packages/test_init.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										114
									
								
								tests/component_tests/packages/test_init.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,114 @@ | ||||
| """Tests for the packages component skip_update functionality.""" | ||||
|  | ||||
| from pathlib import Path | ||||
| from typing import Any | ||||
| from unittest.mock import MagicMock | ||||
|  | ||||
| from esphome.components.packages import do_packages_pass | ||||
| from esphome.const import CONF_FILES, CONF_PACKAGES, CONF_REFRESH, CONF_URL | ||||
| from esphome.util import OrderedDict | ||||
|  | ||||
|  | ||||
def test_packages_skip_update_true(
    tmp_path: Path, mock_clone_or_update: MagicMock, mock_load_yaml: MagicMock
) -> None:
    """Test that packages don't update when skip_update=True."""
    from esphome import git

    # The mocked clone hands back our temporary checkout.
    mock_clone_or_update.return_value = (tmp_path, None)

    # The package pass expects the referenced file to exist in the checkout.
    (tmp_path / "test.yaml").write_text("sensor: []")

    # And load_yaml to return a parsed document for it.
    mock_load_yaml.return_value = OrderedDict({"sensor": []})

    config: dict[str, Any] = {
        CONF_PACKAGES: {
            "test_package": {
                CONF_URL: "https://github.com/test/repo",
                CONF_FILES: ["test.yaml"],
                CONF_REFRESH: "1d",
            }
        }
    }

    do_packages_pass(config, skip_update=True)

    # skip_update=True must translate into a NEVER_REFRESH clone request.
    mock_clone_or_update.assert_called_once()
    assert mock_clone_or_update.call_args.kwargs["refresh"] == git.NEVER_REFRESH
|  | ||||
|  | ||||
def test_packages_skip_update_false(
    tmp_path: Path, mock_clone_or_update: MagicMock, mock_load_yaml: MagicMock
) -> None:
    """Test that packages update when skip_update=False."""
    from esphome.core import TimePeriodSeconds

    # The mocked clone hands back our temporary checkout.
    mock_clone_or_update.return_value = (tmp_path, None)

    # The package pass expects the referenced file to exist in the checkout.
    (tmp_path / "test.yaml").write_text("sensor: []")

    # And load_yaml to return a parsed document for it.
    mock_load_yaml.return_value = OrderedDict({"sensor": []})

    config: dict[str, Any] = {
        CONF_PACKAGES: {
            "test_package": {
                CONF_URL: "https://github.com/test/repo",
                CONF_FILES: ["test.yaml"],
                CONF_REFRESH: "1d",
            }
        }
    }

    do_packages_pass(config, skip_update=False)

    # The configured "1d" refresh interval must be forwarded unchanged.
    mock_clone_or_update.assert_called_once()
    assert mock_clone_or_update.call_args.kwargs["refresh"] == TimePeriodSeconds(days=1)
|  | ||||
|  | ||||
def test_packages_default_no_skip(
    tmp_path: Path, mock_clone_or_update: MagicMock, mock_load_yaml: MagicMock
) -> None:
    """Test that packages update by default when skip_update not specified."""
    from esphome.core import TimePeriodSeconds

    # The mocked clone hands back our temporary checkout.
    mock_clone_or_update.return_value = (tmp_path, None)

    # The package pass expects the referenced file to exist in the checkout.
    (tmp_path / "test.yaml").write_text("sensor: []")

    # And load_yaml to return a parsed document for it.
    mock_load_yaml.return_value = OrderedDict({"sensor": []})

    config: dict[str, Any] = {
        CONF_PACKAGES: {
            "test_package": {
                CONF_URL: "https://github.com/test/repo",
                CONF_FILES: ["test.yaml"],
                CONF_REFRESH: "1d",
            }
        }
    }

    # No skip_update argument: the default must behave like skip_update=False.
    do_packages_pass(config)

    mock_clone_or_update.assert_called_once()
    assert mock_clone_or_update.call_args.kwargs["refresh"] == TimePeriodSeconds(days=1)
| @@ -58,6 +58,8 @@ def _get_platformio_env(cache_dir: Path) -> dict[str, str]: | ||||
|     env["PLATFORMIO_CORE_DIR"] = str(cache_dir) | ||||
|     env["PLATFORMIO_CACHE_DIR"] = str(cache_dir / ".cache") | ||||
|     env["PLATFORMIO_LIBDEPS_DIR"] = str(cache_dir / "libdeps") | ||||
|     # Prevent cache cleaning during integration tests | ||||
|     env["ESPHOME_SKIP_CLEAN_BUILD"] = "1" | ||||
|     return env | ||||
|  | ||||
|  | ||||
| @@ -68,6 +70,11 @@ def shared_platformio_cache() -> Generator[Path]: | ||||
|     test_cache_dir = Path.home() / ".esphome-integration-tests" | ||||
|     cache_dir = test_cache_dir / "platformio" | ||||
|  | ||||
|     # Create the temp directory that PlatformIO uses to avoid race conditions | ||||
|     # This ensures it exists and won't be deleted by parallel processes | ||||
|     platformio_tmp_dir = cache_dir / ".cache" / "tmp" | ||||
|     platformio_tmp_dir.mkdir(parents=True, exist_ok=True) | ||||
|  | ||||
|     # Use a lock file in the home directory to ensure only one process initializes the cache | ||||
|     # This is needed when running with pytest-xdist | ||||
|     # The lock file must be in a directory that already exists to avoid race conditions | ||||
| @@ -83,17 +90,11 @@ def shared_platformio_cache() -> Generator[Path]: | ||||
|             test_cache_dir.mkdir(exist_ok=True) | ||||
|  | ||||
|             with tempfile.TemporaryDirectory() as tmpdir: | ||||
|                 # Create a basic host config | ||||
|                 # Use the cache_init fixture for initialization | ||||
|                 init_dir = Path(tmpdir) | ||||
|                 fixture_path = Path(__file__).parent / "fixtures" / "cache_init.yaml" | ||||
|                 config_path = init_dir / "cache_init.yaml" | ||||
|                 config_path.write_text("""esphome: | ||||
|   name: cache-init | ||||
| host: | ||||
| api: | ||||
|   encryption: | ||||
|     key: "IIevImVI42I0FGos5nLqFK91jrJehrgidI0ArwMLr8w=" | ||||
| logger: | ||||
| """) | ||||
|                 config_path.write_text(fixture_path.read_text()) | ||||
|  | ||||
|                 # Run compilation to populate the cache | ||||
|                 # We must succeed here to avoid race conditions where multiple | ||||
| @@ -346,7 +347,8 @@ async def wait_and_connect_api_client( | ||||
|     noise_psk: str | None = None, | ||||
|     client_info: str = "integration-test", | ||||
|     timeout: float = API_CONNECTION_TIMEOUT, | ||||
| ) -> AsyncGenerator[APIClient]: | ||||
|     return_disconnect_event: bool = False, | ||||
| ) -> AsyncGenerator[APIClient | tuple[APIClient, asyncio.Event]]: | ||||
|     """Wait for API to be available and connect.""" | ||||
|     client = APIClient( | ||||
|         address=address, | ||||
| @@ -359,14 +361,17 @@ async def wait_and_connect_api_client( | ||||
|     # Create a future to signal when connected | ||||
|     loop = asyncio.get_running_loop() | ||||
|     connected_future: asyncio.Future[None] = loop.create_future() | ||||
|     disconnect_event = asyncio.Event() | ||||
|  | ||||
|     async def on_connect() -> None: | ||||
|         """Called when successfully connected.""" | ||||
|         disconnect_event.clear()  # Clear the disconnect event on new connection | ||||
|         if not connected_future.done(): | ||||
|             connected_future.set_result(None) | ||||
|  | ||||
|     async def on_disconnect(expected_disconnect: bool) -> None: | ||||
|         """Called when disconnected.""" | ||||
|         disconnect_event.set() | ||||
|         if not connected_future.done() and not expected_disconnect: | ||||
|             connected_future.set_exception( | ||||
|                 APIConnectionError("Disconnected before fully connected") | ||||
| @@ -397,7 +402,10 @@ async def wait_and_connect_api_client( | ||||
|         except TimeoutError: | ||||
|             raise TimeoutError(f"Failed to connect to API after {timeout} seconds") | ||||
|  | ||||
|         yield client | ||||
|         if return_disconnect_event: | ||||
|             yield client, disconnect_event | ||||
|         else: | ||||
|             yield client | ||||
|     finally: | ||||
|         # Stop reconnect logic and disconnect | ||||
|         await reconnect_logic.stop() | ||||
| @@ -430,6 +438,33 @@ async def api_client_connected( | ||||
|     yield _connect_client | ||||
|  | ||||
|  | ||||
@pytest_asyncio.fixture
async def api_client_connected_with_disconnect(
    unused_tcp_port: int,
) -> AsyncGenerator:
    """Factory for creating connected API client context managers with disconnect event."""

    def _factory(
        address: str = LOCALHOST,
        port: int | None = None,
        password: str = "",
        noise_psk: str | None = None,
        client_info: str = "integration-test",
        timeout: float = API_CONNECTION_TIMEOUT,
    ):
        # Delegate to the shared connect helper, asking it to also hand
        # back the asyncio.Event that fires on disconnect.
        resolved_port = unused_tcp_port if port is None else port
        return wait_and_connect_api_client(
            address=address,
            port=resolved_port,
            password=password,
            noise_psk=noise_psk,
            client_info=client_info,
            timeout=timeout,
            return_disconnect_event=True,
        )

    yield _factory
|  | ||||
|  | ||||
| async def _read_stream_lines( | ||||
|     stream: asyncio.StreamReader, | ||||
|     lines: list[str], | ||||
|   | ||||
							
								
								
									
										10
									
								
								tests/integration/fixtures/cache_init.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								tests/integration/fixtures/cache_init.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,10 @@ | ||||
| esphome: | ||||
|   name: cache-init | ||||
|  | ||||
| host: | ||||
|  | ||||
| api: | ||||
|   encryption: | ||||
|     key: "IIevImVI42I0FGos5nLqFK91jrJehrgidI0ArwMLr8w=" | ||||
|  | ||||
| logger: | ||||
| @@ -0,0 +1,11 @@ | ||||
| esphome: | ||||
|   name: oversized-noise | ||||
|  | ||||
| host: | ||||
|  | ||||
| api: | ||||
|   encryption: | ||||
|     key: N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU= | ||||
|  | ||||
| logger: | ||||
|   level: VERY_VERBOSE | ||||
							
								
								
									
										11
									
								
								tests/integration/fixtures/oversized_payload_noise.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								tests/integration/fixtures/oversized_payload_noise.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,11 @@ | ||||
| esphome: | ||||
|   name: oversized-noise | ||||
|  | ||||
| host: | ||||
|  | ||||
| api: | ||||
|   encryption: | ||||
|     key: N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU= | ||||
|  | ||||
| logger: | ||||
|   level: VERY_VERBOSE | ||||
| @@ -0,0 +1,9 @@ | ||||
| esphome: | ||||
|   name: oversized-plaintext | ||||
|  | ||||
| host: | ||||
|  | ||||
| api: | ||||
|  | ||||
| logger: | ||||
|   level: VERY_VERBOSE | ||||
| @@ -0,0 +1,11 @@ | ||||
| esphome: | ||||
|   name: oversized-noise | ||||
|  | ||||
| host: | ||||
|  | ||||
| api: | ||||
|   encryption: | ||||
|     key: N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU= | ||||
|  | ||||
| logger: | ||||
|   level: VERY_VERBOSE | ||||
| @@ -0,0 +1,9 @@ | ||||
| esphome: | ||||
|   name: oversized-protobuf-plaintext | ||||
|  | ||||
| host: | ||||
|  | ||||
| api: | ||||
|  | ||||
| logger: | ||||
|   level: VERY_VERBOSE | ||||
							
								
								
									
										335
									
								
								tests/integration/test_oversized_payloads.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										335
									
								
								tests/integration/test_oversized_payloads.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,335 @@ | ||||
| """Integration tests for oversized payloads and headers that should cause disconnection.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| import asyncio | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from .types import APIClientConnectedWithDisconnectFactory, RunCompiledFunction | ||||
|  | ||||
|  | ||||
@pytest.mark.asyncio
async def test_oversized_payload_plaintext(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected_with_disconnect: APIClientConnectedWithDisconnectFactory,
) -> None:
    """Test that oversized payloads (>100KiB) from client cause disconnection without crashing."""
    saw_crash = False
    saw_size_log = False

    def check_logs(line: str) -> None:
        # Scan device output for crash markers and the expected HELPER_LOG.
        nonlocal saw_crash, saw_size_log
        if "Segmentation fault" in line or "core dumped" in line:
            saw_crash = True
        if (
            "[VV]" in line
            and "Bad packet: message size" in line
            and "exceeds maximum" in line
        ):
            saw_size_log = True

    async with run_compiled(yaml_config, line_callback=check_logs):
        async with api_client_connected_with_disconnect() as (client, disconnect_event):
            # Sanity-check the connection before abusing it.
            device_info = await client.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-plaintext"

            # One byte over the 100 KiB limit.
            oversized_data = b"X" * (100 * 1024 + 1)

            # Send the huge body as a DeviceInfoRequest (message type 1)
            # straight through the raw frame helper.
            frame_helper = client._connection._frame_helper
            frame_helper.write_packets([(1, oversized_data)], True)

            # ESPHome should drop the connection in response.
            await asyncio.wait_for(disconnect_event.wait(), timeout=5.0)

        assert not saw_crash, "ESPHome process should not crash"
        assert saw_size_log, (
            "Expected to see HELPER_LOG about message size exceeding maximum"
        )

        # The device must still accept fresh connections afterwards.
        async with api_client_connected_with_disconnect() as (client2, _):
            device_info = await client2.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-plaintext"
|  | ||||
|  | ||||
@pytest.mark.asyncio
async def test_oversized_protobuf_message_id_plaintext(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected_with_disconnect: APIClientConnectedWithDisconnectFactory,
) -> None:
    """Test that protobuf messages with ID > UINT16_MAX cause disconnection without crashing.

    This tests the message type limit - message IDs must fit in a uint16_t (0-65535).
    """
    saw_crash = False
    saw_type_log = False

    def check_logs(line: str) -> None:
        # Scan device output for crash markers and the expected HELPER_LOG.
        nonlocal saw_crash, saw_type_log
        if "Segmentation fault" in line or "core dumped" in line:
            saw_crash = True
        if (
            "[VV]" in line
            and "Bad packet: message type" in line
            and "exceeds maximum" in line
        ):
            saw_type_log = True

    async with run_compiled(yaml_config, line_callback=check_logs):
        async with api_client_connected_with_disconnect() as (client, disconnect_event):
            # Sanity-check the connection before abusing it.
            device_info = await client.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-protobuf-plaintext"

            # 2^16 exceeds UINT16_MAX, so the varint-encoded type is rejected.
            frame_helper = client._connection._frame_helper
            frame_helper.write_packets([(65536, b"test")], True)

            # ESPHome should drop the connection in response.
            await asyncio.wait_for(disconnect_event.wait(), timeout=5.0)

        assert not saw_crash, "ESPHome process should not crash"
        assert saw_type_log, (
            "Expected to see HELPER_LOG about message type exceeding maximum"
        )

        # The device must still accept fresh connections afterwards.
        async with api_client_connected_with_disconnect() as (client2, _):
            device_info = await client2.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-protobuf-plaintext"
|  | ||||
|  | ||||
@pytest.mark.asyncio
async def test_oversized_payload_noise(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected_with_disconnect: APIClientConnectedWithDisconnectFactory,
) -> None:
    """Test that oversized payloads (>100KiB) from client cause disconnection without crashing with noise encryption."""
    noise_key = "N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU="
    saw_crash = False
    saw_decrypt_failure = False

    def check_logs(line: str) -> None:
        # Scan device output for crash markers and the decrypt-failure warning.
        nonlocal saw_crash, saw_decrypt_failure
        if "Segmentation fault" in line or "core dumped" in line:
            saw_crash = True
        if (
            "[W][api.connection" in line
            and "Reading failed CIPHERSTATE_DECRYPT_FAILED" in line
        ):
            saw_decrypt_failure = True

    async with run_compiled(yaml_config, line_callback=check_logs):
        async with api_client_connected_with_disconnect(noise_psk=noise_key) as (
            client,
            disconnect_event,
        ):
            # Sanity-check the encrypted connection before abusing it.
            device_info = await client.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-noise"

            # One byte over the 100 KiB limit. write_packets still handles
            # the noise encryption for us; we just supply the frame.
            oversized_data = b"Y" * (100 * 1024 + 1)

            # Send as a DeviceInfoRequest (message type 1).
            frame_helper = client._connection._frame_helper
            frame_helper.write_packets([(1, oversized_data)], True)

            # ESPHome should drop the connection in response.
            await asyncio.wait_for(disconnect_event.wait(), timeout=5.0)

        assert not saw_crash, "ESPHome process should not crash"
        assert saw_decrypt_failure, (
            "Expected to see warning about CIPHERSTATE_DECRYPT_FAILED"
        )

        # The device must still accept fresh encrypted connections afterwards.
        async with api_client_connected_with_disconnect(noise_psk=noise_key) as (
            client2,
            _,
        ):
            device_info = await client2.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-noise"
|  | ||||
|  | ||||
@pytest.mark.asyncio
async def test_oversized_protobuf_message_id_noise(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected_with_disconnect: APIClientConnectedWithDisconnectFactory,
) -> None:
    """Test that the noise protocol handles unknown message types correctly.

    With noise encryption, message types are stored as uint16_t (2 bytes) after decryption.
    Unknown message types should be ignored without disconnecting, as ESPHome needs to
    read the full message to maintain encryption stream continuity.
    """
    noise_key = "N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU="
    # Flipped to True by check_logs if the compiled device process crashes.
    process_exited = False

    def check_logs(line: str) -> None:
        # Scan each device log line for crash indicators.
        nonlocal process_exited
        # Check for signs that the process exited/crashed
        if "Segmentation fault" in line or "core dumped" in line:
            process_exited = True

    async with run_compiled(yaml_config, line_callback=check_logs):
        async with api_client_connected_with_disconnect(noise_psk=noise_key) as (
            client,
            disconnect_event,
        ):
            # Verify basic connection works first
            device_info = await client.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-noise"

            # With noise, message types are uint16_t, so we test with an unknown but valid value
            # NOTE(review): reaches into aioesphomeapi private attributes —
            # re-verify this path when the client library is upgraded.
            frame_helper = client._connection._frame_helper

            # Test with an unknown message type (65535 is not used by ESPHome)
            unknown_message_id = 65535  # Valid uint16_t but unknown to ESPHome
            payload = b"test"

            # Send the unknown message type - ESPHome should read and ignore it
            frame_helper.write_packets([(unknown_message_id, payload)], True)

            # Give ESPHome a moment to process (but expect no disconnection)
            # The connection should stay alive as ESPHome ignores unknown message types
            with pytest.raises(asyncio.TimeoutError):
                await asyncio.wait_for(disconnect_event.wait(), timeout=0.5)

            # Connection should still be alive - unknown types are ignored, not fatal
            assert client._connection.is_connected, (
                "Connection should remain open for unknown message types"
            )

            # Verify we can still communicate by sending a valid request
            device_info2 = await client.device_info()
            assert device_info2 is not None
            assert device_info2.name == "oversized-noise"

        # After test, verify process didn't crash
        assert not process_exited, "ESPHome process should not crash"

        # Verify we can still reconnect
        async with api_client_connected_with_disconnect(noise_psk=noise_key) as (
            client2,
            _,
        ):
            device_info = await client2.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-noise"
|  | ||||
|  | ||||
@pytest.mark.asyncio
async def test_noise_corrupt_encrypted_frame(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected_with_disconnect: APIClientConnectedWithDisconnectFactory,
) -> None:
    """Test that noise protocol properly handles corrupt encrypted frames.

    Send a frame with valid size but corrupt encrypted content (garbage bytes).
    This should fail decryption and cause disconnection.
    """
    noise_key = "N4Yle5YirwZhPiHHsdZLdOA73ndj/84veVaLhTvxCuU="
    # Flags set by check_logs while scanning the device's log output.
    process_exited = False
    cipherstate_failed = False

    def check_logs(line: str) -> None:
        # Scan each device log line for crash indicators and the expected
        # decryption-failure warning.
        nonlocal process_exited, cipherstate_failed
        # Check for signs that the process exited/crashed
        if "Segmentation fault" in line or "core dumped" in line:
            process_exited = True
        # Check for the expected warning about decryption failure
        if (
            "[W][api.connection" in line
            and "Reading failed CIPHERSTATE_DECRYPT_FAILED" in line
        ):
            cipherstate_failed = True

    async with run_compiled(yaml_config, line_callback=check_logs):
        async with api_client_connected_with_disconnect(noise_psk=noise_key) as (
            client,
            disconnect_event,
        ):
            # Verify basic connection works first
            device_info = await client.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-noise"

            # Get the socket to send raw corrupt data
            # NOTE(review): uses a private aioesphomeapi attribute — confirm
            # on client library upgrades.
            socket = client._connection._socket

            # Send a corrupt noise frame directly to the socket
            # Format: [indicator=0x01][size_high][size_low][garbage_encrypted_data]
            # Size of 32 bytes (reasonable size for a noise frame with MAC)
            corrupt_frame = bytes(
                [
                    0x01,  # Noise indicator
                    0x00,  # Size high byte
                    0x20,  # Size low byte (32 bytes)
                ]
            ) + bytes(32)  # 32 bytes of zeros (invalid encrypted data)

            # Send the corrupt frame
            socket.sendall(corrupt_frame)

            # Wait for ESPHome to disconnect due to decryption failure
            await asyncio.wait_for(disconnect_event.wait(), timeout=5.0)

        # After disconnection, verify process didn't crash
        assert not process_exited, (
            "ESPHome process should not crash on corrupt encrypted frames"
        )
        # Verify we saw the expected warning message
        assert cipherstate_failed, (
            "Expected to see warning about CIPHERSTATE_DECRYPT_FAILED"
        )

        # Verify we can still reconnect after handling the corrupt frame
        async with api_client_connected_with_disconnect(noise_psk=noise_key) as (
            client2,
            _,
        ):
            device_info = await client2.device_info()
            assert device_info is not None
            assert device_info.name == "oversized-noise"
| @@ -54,3 +54,17 @@ class APIClientConnectedFactory(Protocol): | ||||
|         client_info: str = "integration-test", | ||||
|         timeout: float = 30, | ||||
|     ) -> AbstractAsyncContextManager[APIClient]: ... | ||||
|  | ||||
|  | ||||
class APIClientConnectedWithDisconnectFactory(Protocol):
    """Protocol for connected API client factory that returns disconnect event.

    Calling the factory returns an async context manager yielding a tuple of
    the connected ``APIClient`` and an ``asyncio.Event`` that is set when the
    server side disconnects the client.
    """

    def __call__(  # noqa: E704
        self,
        address: str = "localhost",
        port: int | None = None,
        password: str = "",
        noise_psk: str | None = None,
        client_info: str = "integration-test",
        timeout: float = 30,
    ) -> AbstractAsyncContextManager[tuple[APIClient, asyncio.Event]]: ...
|   | ||||
| @@ -87,3 +87,17 @@ def mock_run_external_command() -> Generator[Mock, None, None]: | ||||
|     """Mock run_external_command for platformio_api.""" | ||||
|     with patch("esphome.platformio_api.run_external_command") as mock: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
@pytest.fixture
def mock_run_git_command() -> Generator[Mock, None, None]:
    """Mock run_git_command for git module.

    Yields the patched Mock so tests can set return values and inspect the
    git command lines that ``esphome.git`` would have executed.
    """
    with patch("esphome.git.run_git_command") as mock:
        yield mock
|  | ||||
|  | ||||
@pytest.fixture
def mock_get_idedata() -> Generator[Mock, None, None]:
    """Mock get_idedata for platformio_api.

    Yields the patched Mock; tests assign ``return_value`` to supply the
    IDEData (firmware path, extra flash images) an upload would read.
    """
    with patch("esphome.platformio_api.get_idedata") as mock:
        yield mock
|   | ||||
							
								
								
									
										246
									
								
								tests/unit_tests/test_git.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										246
									
								
								tests/unit_tests/test_git.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,246 @@ | ||||
| """Tests for git.py module.""" | ||||
|  | ||||
| from datetime import datetime, timedelta | ||||
| import hashlib | ||||
| import os | ||||
| from pathlib import Path | ||||
| from unittest.mock import Mock | ||||
|  | ||||
| from esphome import git | ||||
| from esphome.core import CORE, TimePeriodSeconds | ||||
|  | ||||
|  | ||||
def test_clone_or_update_with_never_refresh(
    tmp_path: Path, mock_run_git_command: Mock
) -> None:
    """Test that NEVER_REFRESH skips updates for existing repos."""
    # Point CORE at tmp_path so data_dir resolves under the test directory.
    CORE.config_path = tmp_path / "test.yaml"

    url = "https://github.com/test/repo"
    ref = None
    domain = "test"

    # Mirror _compute_destination_path: sha256 of "url@ref", first 8 hex chars.
    digest = hashlib.sha256(f"{url}@{ref}".encode()).hexdigest()
    repo_dir = tmp_path / ".esphome" / domain / digest[:8]

    # Lay down a minimal pre-existing clone: a .git dir with a FETCH_HEAD
    # whose mtime is "now" (fresh enough for any refresh policy).
    git_dir = repo_dir / ".git"
    git_dir.mkdir(parents=True)
    (git_dir / "FETCH_HEAD").write_text("test")

    result_dir, revert = git.clone_or_update(
        url=url,
        ref=ref,
        refresh=git.NEVER_REFRESH,
        domain=domain,
    )

    # Repo exists and refresh is NEVER_REFRESH, so no git command may run.
    mock_run_git_command.assert_not_called()
    assert result_dir == repo_dir
    assert revert is None
|  | ||||
|  | ||||
def test_clone_or_update_with_refresh_updates_old_repo(
    tmp_path: Path, mock_run_git_command: Mock
) -> None:
    """Test that refresh triggers update for old repos."""
    # Set up CORE.config_path so data_dir uses tmp_path
    CORE.config_path = tmp_path / "test.yaml"

    # Compute the expected repo directory path
    url = "https://github.com/test/repo"
    ref = None
    key = f"{url}@{ref}"
    domain = "test"

    # Compute hash-based directory name (matching _compute_destination_path logic)
    h = hashlib.new("sha256")
    h.update(key.encode())
    repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]

    # Create the git repo directory structure
    repo_dir.mkdir(parents=True)
    git_dir = repo_dir / ".git"
    git_dir.mkdir()

    # Create FETCH_HEAD and backdate its mtime to 2 days ago so the repo
    # looks stale relative to the 1-day refresh period used below.
    # (write_text already creates the file; a separate touch() is not needed.)
    fetch_head = git_dir / "FETCH_HEAD"
    fetch_head.write_text("test")
    old_time = datetime.now() - timedelta(days=2)
    os.utime(fetch_head, (old_time.timestamp(), old_time.timestamp()))

    # Mock git command responses
    mock_run_git_command.return_value = "abc123"  # SHA for rev-parse

    # Call with refresh=1d (1 day)
    refresh = TimePeriodSeconds(days=1)
    result_dir, revert = git.clone_or_update(
        url=url,
        ref=ref,
        refresh=refresh,
        domain=domain,
    )

    # Should call git fetch and update commands since repo is older than refresh
    assert mock_run_git_command.called
    # Check for fetch command
    fetch_calls = [
        call
        for call in mock_run_git_command.call_args_list
        if len(call[0]) > 0 and "fetch" in call[0][0]
    ]
    assert len(fetch_calls) > 0
|  | ||||
|  | ||||
def test_clone_or_update_with_refresh_skips_fresh_repo(
    tmp_path: Path, mock_run_git_command: Mock
) -> None:
    """Test that refresh doesn't update fresh repos."""
    # Set up CORE.config_path so data_dir uses tmp_path
    CORE.config_path = tmp_path / "test.yaml"

    # Compute the expected repo directory path
    url = "https://github.com/test/repo"
    ref = None
    key = f"{url}@{ref}"
    domain = "test"

    # Compute hash-based directory name (matching _compute_destination_path logic)
    h = hashlib.new("sha256")
    h.update(key.encode())
    repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]

    # Create the git repo directory structure
    repo_dir.mkdir(parents=True)
    git_dir = repo_dir / ".git"
    git_dir.mkdir()

    # Create FETCH_HEAD and backdate its mtime to 1 hour ago: well within
    # the 1-day refresh period below, so the repo counts as fresh.
    # (write_text already creates the file; a separate touch() is not needed.)
    fetch_head = git_dir / "FETCH_HEAD"
    fetch_head.write_text("test")
    recent_time = datetime.now() - timedelta(hours=1)
    os.utime(fetch_head, (recent_time.timestamp(), recent_time.timestamp()))

    # Call with refresh=1d (1 day)
    refresh = TimePeriodSeconds(days=1)
    result_dir, revert = git.clone_or_update(
        url=url,
        ref=ref,
        refresh=refresh,
        domain=domain,
    )

    # Should NOT call git fetch since repo is fresh
    mock_run_git_command.assert_not_called()
    assert result_dir == repo_dir
    assert revert is None
|  | ||||
|  | ||||
def test_clone_or_update_clones_missing_repo(
    tmp_path: Path, mock_run_git_command: Mock
) -> None:
    """Test that missing repos are cloned regardless of refresh setting."""
    # Point CORE at tmp_path so data_dir resolves under the test directory.
    CORE.config_path = tmp_path / "test.yaml"

    url = "https://github.com/test/repo"
    ref = None
    domain = "test"

    # Mirror _compute_destination_path: sha256 of "url@ref", first 8 hex chars.
    digest = hashlib.sha256(f"{url}@{ref}".encode()).hexdigest()
    repo_dir = tmp_path / ".esphome" / domain / digest[:8]

    # Create only the parent directory; the repo itself must be absent.
    (tmp_path / ".esphome" / domain).mkdir(parents=True)
    assert not repo_dir.exists()

    # Even NEVER_REFRESH must clone when nothing exists on disk yet.
    result_dir, revert = git.clone_or_update(
        url=url,
        ref=ref,
        refresh=git.NEVER_REFRESH,
        domain=domain,
    )

    # A clone command must have been issued.
    assert mock_run_git_command.called
    clone_invocations = [
        invocation
        for invocation in mock_run_git_command.call_args_list
        if invocation[0] and "clone" in invocation[0][0]
    ]
    assert clone_invocations
|  | ||||
|  | ||||
def test_clone_or_update_with_none_refresh_always_updates(
    tmp_path: Path, mock_run_git_command: Mock
) -> None:
    """Test that refresh=None always updates existing repos."""
    # Set up CORE.config_path so data_dir uses tmp_path
    CORE.config_path = tmp_path / "test.yaml"

    # Compute the expected repo directory path
    url = "https://github.com/test/repo"
    ref = None
    key = f"{url}@{ref}"
    domain = "test"

    # Compute hash-based directory name (matching _compute_destination_path logic)
    h = hashlib.new("sha256")
    h.update(key.encode())
    repo_dir = tmp_path / ".esphome" / domain / h.hexdigest()[:8]

    # Create the git repo directory structure
    repo_dir.mkdir(parents=True)
    git_dir = repo_dir / ".git"
    git_dir.mkdir()

    # Create FETCH_HEAD with a very recent mtime (1 second ago) — even a
    # fresh repo must be updated when refresh=None.
    # (write_text already creates the file; a separate touch() is not needed.)
    fetch_head = git_dir / "FETCH_HEAD"
    fetch_head.write_text("test")
    recent_time = datetime.now() - timedelta(seconds=1)
    os.utime(fetch_head, (recent_time.timestamp(), recent_time.timestamp()))

    # Mock git command responses
    mock_run_git_command.return_value = "abc123"  # SHA for rev-parse

    # Call with refresh=None (default behavior)
    result_dir, revert = git.clone_or_update(
        url=url,
        ref=ref,
        refresh=None,
        domain=domain,
    )

    # Should call git fetch and update commands since refresh=None means always update
    assert mock_run_git_command.called
    # Check for fetch command
    fetch_calls = [
        call
        for call in mock_run_git_command.call_args_list
        if len(call[0]) > 0 and "fetch" in call[0][0]
    ]
    assert len(fetch_calls) > 0
| @@ -5,16 +5,19 @@ from __future__ import annotations | ||||
| from collections.abc import Generator | ||||
| from dataclasses import dataclass | ||||
| from pathlib import Path | ||||
| import re | ||||
| from typing import Any | ||||
| from unittest.mock import MagicMock, Mock, patch | ||||
|  | ||||
| import pytest | ||||
| from pytest import CaptureFixture | ||||
|  | ||||
| from esphome import platformio_api | ||||
| from esphome.__main__ import ( | ||||
|     Purpose, | ||||
|     choose_upload_log_host, | ||||
|     command_rename, | ||||
|     command_update_all, | ||||
|     command_wizard, | ||||
|     get_port_type, | ||||
|     has_ip_address, | ||||
| @@ -26,7 +29,9 @@ from esphome.__main__ import ( | ||||
|     mqtt_get_ip, | ||||
|     show_logs, | ||||
|     upload_program, | ||||
|     upload_using_esptool, | ||||
| ) | ||||
| from esphome.components.esp32.const import KEY_ESP32, KEY_VARIANT, VARIANT_ESP32 | ||||
| from esphome.const import ( | ||||
|     CONF_API, | ||||
|     CONF_BROKER, | ||||
| @@ -55,6 +60,17 @@ from esphome.const import ( | ||||
| from esphome.core import CORE, EsphomeError | ||||
|  | ||||
|  | ||||
# Precompiled once at import time instead of on every call.
# Matches two-character escapes (ESC followed by one of @-Z, \, ], ^, _)
# and CSI sequences (ESC [ parameters intermediates final-byte).
_ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


def strip_ansi_codes(text: str) -> str:
    """Remove ANSI escape codes from text.

    This helps make test assertions cleaner by removing color codes and other
    terminal formatting that can make tests brittle.

    Args:
        text: Raw captured terminal output.

    Returns:
        The same text with all ANSI escape sequences removed.
    """
    return _ANSI_ESCAPE_RE.sub("", text)
|  | ||||
|  | ||||
| @dataclass | ||||
| class MockSerialPort: | ||||
|     """Mock serial port for testing. | ||||
| @@ -207,6 +223,14 @@ def mock_run_external_process() -> Generator[Mock]: | ||||
|         yield mock | ||||
|  | ||||
|  | ||||
@pytest.fixture
def mock_run_external_command() -> Generator[Mock]:
    """Mock run_external_command for testing.

    Yields the patched Mock with a default return value of 0 (success) so
    callers checking the exit status see a successful run unless a test
    overrides it.
    """
    with patch("esphome.__main__.run_external_command") as mock:
        mock.return_value = 0  # Default to success
        yield mock
|  | ||||
|  | ||||
| def test_choose_upload_log_host_with_string_default() -> None: | ||||
|     """Test with a single string default device.""" | ||||
|     setup_core() | ||||
| @@ -805,6 +829,122 @@ def test_upload_program_serial_esp8266_with_file( | ||||
|     ) | ||||
|  | ||||
|  | ||||
def test_upload_using_esptool_path_conversion(
    tmp_path: Path,
    mock_run_external_command: Mock,
    mock_get_idedata: Mock,
) -> None:
    """Test upload_using_esptool properly converts Path objects to strings for esptool.

    This test ensures that img.path (Path object) is converted to string before
    passing to esptool, preventing AttributeError.
    """
    setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path, name="test")

    # Set up ESP32-specific data required by get_esp32_variant()
    CORE.data[KEY_ESP32] = {KEY_VARIANT: VARIANT_ESP32}

    # Create mock IDEData with Path objects
    mock_idedata = MagicMock(spec=platformio_api.IDEData)
    mock_idedata.firmware_bin_path = tmp_path / "firmware.bin"
    mock_idedata.extra_flash_images = [
        platformio_api.FlashImage(path=tmp_path / "bootloader.bin", offset="0x1000"),
        platformio_api.FlashImage(path=tmp_path / "partitions.bin", offset="0x8000"),
    ]

    mock_get_idedata.return_value = mock_idedata

    # Create the actual firmware files so they exist
    (tmp_path / "firmware.bin").touch()
    (tmp_path / "bootloader.bin").touch()
    (tmp_path / "partitions.bin").touch()

    config = {CONF_ESPHOME: {"platformio_options": {}}}

    # Call upload_using_esptool without custom file argument
    result = upload_using_esptool(config, "/dev/ttyUSB0", None, None)

    assert result == 0

    # Verify that run_external_command was called
    assert mock_run_external_command.call_count == 1

    # Get the actual call arguments
    call_args = mock_run_external_command.call_args[0]

    # The first argument should be esptool.main function,
    # followed by the command arguments
    assert len(call_args) > 1

    # Find the indices of the flash image arguments
    # They should come after "write-flash" and "-z"
    cmd_list = list(call_args[1:])  # Skip the esptool.main function

    # Verify all paths are strings, not Path objects
    # The firmware and flash images should be at specific positions
    write_flash_idx = cmd_list.index("write-flash")

    # After write-flash we have: -z, --flash-size, detect, then offset/path pairs
    # (hence the +4 skip past those three fixed arguments).
    # Check firmware at offset 0x10000 (ESP32)
    firmware_offset_idx = write_flash_idx + 4
    assert cmd_list[firmware_offset_idx] == "0x10000"
    firmware_path = cmd_list[firmware_offset_idx + 1]
    assert isinstance(firmware_path, str)
    assert firmware_path.endswith("firmware.bin")

    # Check bootloader
    bootloader_offset_idx = firmware_offset_idx + 2
    assert cmd_list[bootloader_offset_idx] == "0x1000"
    bootloader_path = cmd_list[bootloader_offset_idx + 1]
    assert isinstance(bootloader_path, str)
    assert bootloader_path.endswith("bootloader.bin")

    # Check partitions
    partitions_offset_idx = bootloader_offset_idx + 2
    assert cmd_list[partitions_offset_idx] == "0x8000"
    partitions_path = cmd_list[partitions_offset_idx + 1]
    assert isinstance(partitions_path, str)
    assert partitions_path.endswith("partitions.bin")
|  | ||||
|  | ||||
def test_upload_using_esptool_with_file_path(
    tmp_path: Path,
    mock_run_external_command: Mock,
) -> None:
    """Test upload_using_esptool with an explicit custom firmware file.

    The firmware path is passed as a string (as the CLI would supply it) and
    must be forwarded to esptool as a string at the default 0x0 offset.
    """
    setup_core(platform=PLATFORM_ESP8266, tmp_path=tmp_path, name="test")

    # Create a test firmware file
    firmware_file = tmp_path / "custom_firmware.bin"
    firmware_file.touch()

    config = {CONF_ESPHOME: {"platformio_options": {}}}

    # Pass the firmware path as a string, the usual form from the CLI
    result = upload_using_esptool(config, "/dev/ttyUSB0", str(firmware_file), None)

    assert result == 0

    # Verify that run_external_command was called
    mock_run_external_command.assert_called_once()

    # Get the actual call arguments
    call_args = mock_run_external_command.call_args[0]
    cmd_list = list(call_args[1:])  # Skip the esptool.main function

    # Find the firmware path in the command
    write_flash_idx = cmd_list.index("write-flash")

    # For custom file, it should be at offset 0x0
    # (+4 skips the fixed -z, --flash-size, detect arguments)
    firmware_offset_idx = write_flash_idx + 4
    assert cmd_list[firmware_offset_idx] == "0x0"
    firmware_path = cmd_list[firmware_offset_idx + 1]

    # Verify it's a string, not a Path object
    assert isinstance(firmware_path, str)
    assert firmware_path.endswith("custom_firmware.bin")
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|     "platform,device", | ||||
|     [ | ||||
| @@ -1545,3 +1685,171 @@ esp32: | ||||
|  | ||||
|     captured = capfd.readouterr() | ||||
|     assert "Rename failed" in captured.out | ||||
|  | ||||
|  | ||||
def test_command_update_all_path_string_conversion(
    tmp_path: Path,
    mock_run_external_process: Mock,
    capfd: CaptureFixture[str],
) -> None:
    """Test that command_update_all properly converts Path objects to strings in output."""
    # Two device configs so the summary covers multiple files.
    yaml1 = tmp_path / "device1.yaml"
    yaml1.write_text("""
esphome:
  name: device1

esp32:
  board: nodemcu-32s
""")

    yaml2 = tmp_path / "device2.yaml"
    yaml2.write_text("""
esphome:
  name: device2

esp8266:
  board: nodemcuv2
""")

    setup_core(tmp_path=tmp_path)
    # Every spawned update process reports success.
    mock_run_external_process.return_value = 0

    assert command_update_all(MockArgs(configuration=[str(tmp_path)])) == 0

    captured = capfd.readouterr()
    clean_output = strip_ansi_codes(captured.out)

    # Check that Path objects were properly converted to strings
    # The output should contain file paths without causing TypeError
    assert "device1.yaml" in clean_output
    assert "device2.yaml" in clean_output
    assert "SUCCESS" in clean_output
    assert "SUMMARY" in clean_output

    # Verify run_external_process was called for each file
    assert mock_run_external_process.call_count == 2
|  | ||||
|  | ||||
def test_command_update_all_with_failures(
    tmp_path: Path,
    mock_run_external_process: Mock,
    capfd: CaptureFixture[str],
) -> None:
    """Test command_update_all handles mixed success/failure cases properly."""
    yaml1 = tmp_path / "success_device.yaml"
    yaml1.write_text("""
esphome:
  name: success_device

esp32:
  board: nodemcu-32s
""")

    yaml2 = tmp_path / "failed_device.yaml"
    yaml2.write_text("""
esphome:
  name: failed_device

esp8266:
  board: nodemcuv2
""")

    setup_core(tmp_path=tmp_path)

    # Mock mixed results - first succeeds, second fails
    mock_run_external_process.side_effect = [0, 1]

    # Should return 1 (failure) since one device failed
    assert command_update_all(MockArgs(configuration=[str(tmp_path)])) == 1

    captured = capfd.readouterr()
    clean_output = strip_ansi_codes(captured.out)

    # Check that both success and failure are properly displayed
    assert "SUCCESS" in clean_output
    assert "ERROR" in clean_output or "FAILED" in clean_output
    assert "SUMMARY" in clean_output

    # Files are processed in alphabetical order, so we need to check which one succeeded/failed
    # The mock_run_external_process.side_effect = [0, 1] applies to files in alphabetical order
    # So "failed_device.yaml" gets 0 (success) and "success_device.yaml" gets 1 (failure)
    # (the file *names* are intentionally misleading relative to their outcomes)
    assert "failed_device.yaml: SUCCESS" in clean_output
    assert "success_device.yaml: FAILED" in clean_output
|  | ||||
|  | ||||
def test_command_update_all_empty_directory(
    tmp_path: Path,
    mock_run_external_process: Mock,
    capfd: CaptureFixture[str],
) -> None:
    """Updating an empty directory succeeds without spawning any processes."""
    setup_core(tmp_path=tmp_path)

    exit_code = command_update_all(MockArgs(configuration=[str(tmp_path)]))
    assert exit_code == 0

    # No YAML files found, so no per-device update process was launched.
    mock_run_external_process.assert_not_called()

    # The summary table is still printed even when there is nothing to do.
    output = strip_ansi_codes(capfd.readouterr().out)
    assert "SUMMARY" in output
|  | ||||
|  | ||||
def test_command_update_all_single_file(
    tmp_path: Path,
    mock_run_external_process: Mock,
    capfd: CaptureFixture[str],
) -> None:
    """Test command_update_all with a single YAML file specified."""
    yaml_file = tmp_path / "single_device.yaml"
    yaml_file.write_text("""
esphome:
  name: single_device

esp32:
  board: nodemcu-32s
""")

    setup_core(tmp_path=tmp_path)
    # The single spawned update process reports success.
    mock_run_external_process.return_value = 0

    # Pass the file itself (not a directory) as the configuration argument.
    assert command_update_all(MockArgs(configuration=[str(yaml_file)])) == 0

    captured = capfd.readouterr()
    clean_output = strip_ansi_codes(captured.out)

    assert "single_device.yaml" in clean_output
    assert "SUCCESS" in clean_output
    mock_run_external_process.assert_called_once()
|  | ||||
|  | ||||
def test_command_update_all_path_formatting_in_color_calls(
    tmp_path: Path,
    mock_run_external_process: Mock,
    capfd: CaptureFixture[str],
) -> None:
    """Test that Path objects are properly converted when passed to color() function."""
    # Name includes hyphens, underscores and digits to exercise formatting.
    yaml_file = tmp_path / "test-device_123.yaml"
    yaml_file.write_text("""
esphome:
  name: test-device_123

esp32:
  board: nodemcu-32s
""")

    setup_core(tmp_path=tmp_path)
    # The spawned update process reports success.
    mock_run_external_process.return_value = 0

    assert command_update_all(MockArgs(configuration=[str(tmp_path)])) == 0

    captured = capfd.readouterr()
    clean_output = strip_ansi_codes(captured.out)

    assert "test-device_123.yaml" in clean_output
    assert "Updating" in clean_output
    assert "SUCCESS" in clean_output
    assert "SUMMARY" in clean_output

    # Should not have any Python error messages
    # (these would appear if a Path leaked into string concatenation)
    assert "TypeError" not in clean_output
    assert "can only concatenate str" not in clean_output
|   | ||||
		Reference in New Issue
	
	Block a user