mirror of
https://github.com/esphome/esphome.git
synced 2025-09-16 18:22:22 +01:00
Add additional coverage ahead of Path conversion (#10723)
This commit is contained in:
188
tests/unit_tests/build_gen/test_platformio.py
Normal file
188
tests/unit_tests/build_gen/test_platformio.py
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
"""Tests for esphome.build_gen.platformio module."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Generator
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from esphome.build_gen import platformio
|
||||||
|
from esphome.core import CORE
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_update_storage_json() -> Generator[MagicMock]:
|
||||||
|
"""Mock update_storage_json for all tests."""
|
||||||
|
with patch("esphome.build_gen.platformio.update_storage_json") as mock:
|
||||||
|
yield mock
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_write_file_if_changed() -> Generator[MagicMock]:
|
||||||
|
"""Mock write_file_if_changed for tests."""
|
||||||
|
with patch("esphome.build_gen.platformio.write_file_if_changed") as mock:
|
||||||
|
yield mock
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_ini_creates_new_file(
|
||||||
|
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Test write_ini creates a new platformio.ini file."""
|
||||||
|
CORE.build_path = str(tmp_path)
|
||||||
|
|
||||||
|
content = """
|
||||||
|
[env:test]
|
||||||
|
platform = espressif32
|
||||||
|
board = esp32dev
|
||||||
|
framework = arduino
|
||||||
|
"""
|
||||||
|
|
||||||
|
platformio.write_ini(content)
|
||||||
|
|
||||||
|
ini_file = tmp_path / "platformio.ini"
|
||||||
|
assert ini_file.exists()
|
||||||
|
|
||||||
|
file_content = ini_file.read_text()
|
||||||
|
assert content in file_content
|
||||||
|
assert platformio.INI_AUTO_GENERATE_BEGIN in file_content
|
||||||
|
assert platformio.INI_AUTO_GENERATE_END in file_content
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_ini_updates_existing_file(
|
||||||
|
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Test write_ini updates existing platformio.ini file."""
|
||||||
|
CORE.build_path = str(tmp_path)
|
||||||
|
|
||||||
|
# Create existing file with custom content
|
||||||
|
ini_file = tmp_path / "platformio.ini"
|
||||||
|
existing_content = f"""
|
||||||
|
; Custom header
|
||||||
|
[platformio]
|
||||||
|
default_envs = test
|
||||||
|
|
||||||
|
{platformio.INI_AUTO_GENERATE_BEGIN}
|
||||||
|
; Old auto-generated content
|
||||||
|
[env:old]
|
||||||
|
platform = old
|
||||||
|
{platformio.INI_AUTO_GENERATE_END}
|
||||||
|
|
||||||
|
; Custom footer
|
||||||
|
"""
|
||||||
|
ini_file.write_text(existing_content)
|
||||||
|
|
||||||
|
# New content to write
|
||||||
|
new_content = """
|
||||||
|
[env:test]
|
||||||
|
platform = espressif32
|
||||||
|
board = esp32dev
|
||||||
|
framework = arduino
|
||||||
|
"""
|
||||||
|
|
||||||
|
platformio.write_ini(new_content)
|
||||||
|
|
||||||
|
file_content = ini_file.read_text()
|
||||||
|
|
||||||
|
# Check that custom parts are preserved
|
||||||
|
assert "; Custom header" in file_content
|
||||||
|
assert "[platformio]" in file_content
|
||||||
|
assert "default_envs = test" in file_content
|
||||||
|
assert "; Custom footer" in file_content
|
||||||
|
|
||||||
|
# Check that new content replaced old auto-generated content
|
||||||
|
assert new_content in file_content
|
||||||
|
assert "[env:old]" not in file_content
|
||||||
|
assert "platform = old" not in file_content
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_ini_preserves_custom_sections(
|
||||||
|
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Test write_ini preserves custom sections outside auto-generate markers."""
|
||||||
|
CORE.build_path = str(tmp_path)
|
||||||
|
|
||||||
|
# Create existing file with multiple custom sections
|
||||||
|
ini_file = tmp_path / "platformio.ini"
|
||||||
|
existing_content = f"""
|
||||||
|
[platformio]
|
||||||
|
src_dir = .
|
||||||
|
include_dir = .
|
||||||
|
|
||||||
|
[common]
|
||||||
|
lib_deps =
|
||||||
|
Wire
|
||||||
|
SPI
|
||||||
|
|
||||||
|
{platformio.INI_AUTO_GENERATE_BEGIN}
|
||||||
|
[env:old]
|
||||||
|
platform = old
|
||||||
|
{platformio.INI_AUTO_GENERATE_END}
|
||||||
|
|
||||||
|
[env:custom]
|
||||||
|
upload_speed = 921600
|
||||||
|
monitor_speed = 115200
|
||||||
|
"""
|
||||||
|
ini_file.write_text(existing_content)
|
||||||
|
|
||||||
|
new_content = "[env:auto]\nplatform = new"
|
||||||
|
|
||||||
|
platformio.write_ini(new_content)
|
||||||
|
|
||||||
|
file_content = ini_file.read_text()
|
||||||
|
|
||||||
|
# All custom sections should be preserved
|
||||||
|
assert "[platformio]" in file_content
|
||||||
|
assert "src_dir = ." in file_content
|
||||||
|
assert "[common]" in file_content
|
||||||
|
assert "lib_deps" in file_content
|
||||||
|
assert "[env:custom]" in file_content
|
||||||
|
assert "upload_speed = 921600" in file_content
|
||||||
|
|
||||||
|
# New auto-generated content should replace old
|
||||||
|
assert "[env:auto]" in file_content
|
||||||
|
assert "platform = new" in file_content
|
||||||
|
assert "[env:old]" not in file_content
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_ini_no_change_when_content_same(
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_update_storage_json: MagicMock,
|
||||||
|
mock_write_file_if_changed: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Test write_ini doesn't rewrite file when content is unchanged."""
|
||||||
|
CORE.build_path = str(tmp_path)
|
||||||
|
|
||||||
|
content = "[env:test]\nplatform = esp32"
|
||||||
|
full_content = (
|
||||||
|
f"{platformio.INI_BASE_FORMAT[0]}"
|
||||||
|
f"{platformio.INI_AUTO_GENERATE_BEGIN}\n"
|
||||||
|
f"{content}"
|
||||||
|
f"{platformio.INI_AUTO_GENERATE_END}"
|
||||||
|
f"{platformio.INI_BASE_FORMAT[1]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
ini_file = tmp_path / "platformio.ini"
|
||||||
|
ini_file.write_text(full_content)
|
||||||
|
|
||||||
|
mock_write_file_if_changed.return_value = False # Indicate no change
|
||||||
|
platformio.write_ini(content)
|
||||||
|
|
||||||
|
# write_file_if_changed should be called with the same content
|
||||||
|
mock_write_file_if_changed.assert_called_once()
|
||||||
|
call_args = mock_write_file_if_changed.call_args[0]
|
||||||
|
assert call_args[0] == str(ini_file)
|
||||||
|
assert content in call_args[1]
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_ini_calls_update_storage_json(
|
||||||
|
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||||
|
) -> None:
|
||||||
|
"""Test write_ini calls update_storage_json."""
|
||||||
|
CORE.build_path = str(tmp_path)
|
||||||
|
|
||||||
|
content = "[env:test]\nplatform = esp32"
|
||||||
|
|
||||||
|
platformio.write_ini(content)
|
||||||
|
mock_update_storage_json.assert_called_once()
|
@@ -35,6 +35,22 @@ from .common import load_config_from_fixture
|
|||||||
FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" / "core" / "config"
|
FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" / "core" / "config"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_cg_with_include_capture() -> tuple[Mock, list[str]]:
|
||||||
|
"""Mock code generation with include capture."""
|
||||||
|
includes_added: list[str] = []
|
||||||
|
|
||||||
|
with patch("esphome.core.config.cg") as mock_cg:
|
||||||
|
mock_raw_statement = MagicMock()
|
||||||
|
|
||||||
|
def capture_include(text: str) -> MagicMock:
|
||||||
|
includes_added.append(text)
|
||||||
|
return mock_raw_statement
|
||||||
|
|
||||||
|
mock_cg.RawStatement.side_effect = capture_include
|
||||||
|
yield mock_cg, includes_added
|
||||||
|
|
||||||
|
|
||||||
def test_validate_area_config_with_string() -> None:
|
def test_validate_area_config_with_string() -> None:
|
||||||
"""Test that string area config is converted to structured format."""
|
"""Test that string area config is converted to structured format."""
|
||||||
result = validate_area_config("Living Room")
|
result = validate_area_config("Living Room")
|
||||||
@@ -568,3 +584,262 @@ def test_is_target_platform() -> None:
|
|||||||
assert config._is_target_platform("rp2040") is True
|
assert config._is_target_platform("rp2040") is True
|
||||||
assert config._is_target_platform("invalid_platform") is False
|
assert config._is_target_platform("invalid_platform") is False
|
||||||
assert config._is_target_platform("api") is False # Component but not platform
|
assert config._is_target_platform("api") is False # Component but not platform
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_add_includes_with_single_file(
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_copy_file_if_changed: Mock,
|
||||||
|
mock_cg_with_include_capture: tuple[Mock, list[str]],
|
||||||
|
) -> None:
|
||||||
|
"""Test add_includes copies a single header file to build directory."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create include file
|
||||||
|
include_file = tmp_path / "my_header.h"
|
||||||
|
include_file.write_text("#define MY_CONSTANT 42")
|
||||||
|
|
||||||
|
mock_cg, includes_added = mock_cg_with_include_capture
|
||||||
|
|
||||||
|
await config.add_includes([str(include_file)])
|
||||||
|
|
||||||
|
# Verify copy_file_if_changed was called to copy the file
|
||||||
|
# Note: add_includes adds files to a src/ subdirectory
|
||||||
|
mock_copy_file_if_changed.assert_called_once_with(
|
||||||
|
str(include_file), str(Path(CORE.build_path) / "src" / "my_header.h")
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify include statement was added
|
||||||
|
assert any('#include "my_header.h"' in inc for inc in includes_added)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||||
|
async def test_add_includes_with_directory_unix(
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_copy_file_if_changed: Mock,
|
||||||
|
mock_cg_with_include_capture: tuple[Mock, list[str]],
|
||||||
|
) -> None:
|
||||||
|
"""Test add_includes copies all files from a directory on Unix."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create include directory with files
|
||||||
|
include_dir = tmp_path / "includes"
|
||||||
|
include_dir.mkdir()
|
||||||
|
(include_dir / "header1.h").write_text("#define HEADER1")
|
||||||
|
(include_dir / "header2.hpp").write_text("#define HEADER2")
|
||||||
|
(include_dir / "source.cpp").write_text("// Implementation")
|
||||||
|
(include_dir / "README.md").write_text(
|
||||||
|
"# Documentation"
|
||||||
|
) # Should be copied but not included
|
||||||
|
|
||||||
|
# Create subdirectory with files
|
||||||
|
subdir = include_dir / "subdir"
|
||||||
|
subdir.mkdir()
|
||||||
|
(subdir / "nested.h").write_text("#define NESTED")
|
||||||
|
|
||||||
|
mock_cg, includes_added = mock_cg_with_include_capture
|
||||||
|
|
||||||
|
await config.add_includes([str(include_dir)])
|
||||||
|
|
||||||
|
# Verify copy_file_if_changed was called for all files
|
||||||
|
assert mock_copy_file_if_changed.call_count == 5 # 4 code files + 1 README
|
||||||
|
|
||||||
|
# Verify include statements were added for valid extensions
|
||||||
|
include_strings = " ".join(includes_added)
|
||||||
|
assert "includes/header1.h" in include_strings
|
||||||
|
assert "includes/header2.hpp" in include_strings
|
||||||
|
assert "includes/subdir/nested.h" in include_strings
|
||||||
|
# CPP files are copied but not included
|
||||||
|
assert "source.cpp" not in include_strings or "#include" not in include_strings
|
||||||
|
# README.md should not have an include statement
|
||||||
|
assert "README.md" not in include_strings
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||||
|
async def test_add_includes_with_directory_windows(
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_copy_file_if_changed: Mock,
|
||||||
|
mock_cg_with_include_capture: tuple[Mock, list[str]],
|
||||||
|
) -> None:
|
||||||
|
"""Test add_includes copies all files from a directory on Windows."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create include directory with files
|
||||||
|
include_dir = tmp_path / "includes"
|
||||||
|
include_dir.mkdir()
|
||||||
|
(include_dir / "header1.h").write_text("#define HEADER1")
|
||||||
|
(include_dir / "header2.hpp").write_text("#define HEADER2")
|
||||||
|
(include_dir / "source.cpp").write_text("// Implementation")
|
||||||
|
(include_dir / "README.md").write_text(
|
||||||
|
"# Documentation"
|
||||||
|
) # Should be copied but not included
|
||||||
|
|
||||||
|
# Create subdirectory with files
|
||||||
|
subdir = include_dir / "subdir"
|
||||||
|
subdir.mkdir()
|
||||||
|
(subdir / "nested.h").write_text("#define NESTED")
|
||||||
|
|
||||||
|
mock_cg, includes_added = mock_cg_with_include_capture
|
||||||
|
|
||||||
|
await config.add_includes([str(include_dir)])
|
||||||
|
|
||||||
|
# Verify copy_file_if_changed was called for all files
|
||||||
|
assert mock_copy_file_if_changed.call_count == 5 # 4 code files + 1 README
|
||||||
|
|
||||||
|
# Verify include statements were added for valid extensions
|
||||||
|
include_strings = " ".join(includes_added)
|
||||||
|
assert "includes\\header1.h" in include_strings
|
||||||
|
assert "includes\\header2.hpp" in include_strings
|
||||||
|
assert "includes\\subdir\\nested.h" in include_strings
|
||||||
|
# CPP files are copied but not included
|
||||||
|
assert "source.cpp" not in include_strings or "#include" not in include_strings
|
||||||
|
# README.md should not have an include statement
|
||||||
|
assert "README.md" not in include_strings
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_add_includes_with_multiple_sources(
|
||||||
|
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||||
|
) -> None:
|
||||||
|
"""Test add_includes with multiple files and directories."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create various include sources
|
||||||
|
single_file = tmp_path / "single.h"
|
||||||
|
single_file.write_text("#define SINGLE")
|
||||||
|
|
||||||
|
dir1 = tmp_path / "dir1"
|
||||||
|
dir1.mkdir()
|
||||||
|
(dir1 / "file1.h").write_text("#define FILE1")
|
||||||
|
|
||||||
|
dir2 = tmp_path / "dir2"
|
||||||
|
dir2.mkdir()
|
||||||
|
(dir2 / "file2.cpp").write_text("// File2")
|
||||||
|
|
||||||
|
with patch("esphome.core.config.cg"):
|
||||||
|
await config.add_includes([str(single_file), str(dir1), str(dir2)])
|
||||||
|
|
||||||
|
# Verify copy_file_if_changed was called for all files
|
||||||
|
assert mock_copy_file_if_changed.call_count == 3 # 3 files total
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_add_includes_empty_directory(
|
||||||
|
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||||
|
) -> None:
|
||||||
|
"""Test add_includes with an empty directory doesn't fail."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create empty directory
|
||||||
|
empty_dir = tmp_path / "empty"
|
||||||
|
empty_dir.mkdir()
|
||||||
|
|
||||||
|
with patch("esphome.core.config.cg"):
|
||||||
|
# Should not raise any errors
|
||||||
|
await config.add_includes([str(empty_dir)])
|
||||||
|
|
||||||
|
# No files to copy from empty directory
|
||||||
|
mock_copy_file_if_changed.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||||
|
async def test_add_includes_preserves_directory_structure_unix(
|
||||||
|
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||||
|
) -> None:
|
||||||
|
"""Test that add_includes preserves relative directory structure on Unix."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create nested directory structure
|
||||||
|
lib_dir = tmp_path / "lib"
|
||||||
|
lib_dir.mkdir()
|
||||||
|
|
||||||
|
src_dir = lib_dir / "src"
|
||||||
|
src_dir.mkdir()
|
||||||
|
(src_dir / "core.h").write_text("#define CORE")
|
||||||
|
|
||||||
|
utils_dir = lib_dir / "utils"
|
||||||
|
utils_dir.mkdir()
|
||||||
|
(utils_dir / "helper.h").write_text("#define HELPER")
|
||||||
|
|
||||||
|
with patch("esphome.core.config.cg"):
|
||||||
|
await config.add_includes([str(lib_dir)])
|
||||||
|
|
||||||
|
# Verify copy_file_if_changed was called with correct paths
|
||||||
|
calls = mock_copy_file_if_changed.call_args_list
|
||||||
|
dest_paths = [call[0][1] for call in calls]
|
||||||
|
|
||||||
|
# Check that relative paths are preserved
|
||||||
|
assert any("lib/src/core.h" in path for path in dest_paths)
|
||||||
|
assert any("lib/utils/helper.h" in path for path in dest_paths)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||||
|
async def test_add_includes_preserves_directory_structure_windows(
|
||||||
|
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||||
|
) -> None:
|
||||||
|
"""Test that add_includes preserves relative directory structure on Windows."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create nested directory structure
|
||||||
|
lib_dir = tmp_path / "lib"
|
||||||
|
lib_dir.mkdir()
|
||||||
|
|
||||||
|
src_dir = lib_dir / "src"
|
||||||
|
src_dir.mkdir()
|
||||||
|
(src_dir / "core.h").write_text("#define CORE")
|
||||||
|
|
||||||
|
utils_dir = lib_dir / "utils"
|
||||||
|
utils_dir.mkdir()
|
||||||
|
(utils_dir / "helper.h").write_text("#define HELPER")
|
||||||
|
|
||||||
|
with patch("esphome.core.config.cg"):
|
||||||
|
await config.add_includes([str(lib_dir)])
|
||||||
|
|
||||||
|
# Verify copy_file_if_changed was called with correct paths
|
||||||
|
calls = mock_copy_file_if_changed.call_args_list
|
||||||
|
dest_paths = [call[0][1] for call in calls]
|
||||||
|
|
||||||
|
# Check that relative paths are preserved
|
||||||
|
assert any("lib\\src\\core.h" in path for path in dest_paths)
|
||||||
|
assert any("lib\\utils\\helper.h" in path for path in dest_paths)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_add_includes_overwrites_existing_files(
|
||||||
|
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||||
|
) -> None:
|
||||||
|
"""Test that add_includes overwrites existing files in build directory."""
|
||||||
|
CORE.config_path = str(tmp_path / "config.yaml")
|
||||||
|
CORE.build_path = str(tmp_path / "build")
|
||||||
|
os.makedirs(CORE.build_path, exist_ok=True)
|
||||||
|
|
||||||
|
# Create include file
|
||||||
|
include_file = tmp_path / "header.h"
|
||||||
|
include_file.write_text("#define NEW_VALUE 42")
|
||||||
|
|
||||||
|
with patch("esphome.core.config.cg"):
|
||||||
|
await config.add_includes([str(include_file)])
|
||||||
|
|
||||||
|
# Verify copy_file_if_changed was called (it handles overwriting)
|
||||||
|
# Note: add_includes adds files to a src/ subdirectory
|
||||||
|
mock_copy_file_if_changed.assert_called_once_with(
|
||||||
|
str(include_file), str(Path(CORE.build_path) / "src" / "header.h")
|
||||||
|
)
|
||||||
|
@@ -1,3 +1,6 @@
|
|||||||
|
import os
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
from hypothesis import given
|
from hypothesis import given
|
||||||
import pytest
|
import pytest
|
||||||
from strategies import mac_addr_strings
|
from strategies import mac_addr_strings
|
||||||
@@ -577,3 +580,83 @@ class TestEsphomeCore:
|
|||||||
|
|
||||||
assert target.is_esp32 is False
|
assert target.is_esp32 is False
|
||||||
assert target.is_esp8266 is True
|
assert target.is_esp8266 is True
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||||
|
def test_data_dir_default_unix(self, target):
|
||||||
|
"""Test data_dir returns .esphome in config directory by default on Unix."""
|
||||||
|
target.config_path = "/home/user/config.yaml"
|
||||||
|
assert target.data_dir == "/home/user/.esphome"
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||||
|
def test_data_dir_default_windows(self, target):
|
||||||
|
"""Test data_dir returns .esphome in config directory by default on Windows."""
|
||||||
|
target.config_path = "D:\\home\\user\\config.yaml"
|
||||||
|
assert target.data_dir == "D:\\home\\user\\.esphome"
|
||||||
|
|
||||||
|
def test_data_dir_ha_addon(self, target):
|
||||||
|
"""Test data_dir returns /data when running as Home Assistant addon."""
|
||||||
|
target.config_path = "/config/test.yaml"
|
||||||
|
|
||||||
|
with patch.dict(os.environ, {"ESPHOME_IS_HA_ADDON": "true"}):
|
||||||
|
assert target.data_dir == "/data"
|
||||||
|
|
||||||
|
def test_data_dir_env_override(self, target):
|
||||||
|
"""Test data_dir uses ESPHOME_DATA_DIR environment variable when set."""
|
||||||
|
target.config_path = "/home/user/config.yaml"
|
||||||
|
|
||||||
|
with patch.dict(os.environ, {"ESPHOME_DATA_DIR": "/custom/data/path"}):
|
||||||
|
assert target.data_dir == "/custom/data/path"
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||||
|
def test_data_dir_priority_unix(self, target):
|
||||||
|
"""Test data_dir priority on Unix: HA addon > env var > default."""
|
||||||
|
target.config_path = "/config/test.yaml"
|
||||||
|
expected_default = "/config/.esphome"
|
||||||
|
|
||||||
|
# Test HA addon takes priority over env var
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||||
|
):
|
||||||
|
assert target.data_dir == "/data"
|
||||||
|
|
||||||
|
# Test env var is used when not HA addon
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||||
|
):
|
||||||
|
assert target.data_dir == "/custom/path"
|
||||||
|
|
||||||
|
# Test default when neither is set
|
||||||
|
with patch.dict(os.environ, {}, clear=True):
|
||||||
|
# Ensure these env vars are not set
|
||||||
|
os.environ.pop("ESPHOME_IS_HA_ADDON", None)
|
||||||
|
os.environ.pop("ESPHOME_DATA_DIR", None)
|
||||||
|
assert target.data_dir == expected_default
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||||
|
def test_data_dir_priority_windows(self, target):
|
||||||
|
"""Test data_dir priority on Windows: HA addon > env var > default."""
|
||||||
|
target.config_path = "D:\\config\\test.yaml"
|
||||||
|
expected_default = "D:\\config\\.esphome"
|
||||||
|
|
||||||
|
# Test HA addon takes priority over env var
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||||
|
):
|
||||||
|
assert target.data_dir == "/data"
|
||||||
|
|
||||||
|
# Test env var is used when not HA addon
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||||
|
):
|
||||||
|
assert target.data_dir == "/custom/path"
|
||||||
|
|
||||||
|
# Test default when neither is set
|
||||||
|
with patch.dict(os.environ, {}, clear=True):
|
||||||
|
# Ensure these env vars are not set
|
||||||
|
os.environ.pop("ESPHOME_IS_HA_ADDON", None)
|
||||||
|
os.environ.pop("ESPHOME_DATA_DIR", None)
|
||||||
|
assert target.data_dir == expected_default
|
||||||
|
@@ -1,5 +1,8 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
import socket
|
import socket
|
||||||
|
import stat
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr
|
from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr
|
||||||
@@ -554,6 +557,239 @@ def test_addr_preference_ipv6_link_local_with_scope() -> None:
|
|||||||
assert helpers.addr_preference_(addr_info) == 1 # Has scope, so it's usable
|
assert helpers.addr_preference_(addr_info) == 1 # Has scope, so it's usable
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_p(tmp_path: Path) -> None:
|
||||||
|
"""Test mkdir_p creates directories recursively."""
|
||||||
|
# Test creating nested directories
|
||||||
|
nested_path = tmp_path / "level1" / "level2" / "level3"
|
||||||
|
helpers.mkdir_p(nested_path)
|
||||||
|
assert nested_path.exists()
|
||||||
|
assert nested_path.is_dir()
|
||||||
|
|
||||||
|
# Test that mkdir_p is idempotent (doesn't fail if directory exists)
|
||||||
|
helpers.mkdir_p(nested_path)
|
||||||
|
assert nested_path.exists()
|
||||||
|
|
||||||
|
# Test with empty path (should do nothing)
|
||||||
|
helpers.mkdir_p("")
|
||||||
|
|
||||||
|
# Test with existing directory
|
||||||
|
existing_dir = tmp_path / "existing"
|
||||||
|
existing_dir.mkdir()
|
||||||
|
helpers.mkdir_p(existing_dir)
|
||||||
|
assert existing_dir.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_p_file_exists_error(tmp_path: Path) -> None:
|
||||||
|
"""Test mkdir_p raises error when path is a file."""
|
||||||
|
# Create a file
|
||||||
|
file_path = tmp_path / "test_file.txt"
|
||||||
|
file_path.write_text("test content")
|
||||||
|
|
||||||
|
# Try to create directory with same name as existing file
|
||||||
|
with pytest.raises(EsphomeError, match=r"Error creating directories"):
|
||||||
|
helpers.mkdir_p(file_path)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mkdir_p_with_existing_file_raises_error(tmp_path: Path) -> None:
|
||||||
|
"""Test mkdir_p raises error when trying to create dir over existing file."""
|
||||||
|
# Create a file where we want to create a directory
|
||||||
|
file_path = tmp_path / "existing_file"
|
||||||
|
file_path.write_text("content")
|
||||||
|
|
||||||
|
# Try to create a directory with a path that goes through the file
|
||||||
|
dir_path = file_path / "subdir"
|
||||||
|
|
||||||
|
with pytest.raises(EsphomeError, match=r"Error creating directories"):
|
||||||
|
helpers.mkdir_p(dir_path)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||||
|
def test_read_file_unix(tmp_path: Path) -> None:
|
||||||
|
"""Test read_file reads file content correctly on Unix."""
|
||||||
|
# Test reading regular file
|
||||||
|
test_file = tmp_path / "test.txt"
|
||||||
|
expected_content = "Test content\nLine 2\n"
|
||||||
|
test_file.write_text(expected_content)
|
||||||
|
|
||||||
|
content = helpers.read_file(test_file)
|
||||||
|
assert content == expected_content
|
||||||
|
|
||||||
|
# Test reading file with UTF-8 characters
|
||||||
|
utf8_file = tmp_path / "utf8.txt"
|
||||||
|
utf8_content = "Hello 世界 🌍"
|
||||||
|
utf8_file.write_text(utf8_content, encoding="utf-8")
|
||||||
|
|
||||||
|
content = helpers.read_file(utf8_file)
|
||||||
|
assert content == utf8_content
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||||
|
def test_read_file_windows(tmp_path: Path) -> None:
|
||||||
|
"""Test read_file reads file content correctly on Windows."""
|
||||||
|
# Test reading regular file
|
||||||
|
test_file = tmp_path / "test.txt"
|
||||||
|
expected_content = "Test content\nLine 2\n"
|
||||||
|
test_file.write_text(expected_content)
|
||||||
|
|
||||||
|
content = helpers.read_file(test_file)
|
||||||
|
# On Windows, text mode reading converts \n to \r\n
|
||||||
|
assert content == expected_content.replace("\n", "\r\n")
|
||||||
|
|
||||||
|
# Test reading file with UTF-8 characters
|
||||||
|
utf8_file = tmp_path / "utf8.txt"
|
||||||
|
utf8_content = "Hello 世界 🌍"
|
||||||
|
utf8_file.write_text(utf8_content, encoding="utf-8")
|
||||||
|
|
||||||
|
content = helpers.read_file(utf8_file)
|
||||||
|
assert content == utf8_content
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_file_not_found() -> None:
|
||||||
|
"""Test read_file raises error for non-existent file."""
|
||||||
|
with pytest.raises(EsphomeError, match=r"Error reading file"):
|
||||||
|
helpers.read_file("/nonexistent/file.txt")
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_file_unicode_decode_error(tmp_path: Path) -> None:
|
||||||
|
"""Test read_file raises error for invalid UTF-8."""
|
||||||
|
test_file = tmp_path / "invalid.txt"
|
||||||
|
# Write invalid UTF-8 bytes
|
||||||
|
test_file.write_bytes(b"\xff\xfe")
|
||||||
|
|
||||||
|
with pytest.raises(EsphomeError, match=r"Error reading file"):
|
||||||
|
helpers.read_file(test_file)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||||
|
def test_write_file_unix(tmp_path: Path) -> None:
|
||||||
|
"""Test write_file writes content correctly on Unix."""
|
||||||
|
# Test writing string content
|
||||||
|
test_file = tmp_path / "test.txt"
|
||||||
|
content = "Test content\nLine 2"
|
||||||
|
helpers.write_file(test_file, content)
|
||||||
|
|
||||||
|
assert test_file.read_text() == content
|
||||||
|
# Check file permissions
|
||||||
|
assert oct(test_file.stat().st_mode)[-3:] == "644"
|
||||||
|
|
||||||
|
# Test overwriting existing file
|
||||||
|
new_content = "New content"
|
||||||
|
helpers.write_file(test_file, new_content)
|
||||||
|
assert test_file.read_text() == new_content
|
||||||
|
|
||||||
|
# Test writing to nested directories (should create them)
|
||||||
|
nested_file = tmp_path / "dir1" / "dir2" / "file.txt"
|
||||||
|
helpers.write_file(nested_file, content)
|
||||||
|
assert nested_file.read_text() == content
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||||
|
def test_write_file_windows(tmp_path: Path) -> None:
|
||||||
|
"""Test write_file writes content correctly on Windows."""
|
||||||
|
# Test writing string content
|
||||||
|
test_file = tmp_path / "test.txt"
|
||||||
|
content = "Test content\nLine 2"
|
||||||
|
helpers.write_file(test_file, content)
|
||||||
|
|
||||||
|
assert test_file.read_text() == content
|
||||||
|
# Windows doesn't have Unix-style 644 permissions
|
||||||
|
|
||||||
|
# Test overwriting existing file
|
||||||
|
new_content = "New content"
|
||||||
|
helpers.write_file(test_file, new_content)
|
||||||
|
assert test_file.read_text() == new_content
|
||||||
|
|
||||||
|
# Test writing to nested directories (should create them)
|
||||||
|
nested_file = tmp_path / "dir1" / "dir2" / "file.txt"
|
||||||
|
helpers.write_file(nested_file, content)
|
||||||
|
assert nested_file.read_text() == content
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
|
||||||
|
def test_write_file_to_non_writable_directory_unix(tmp_path: Path) -> None:
|
||||||
|
"""Test write_file raises error when directory is not writable on Unix."""
|
||||||
|
# Create a directory and make it read-only
|
||||||
|
read_only_dir = tmp_path / "readonly"
|
||||||
|
read_only_dir.mkdir()
|
||||||
|
test_file = read_only_dir / "test.txt"
|
||||||
|
|
||||||
|
# Make directory read-only (no write permission)
|
||||||
|
read_only_dir.chmod(0o555)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with pytest.raises(EsphomeError, match=r"Could not write file"):
|
||||||
|
helpers.write_file(test_file, "content")
|
||||||
|
finally:
|
||||||
|
# Restore write permissions for cleanup
|
||||||
|
read_only_dir.chmod(0o755)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||||
|
def test_write_file_to_non_writable_directory_windows(tmp_path: Path) -> None:
|
||||||
|
"""Test write_file error handling on Windows."""
|
||||||
|
# Windows handles permissions differently - test a different error case
|
||||||
|
# Try to write to a file path that contains an existing file as a directory component
|
||||||
|
existing_file = tmp_path / "file.txt"
|
||||||
|
existing_file.write_text("content")
|
||||||
|
|
||||||
|
# Try to write to a path that treats the file as a directory
|
||||||
|
invalid_path = existing_file / "subdir" / "test.txt"
|
||||||
|
|
||||||
|
with pytest.raises(EsphomeError, match=r"Could not write file"):
|
||||||
|
helpers.write_file(invalid_path, "content")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
|
||||||
|
def test_write_file_with_permission_bits_unix(tmp_path: Path) -> None:
|
||||||
|
"""Test that write_file sets correct permissions on Unix."""
|
||||||
|
test_file = tmp_path / "test.txt"
|
||||||
|
helpers.write_file(test_file, "content")
|
||||||
|
|
||||||
|
# Check that file has 644 permissions
|
||||||
|
file_mode = test_file.stat().st_mode
|
||||||
|
assert stat.S_IMODE(file_mode) == 0o644
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
|
||||||
|
def test_copy_file_if_changed_permission_recovery_unix(tmp_path: Path) -> None:
|
||||||
|
"""Test copy_file_if_changed handles permission errors correctly on Unix."""
|
||||||
|
# Test with read-only destination file
|
||||||
|
src = tmp_path / "source.txt"
|
||||||
|
dst = tmp_path / "dest.txt"
|
||||||
|
src.write_text("new content")
|
||||||
|
dst.write_text("old content")
|
||||||
|
dst.chmod(0o444) # Make destination read-only
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Should handle permission error by deleting and retrying
|
||||||
|
helpers.copy_file_if_changed(src, dst)
|
||||||
|
assert dst.read_text() == "new content"
|
||||||
|
finally:
|
||||||
|
# Restore write permissions for cleanup
|
||||||
|
if dst.exists():
|
||||||
|
dst.chmod(0o644)
|
||||||
|
|
||||||
|
|
||||||
|
def test_copy_file_if_changed_creates_directories(tmp_path: Path) -> None:
|
||||||
|
"""Test copy_file_if_changed creates missing directories."""
|
||||||
|
src = tmp_path / "source.txt"
|
||||||
|
dst = tmp_path / "subdir" / "nested" / "dest.txt"
|
||||||
|
src.write_text("content")
|
||||||
|
|
||||||
|
helpers.copy_file_if_changed(src, dst)
|
||||||
|
assert dst.exists()
|
||||||
|
assert dst.read_text() == "content"
|
||||||
|
|
||||||
|
|
||||||
|
def test_copy_file_if_changed_nonexistent_source(tmp_path: Path) -> None:
|
||||||
|
"""Test copy_file_if_changed with non-existent source."""
|
||||||
|
src = tmp_path / "nonexistent.txt"
|
||||||
|
dst = tmp_path / "dest.txt"
|
||||||
|
|
||||||
|
with pytest.raises(EsphomeError, match=r"Error copying file"):
|
||||||
|
helpers.copy_file_if_changed(src, dst)
|
||||||
|
|
||||||
|
|
||||||
def test_resolve_ip_address_sorting() -> None:
|
def test_resolve_ip_address_sorting() -> None:
|
||||||
"""Test that results are sorted by preference."""
|
"""Test that results are sorted by preference."""
|
||||||
# Create multiple address infos with different preferences
|
# Create multiple address infos with different preferences
|
||||||
|
@@ -1,5 +1,7 @@
|
|||||||
"""Tests for esphome.util module."""
|
"""Tests for esphome.util module."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -308,3 +310,85 @@ def test_filter_yaml_files_case_sensitive() -> None:
|
|||||||
assert "/path/to/config.YAML" not in result
|
assert "/path/to/config.YAML" not in result
|
||||||
assert "/path/to/config.YML" not in result
|
assert "/path/to/config.YML" not in result
|
||||||
assert "/path/to/config.Yaml" not in result
|
assert "/path/to/config.Yaml" not in result
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("input_str", "expected"),
|
||||||
|
[
|
||||||
|
# Empty string
|
||||||
|
("", "''"),
|
||||||
|
# Simple strings that don't need quoting
|
||||||
|
("hello", "hello"),
|
||||||
|
("test123", "test123"),
|
||||||
|
("file.txt", "file.txt"),
|
||||||
|
("/path/to/file", "/path/to/file"),
|
||||||
|
("user@host", "user@host"),
|
||||||
|
("value:123", "value:123"),
|
||||||
|
("item,list", "item,list"),
|
||||||
|
("path-with-dash", "path-with-dash"),
|
||||||
|
# Strings that need quoting
|
||||||
|
("hello world", "'hello world'"),
|
||||||
|
("test\ttab", "'test\ttab'"),
|
||||||
|
("line\nbreak", "'line\nbreak'"),
|
||||||
|
("semicolon;here", "'semicolon;here'"),
|
||||||
|
("pipe|symbol", "'pipe|symbol'"),
|
||||||
|
("redirect>file", "'redirect>file'"),
|
||||||
|
("redirect<file", "'redirect<file'"),
|
||||||
|
("background&", "'background&'"),
|
||||||
|
("dollar$sign", "'dollar$sign'"),
|
||||||
|
("backtick`cmd", "'backtick`cmd'"),
|
||||||
|
('double"quote', "'double\"quote'"),
|
||||||
|
("backslash\\path", "'backslash\\path'"),
|
||||||
|
("question?mark", "'question?mark'"),
|
||||||
|
("asterisk*wild", "'asterisk*wild'"),
|
||||||
|
("bracket[test]", "'bracket[test]'"),
|
||||||
|
("paren(test)", "'paren(test)'"),
|
||||||
|
("curly{brace}", "'curly{brace}'"),
|
||||||
|
# Single quotes in string (special escaping)
|
||||||
|
("it's", "'it'\"'\"'s'"),
|
||||||
|
("don't", "'don'\"'\"'t'"),
|
||||||
|
("'quoted'", "''\"'\"'quoted'\"'\"''"),
|
||||||
|
# Complex combinations
|
||||||
|
("test 'with' quotes", "'test '\"'\"'with'\"'\"' quotes'"),
|
||||||
|
("path/to/file's.txt", "'path/to/file'\"'\"'s.txt'"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_shlex_quote(input_str: str, expected: str) -> None:
|
||||||
|
"""Test shlex_quote properly escapes shell arguments."""
|
||||||
|
assert util.shlex_quote(input_str) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_shlex_quote_safe_characters() -> None:
|
||||||
|
"""Test that safe characters are not quoted."""
|
||||||
|
# These characters are considered safe and shouldn't be quoted
|
||||||
|
safe_chars = (
|
||||||
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@%+=:,./-_"
|
||||||
|
)
|
||||||
|
for char in safe_chars:
|
||||||
|
assert util.shlex_quote(char) == char
|
||||||
|
assert util.shlex_quote(f"test{char}test") == f"test{char}test"
|
||||||
|
|
||||||
|
|
||||||
|
def test_shlex_quote_unsafe_characters() -> None:
|
||||||
|
"""Test that unsafe characters trigger quoting."""
|
||||||
|
# These characters should trigger quoting
|
||||||
|
unsafe_chars = ' \t\n;|>&<$`"\\?*[](){}!#~^'
|
||||||
|
for char in unsafe_chars:
|
||||||
|
result = util.shlex_quote(f"test{char}test")
|
||||||
|
assert result.startswith("'")
|
||||||
|
assert result.endswith("'")
|
||||||
|
|
||||||
|
|
||||||
|
def test_shlex_quote_edge_cases() -> None:
|
||||||
|
"""Test edge cases for shlex_quote."""
|
||||||
|
# Multiple single quotes
|
||||||
|
assert util.shlex_quote("'''") == "''\"'\"''\"'\"''\"'\"''"
|
||||||
|
|
||||||
|
# Mixed quotes
|
||||||
|
assert util.shlex_quote('"\'"') == "'\"'\"'\"'\"'"
|
||||||
|
|
||||||
|
# Only whitespace
|
||||||
|
assert util.shlex_quote(" ") == "' '"
|
||||||
|
assert util.shlex_quote("\t") == "'\t'"
|
||||||
|
assert util.shlex_quote("\n") == "'\n'"
|
||||||
|
assert util.shlex_quote(" ") == "' '"
|
||||||
|
Reference in New Issue
Block a user