mirror of
https://github.com/esphome/esphome.git
synced 2025-10-07 04:13:47 +01:00
Merge remote-tracking branch 'upstream/dev' into integration
This commit is contained in:
@@ -18,6 +18,7 @@ from esphome.const import (
|
||||
DEVICE_CLASS_TEMPERATURE,
|
||||
DEVICE_CLASS_VOLTAGE,
|
||||
STATE_CLASS_MEASUREMENT,
|
||||
STATE_CLASS_TOTAL_INCREASING,
|
||||
UNIT_AMPERE,
|
||||
UNIT_CELSIUS,
|
||||
UNIT_VOLT,
|
||||
@@ -162,7 +163,7 @@ INA2XX_SCHEMA = cv.Schema(
|
||||
unit_of_measurement=UNIT_WATT_HOURS,
|
||||
accuracy_decimals=8,
|
||||
device_class=DEVICE_CLASS_ENERGY,
|
||||
state_class=STATE_CLASS_MEASUREMENT,
|
||||
state_class=STATE_CLASS_TOTAL_INCREASING,
|
||||
),
|
||||
key=CONF_NAME,
|
||||
),
|
||||
@@ -170,7 +171,8 @@ INA2XX_SCHEMA = cv.Schema(
|
||||
sensor.sensor_schema(
|
||||
unit_of_measurement=UNIT_JOULE,
|
||||
accuracy_decimals=8,
|
||||
state_class=STATE_CLASS_MEASUREMENT,
|
||||
device_class=DEVICE_CLASS_ENERGY,
|
||||
state_class=STATE_CLASS_TOTAL_INCREASING,
|
||||
),
|
||||
key=CONF_NAME,
|
||||
),
|
||||
|
@@ -12,7 +12,7 @@ platformio==6.1.18 # When updating platformio, also update /docker/Dockerfile
|
||||
esptool==5.0.2
|
||||
click==8.1.7
|
||||
esphome-dashboard==20250904.0
|
||||
aioesphomeapi==40.2.0
|
||||
aioesphomeapi==40.2.1
|
||||
zeroconf==0.147.2
|
||||
puremagic==1.30
|
||||
ruamel.yaml==0.18.15 # dashboard_import
|
||||
|
@@ -1,8 +1,4 @@
|
||||
substitutions:
|
||||
network_enable_ipv6: "true"
|
||||
|
||||
bk72xx:
|
||||
framework:
|
||||
version: 1.7.0
|
||||
|
||||
<<: !include common.yaml
|
||||
|
1
tests/components/network/test.bk72xx-ard.yaml
Normal file
1
tests/components/network/test.bk72xx-ard.yaml
Normal file
@@ -0,0 +1 @@
|
||||
<<: !include common.yaml
|
188
tests/unit_tests/build_gen/test_platformio.py
Normal file
188
tests/unit_tests/build_gen/test_platformio.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""Tests for esphome.build_gen.platformio module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome.build_gen import platformio
|
||||
from esphome.core import CORE
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_update_storage_json() -> Generator[MagicMock]:
|
||||
"""Mock update_storage_json for all tests."""
|
||||
with patch("esphome.build_gen.platformio.update_storage_json") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_write_file_if_changed() -> Generator[MagicMock]:
|
||||
"""Mock write_file_if_changed for tests."""
|
||||
with patch("esphome.build_gen.platformio.write_file_if_changed") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
def test_write_ini_creates_new_file(
|
||||
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||
) -> None:
|
||||
"""Test write_ini creates a new platformio.ini file."""
|
||||
CORE.build_path = str(tmp_path)
|
||||
|
||||
content = """
|
||||
[env:test]
|
||||
platform = espressif32
|
||||
board = esp32dev
|
||||
framework = arduino
|
||||
"""
|
||||
|
||||
platformio.write_ini(content)
|
||||
|
||||
ini_file = tmp_path / "platformio.ini"
|
||||
assert ini_file.exists()
|
||||
|
||||
file_content = ini_file.read_text()
|
||||
assert content in file_content
|
||||
assert platformio.INI_AUTO_GENERATE_BEGIN in file_content
|
||||
assert platformio.INI_AUTO_GENERATE_END in file_content
|
||||
|
||||
|
||||
def test_write_ini_updates_existing_file(
|
||||
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||
) -> None:
|
||||
"""Test write_ini updates existing platformio.ini file."""
|
||||
CORE.build_path = str(tmp_path)
|
||||
|
||||
# Create existing file with custom content
|
||||
ini_file = tmp_path / "platformio.ini"
|
||||
existing_content = f"""
|
||||
; Custom header
|
||||
[platformio]
|
||||
default_envs = test
|
||||
|
||||
{platformio.INI_AUTO_GENERATE_BEGIN}
|
||||
; Old auto-generated content
|
||||
[env:old]
|
||||
platform = old
|
||||
{platformio.INI_AUTO_GENERATE_END}
|
||||
|
||||
; Custom footer
|
||||
"""
|
||||
ini_file.write_text(existing_content)
|
||||
|
||||
# New content to write
|
||||
new_content = """
|
||||
[env:test]
|
||||
platform = espressif32
|
||||
board = esp32dev
|
||||
framework = arduino
|
||||
"""
|
||||
|
||||
platformio.write_ini(new_content)
|
||||
|
||||
file_content = ini_file.read_text()
|
||||
|
||||
# Check that custom parts are preserved
|
||||
assert "; Custom header" in file_content
|
||||
assert "[platformio]" in file_content
|
||||
assert "default_envs = test" in file_content
|
||||
assert "; Custom footer" in file_content
|
||||
|
||||
# Check that new content replaced old auto-generated content
|
||||
assert new_content in file_content
|
||||
assert "[env:old]" not in file_content
|
||||
assert "platform = old" not in file_content
|
||||
|
||||
|
||||
def test_write_ini_preserves_custom_sections(
|
||||
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||
) -> None:
|
||||
"""Test write_ini preserves custom sections outside auto-generate markers."""
|
||||
CORE.build_path = str(tmp_path)
|
||||
|
||||
# Create existing file with multiple custom sections
|
||||
ini_file = tmp_path / "platformio.ini"
|
||||
existing_content = f"""
|
||||
[platformio]
|
||||
src_dir = .
|
||||
include_dir = .
|
||||
|
||||
[common]
|
||||
lib_deps =
|
||||
Wire
|
||||
SPI
|
||||
|
||||
{platformio.INI_AUTO_GENERATE_BEGIN}
|
||||
[env:old]
|
||||
platform = old
|
||||
{platformio.INI_AUTO_GENERATE_END}
|
||||
|
||||
[env:custom]
|
||||
upload_speed = 921600
|
||||
monitor_speed = 115200
|
||||
"""
|
||||
ini_file.write_text(existing_content)
|
||||
|
||||
new_content = "[env:auto]\nplatform = new"
|
||||
|
||||
platformio.write_ini(new_content)
|
||||
|
||||
file_content = ini_file.read_text()
|
||||
|
||||
# All custom sections should be preserved
|
||||
assert "[platformio]" in file_content
|
||||
assert "src_dir = ." in file_content
|
||||
assert "[common]" in file_content
|
||||
assert "lib_deps" in file_content
|
||||
assert "[env:custom]" in file_content
|
||||
assert "upload_speed = 921600" in file_content
|
||||
|
||||
# New auto-generated content should replace old
|
||||
assert "[env:auto]" in file_content
|
||||
assert "platform = new" in file_content
|
||||
assert "[env:old]" not in file_content
|
||||
|
||||
|
||||
def test_write_ini_no_change_when_content_same(
|
||||
tmp_path: Path,
|
||||
mock_update_storage_json: MagicMock,
|
||||
mock_write_file_if_changed: MagicMock,
|
||||
) -> None:
|
||||
"""Test write_ini doesn't rewrite file when content is unchanged."""
|
||||
CORE.build_path = str(tmp_path)
|
||||
|
||||
content = "[env:test]\nplatform = esp32"
|
||||
full_content = (
|
||||
f"{platformio.INI_BASE_FORMAT[0]}"
|
||||
f"{platformio.INI_AUTO_GENERATE_BEGIN}\n"
|
||||
f"{content}"
|
||||
f"{platformio.INI_AUTO_GENERATE_END}"
|
||||
f"{platformio.INI_BASE_FORMAT[1]}"
|
||||
)
|
||||
|
||||
ini_file = tmp_path / "platformio.ini"
|
||||
ini_file.write_text(full_content)
|
||||
|
||||
mock_write_file_if_changed.return_value = False # Indicate no change
|
||||
platformio.write_ini(content)
|
||||
|
||||
# write_file_if_changed should be called with the same content
|
||||
mock_write_file_if_changed.assert_called_once()
|
||||
call_args = mock_write_file_if_changed.call_args[0]
|
||||
assert call_args[0] == str(ini_file)
|
||||
assert content in call_args[1]
|
||||
|
||||
|
||||
def test_write_ini_calls_update_storage_json(
|
||||
tmp_path: Path, mock_update_storage_json: MagicMock
|
||||
) -> None:
|
||||
"""Test write_ini calls update_storage_json."""
|
||||
CORE.build_path = str(tmp_path)
|
||||
|
||||
content = "[env:test]\nplatform = esp32"
|
||||
|
||||
platformio.write_ini(content)
|
||||
mock_update_storage_json.assert_called_once()
|
@@ -9,8 +9,10 @@ not be part of a unit test suite.
|
||||
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -43,3 +45,45 @@ def setup_core(tmp_path: Path) -> Path:
|
||||
"""Set up CORE with test paths."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
return tmp_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_write_file_if_changed() -> Generator[Mock, None, None]:
|
||||
"""Mock write_file_if_changed for storage_json."""
|
||||
with patch("esphome.storage_json.write_file_if_changed") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_copy_file_if_changed() -> Generator[Mock, None, None]:
|
||||
"""Mock copy_file_if_changed for core.config."""
|
||||
with patch("esphome.core.config.copy_file_if_changed") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_run_platformio_cli() -> Generator[Mock, None, None]:
|
||||
"""Mock run_platformio_cli for platformio_api."""
|
||||
with patch("esphome.platformio_api.run_platformio_cli") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_run_platformio_cli_run() -> Generator[Mock, None, None]:
|
||||
"""Mock run_platformio_cli_run for platformio_api."""
|
||||
with patch("esphome.platformio_api.run_platformio_cli_run") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_decode_pc() -> Generator[Mock, None, None]:
|
||||
"""Mock _decode_pc for platformio_api."""
|
||||
with patch("esphome.platformio_api._decode_pc") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_run_external_command() -> Generator[Mock, None, None]:
|
||||
"""Mock run_external_command for platformio_api."""
|
||||
with patch("esphome.platformio_api.run_external_command") as mock:
|
||||
yield mock
|
||||
|
@@ -1,21 +1,56 @@
|
||||
"""Unit tests for core config functionality including areas and devices."""
|
||||
|
||||
from collections.abc import Callable
|
||||
import os
|
||||
from pathlib import Path
|
||||
import types
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv, core
|
||||
from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES
|
||||
from esphome.core import config
|
||||
from esphome.core.config import Area, validate_area_config
|
||||
from esphome.const import (
|
||||
CONF_AREA,
|
||||
CONF_AREAS,
|
||||
CONF_BUILD_PATH,
|
||||
CONF_DEVICES,
|
||||
CONF_ESPHOME,
|
||||
CONF_NAME,
|
||||
CONF_NAME_ADD_MAC_SUFFIX,
|
||||
KEY_CORE,
|
||||
)
|
||||
from esphome.core import CORE, config
|
||||
from esphome.core.config import (
|
||||
Area,
|
||||
preload_core_config,
|
||||
valid_include,
|
||||
valid_project_name,
|
||||
validate_area_config,
|
||||
validate_hostname,
|
||||
)
|
||||
|
||||
from .common import load_config_from_fixture
|
||||
|
||||
FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" / "core" / "config"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_cg_with_include_capture() -> tuple[Mock, list[str]]:
|
||||
"""Mock code generation with include capture."""
|
||||
includes_added: list[str] = []
|
||||
|
||||
with patch("esphome.core.config.cg") as mock_cg:
|
||||
mock_raw_statement = MagicMock()
|
||||
|
||||
def capture_include(text: str) -> MagicMock:
|
||||
includes_added.append(text)
|
||||
return mock_raw_statement
|
||||
|
||||
mock_cg.RawStatement.side_effect = capture_include
|
||||
yield mock_cg, includes_added
|
||||
|
||||
|
||||
def test_validate_area_config_with_string() -> None:
|
||||
"""Test that string area config is converted to structured format."""
|
||||
result = validate_area_config("Living Room")
|
||||
@@ -245,3 +280,566 @@ def test_add_platform_defines_priority() -> None:
|
||||
f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than "
|
||||
f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)"
|
||||
)
|
||||
|
||||
|
||||
def test_valid_include_with_angle_brackets() -> None:
|
||||
"""Test valid_include accepts angle bracket includes."""
|
||||
assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>"
|
||||
|
||||
|
||||
def test_valid_include_with_valid_file(tmp_path: Path) -> None:
|
||||
"""Test valid_include accepts valid include files."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
include_file = tmp_path / "include.h"
|
||||
include_file.touch()
|
||||
|
||||
assert valid_include(str(include_file)) == str(include_file)
|
||||
|
||||
|
||||
def test_valid_include_with_valid_directory(tmp_path: Path) -> None:
|
||||
"""Test valid_include accepts valid directories."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
include_dir = tmp_path / "includes"
|
||||
include_dir.mkdir()
|
||||
|
||||
assert valid_include(str(include_dir)) == str(include_dir)
|
||||
|
||||
|
||||
def test_valid_include_invalid_extension(tmp_path: Path) -> None:
|
||||
"""Test valid_include rejects files with invalid extensions."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
invalid_file = tmp_path / "file.txt"
|
||||
invalid_file.touch()
|
||||
|
||||
with pytest.raises(cv.Invalid, match="Include has invalid file extension"):
|
||||
valid_include(str(invalid_file))
|
||||
|
||||
|
||||
def test_valid_project_name_valid() -> None:
|
||||
"""Test valid_project_name accepts valid project names."""
|
||||
assert valid_project_name("esphome.my_project") == "esphome.my_project"
|
||||
|
||||
|
||||
def test_valid_project_name_no_namespace() -> None:
|
||||
"""Test valid_project_name rejects names without namespace."""
|
||||
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
|
||||
valid_project_name("my_project")
|
||||
|
||||
|
||||
def test_valid_project_name_multiple_dots() -> None:
|
||||
"""Test valid_project_name rejects names with multiple dots."""
|
||||
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
|
||||
valid_project_name("esphome.my.project")
|
||||
|
||||
|
||||
def test_validate_hostname_valid() -> None:
|
||||
"""Test validate_hostname accepts valid hostnames."""
|
||||
config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False}
|
||||
assert validate_hostname(config) == config
|
||||
|
||||
|
||||
def test_validate_hostname_too_long() -> None:
|
||||
"""Test validate_hostname rejects hostnames that are too long."""
|
||||
config = {
|
||||
CONF_NAME: "a" * 32, # 32 chars, max is 31
|
||||
CONF_NAME_ADD_MAC_SUFFIX: False,
|
||||
}
|
||||
with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"):
|
||||
validate_hostname(config)
|
||||
|
||||
|
||||
def test_validate_hostname_too_long_with_mac_suffix() -> None:
|
||||
"""Test validate_hostname accounts for MAC suffix length."""
|
||||
config = {
|
||||
CONF_NAME: "a" * 25, # 25 chars, max is 24 with MAC suffix
|
||||
CONF_NAME_ADD_MAC_SUFFIX: True,
|
||||
}
|
||||
with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"):
|
||||
validate_hostname(config)
|
||||
|
||||
|
||||
def test_validate_hostname_with_underscore(caplog) -> None:
|
||||
"""Test validate_hostname warns about underscores."""
|
||||
config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False}
|
||||
assert validate_hostname(config) == config
|
||||
assert (
|
||||
"Using the '_' (underscore) character in the hostname is discouraged"
|
||||
in caplog.text
|
||||
)
|
||||
|
||||
|
||||
def test_preload_core_config_basic(setup_core: Path) -> None:
|
||||
"""Test preload_core_config sets basic CORE attributes."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
"esp32": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
platform = preload_core_config(config, result)
|
||||
|
||||
assert CORE.name == "test_device"
|
||||
assert platform == "esp32"
|
||||
assert KEY_CORE in CORE.data
|
||||
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
|
||||
|
||||
|
||||
def test_preload_core_config_with_build_path(setup_core: Path) -> None:
|
||||
"""Test preload_core_config uses provided build path."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
CONF_BUILD_PATH: "/custom/build/path",
|
||||
},
|
||||
"esp8266": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
platform = preload_core_config(config, result)
|
||||
|
||||
assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path"
|
||||
assert platform == "esp8266"
|
||||
|
||||
|
||||
def test_preload_core_config_env_build_path(setup_core: Path) -> None:
|
||||
"""Test preload_core_config uses ESPHOME_BUILD_PATH env var."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
"rp2040": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}):
|
||||
platform = preload_core_config(config, result)
|
||||
|
||||
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
|
||||
assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH]
|
||||
assert platform == "rp2040"
|
||||
|
||||
|
||||
def test_preload_core_config_no_platform(setup_core: Path) -> None:
|
||||
"""Test preload_core_config raises when no platform is specified."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
}
|
||||
result = {}
|
||||
|
||||
# Mock _is_target_platform to avoid expensive component loading
|
||||
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
|
||||
# Return True for known platforms
|
||||
mock_is_platform.side_effect = lambda name: name in [
|
||||
"esp32",
|
||||
"esp8266",
|
||||
"rp2040",
|
||||
]
|
||||
|
||||
with pytest.raises(cv.Invalid, match="Platform missing"):
|
||||
preload_core_config(config, result)
|
||||
|
||||
|
||||
def test_preload_core_config_multiple_platforms(setup_core: Path) -> None:
|
||||
"""Test preload_core_config raises when multiple platforms are specified."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
"esp32": {},
|
||||
"esp8266": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
# Mock _is_target_platform to avoid expensive component loading
|
||||
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
|
||||
# Return True for known platforms
|
||||
mock_is_platform.side_effect = lambda name: name in [
|
||||
"esp32",
|
||||
"esp8266",
|
||||
"rp2040",
|
||||
]
|
||||
|
||||
with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"):
|
||||
preload_core_config(config, result)
|
||||
|
||||
|
||||
def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
|
||||
"""Test include_file adds include statement for header files."""
|
||||
src_file = tmp_path / "source.h"
|
||||
src_file.write_text("// Header content")
|
||||
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
|
||||
with patch("esphome.core.config.cg") as mock_cg:
|
||||
# Mock RawStatement to capture the text
|
||||
mock_raw_statement = MagicMock()
|
||||
mock_raw_statement.text = ""
|
||||
|
||||
def raw_statement_side_effect(text):
|
||||
mock_raw_statement.text = text
|
||||
return mock_raw_statement
|
||||
|
||||
mock_cg.RawStatement.side_effect = raw_statement_side_effect
|
||||
|
||||
config.include_file(str(src_file), "test.h")
|
||||
|
||||
mock_copy_file_if_changed.assert_called_once()
|
||||
mock_cg.add_global.assert_called_once()
|
||||
# Check that include statement was added
|
||||
assert '#include "test.h"' in mock_raw_statement.text
|
||||
|
||||
|
||||
def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
|
||||
"""Test include_file does not add include for cpp files."""
|
||||
src_file = tmp_path / "source.cpp"
|
||||
src_file.write_text("// CPP content")
|
||||
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
|
||||
with patch("esphome.core.config.cg") as mock_cg:
|
||||
config.include_file(str(src_file), "test.cpp")
|
||||
|
||||
mock_copy_file_if_changed.assert_called_once()
|
||||
# Should not add include statement for .cpp files
|
||||
mock_cg.add_global.assert_not_called()
|
||||
|
||||
|
||||
def test_get_usable_cpu_count() -> None:
|
||||
"""Test get_usable_cpu_count returns CPU count."""
|
||||
count = config.get_usable_cpu_count()
|
||||
assert isinstance(count, int)
|
||||
assert count > 0
|
||||
|
||||
|
||||
def test_get_usable_cpu_count_with_process_cpu_count() -> None:
|
||||
"""Test get_usable_cpu_count uses process_cpu_count when available."""
|
||||
# Test with process_cpu_count (Python 3.13+)
|
||||
# Create a mock os module with process_cpu_count
|
||||
|
||||
mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4)
|
||||
|
||||
with patch("esphome.core.config.os", mock_os):
|
||||
# When process_cpu_count exists, it should be used
|
||||
count = config.get_usable_cpu_count()
|
||||
assert count == 8
|
||||
|
||||
# Test fallback to cpu_count when process_cpu_count not available
|
||||
mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4)
|
||||
|
||||
with patch("esphome.core.config.os", mock_os_no_process):
|
||||
count = config.get_usable_cpu_count()
|
||||
assert count == 4
|
||||
|
||||
|
||||
def test_list_target_platforms(tmp_path: Path) -> None:
|
||||
"""Test _list_target_platforms returns available platforms."""
|
||||
# Create mock components directory structure
|
||||
components_dir = tmp_path / "components"
|
||||
components_dir.mkdir()
|
||||
|
||||
# Create platform and non-platform directories with __init__.py
|
||||
platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"]
|
||||
non_platforms = ["sensor"]
|
||||
|
||||
for component in platforms + non_platforms:
|
||||
component_dir = components_dir / component
|
||||
component_dir.mkdir()
|
||||
(component_dir / "__init__.py").touch()
|
||||
|
||||
# Create a file (not a directory)
|
||||
(components_dir / "README.md").touch()
|
||||
|
||||
# Create a directory without __init__.py
|
||||
(components_dir / "no_init").mkdir()
|
||||
|
||||
# Mock Path(__file__).parents[1] to return our tmp_path
|
||||
with patch("esphome.core.config.Path") as mock_path:
|
||||
mock_file_path = MagicMock()
|
||||
mock_file_path.parents = [MagicMock(), tmp_path]
|
||||
mock_path.return_value = mock_file_path
|
||||
|
||||
platforms = config._list_target_platforms()
|
||||
|
||||
assert isinstance(platforms, list)
|
||||
# Should include platform components
|
||||
assert "esp32" in platforms
|
||||
assert "esp8266" in platforms
|
||||
assert "rp2040" in platforms
|
||||
assert "libretiny" in platforms
|
||||
assert "host" in platforms
|
||||
# Should not include non-platform components
|
||||
assert "sensor" not in platforms
|
||||
assert "README.md" not in platforms
|
||||
assert "no_init" not in platforms
|
||||
|
||||
|
||||
def test_is_target_platform() -> None:
|
||||
"""Test _is_target_platform identifies valid platforms."""
|
||||
assert config._is_target_platform("esp32") is True
|
||||
assert config._is_target_platform("esp8266") is True
|
||||
assert config._is_target_platform("rp2040") is True
|
||||
assert config._is_target_platform("invalid_platform") is False
|
||||
assert config._is_target_platform("api") is False # Component but not platform
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_includes_with_single_file(
|
||||
tmp_path: Path,
|
||||
mock_copy_file_if_changed: Mock,
|
||||
mock_cg_with_include_capture: tuple[Mock, list[str]],
|
||||
) -> None:
|
||||
"""Test add_includes copies a single header file to build directory."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create include file
|
||||
include_file = tmp_path / "my_header.h"
|
||||
include_file.write_text("#define MY_CONSTANT 42")
|
||||
|
||||
mock_cg, includes_added = mock_cg_with_include_capture
|
||||
|
||||
await config.add_includes([str(include_file)])
|
||||
|
||||
# Verify copy_file_if_changed was called to copy the file
|
||||
# Note: add_includes adds files to a src/ subdirectory
|
||||
mock_copy_file_if_changed.assert_called_once_with(
|
||||
str(include_file), str(Path(CORE.build_path) / "src" / "my_header.h")
|
||||
)
|
||||
|
||||
# Verify include statement was added
|
||||
assert any('#include "my_header.h"' in inc for inc in includes_added)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||
async def test_add_includes_with_directory_unix(
|
||||
tmp_path: Path,
|
||||
mock_copy_file_if_changed: Mock,
|
||||
mock_cg_with_include_capture: tuple[Mock, list[str]],
|
||||
) -> None:
|
||||
"""Test add_includes copies all files from a directory on Unix."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create include directory with files
|
||||
include_dir = tmp_path / "includes"
|
||||
include_dir.mkdir()
|
||||
(include_dir / "header1.h").write_text("#define HEADER1")
|
||||
(include_dir / "header2.hpp").write_text("#define HEADER2")
|
||||
(include_dir / "source.cpp").write_text("// Implementation")
|
||||
(include_dir / "README.md").write_text(
|
||||
"# Documentation"
|
||||
) # Should be copied but not included
|
||||
|
||||
# Create subdirectory with files
|
||||
subdir = include_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "nested.h").write_text("#define NESTED")
|
||||
|
||||
mock_cg, includes_added = mock_cg_with_include_capture
|
||||
|
||||
await config.add_includes([str(include_dir)])
|
||||
|
||||
# Verify copy_file_if_changed was called for all files
|
||||
assert mock_copy_file_if_changed.call_count == 5 # 4 code files + 1 README
|
||||
|
||||
# Verify include statements were added for valid extensions
|
||||
include_strings = " ".join(includes_added)
|
||||
assert "includes/header1.h" in include_strings
|
||||
assert "includes/header2.hpp" in include_strings
|
||||
assert "includes/subdir/nested.h" in include_strings
|
||||
# CPP files are copied but not included
|
||||
assert "source.cpp" not in include_strings or "#include" not in include_strings
|
||||
# README.md should not have an include statement
|
||||
assert "README.md" not in include_strings
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||
async def test_add_includes_with_directory_windows(
|
||||
tmp_path: Path,
|
||||
mock_copy_file_if_changed: Mock,
|
||||
mock_cg_with_include_capture: tuple[Mock, list[str]],
|
||||
) -> None:
|
||||
"""Test add_includes copies all files from a directory on Windows."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create include directory with files
|
||||
include_dir = tmp_path / "includes"
|
||||
include_dir.mkdir()
|
||||
(include_dir / "header1.h").write_text("#define HEADER1")
|
||||
(include_dir / "header2.hpp").write_text("#define HEADER2")
|
||||
(include_dir / "source.cpp").write_text("// Implementation")
|
||||
(include_dir / "README.md").write_text(
|
||||
"# Documentation"
|
||||
) # Should be copied but not included
|
||||
|
||||
# Create subdirectory with files
|
||||
subdir = include_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "nested.h").write_text("#define NESTED")
|
||||
|
||||
mock_cg, includes_added = mock_cg_with_include_capture
|
||||
|
||||
await config.add_includes([str(include_dir)])
|
||||
|
||||
# Verify copy_file_if_changed was called for all files
|
||||
assert mock_copy_file_if_changed.call_count == 5 # 4 code files + 1 README
|
||||
|
||||
# Verify include statements were added for valid extensions
|
||||
include_strings = " ".join(includes_added)
|
||||
assert "includes\\header1.h" in include_strings
|
||||
assert "includes\\header2.hpp" in include_strings
|
||||
assert "includes\\subdir\\nested.h" in include_strings
|
||||
# CPP files are copied but not included
|
||||
assert "source.cpp" not in include_strings or "#include" not in include_strings
|
||||
# README.md should not have an include statement
|
||||
assert "README.md" not in include_strings
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_includes_with_multiple_sources(
|
||||
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||
) -> None:
|
||||
"""Test add_includes with multiple files and directories."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create various include sources
|
||||
single_file = tmp_path / "single.h"
|
||||
single_file.write_text("#define SINGLE")
|
||||
|
||||
dir1 = tmp_path / "dir1"
|
||||
dir1.mkdir()
|
||||
(dir1 / "file1.h").write_text("#define FILE1")
|
||||
|
||||
dir2 = tmp_path / "dir2"
|
||||
dir2.mkdir()
|
||||
(dir2 / "file2.cpp").write_text("// File2")
|
||||
|
||||
with patch("esphome.core.config.cg"):
|
||||
await config.add_includes([str(single_file), str(dir1), str(dir2)])
|
||||
|
||||
# Verify copy_file_if_changed was called for all files
|
||||
assert mock_copy_file_if_changed.call_count == 3 # 3 files total
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_includes_empty_directory(
|
||||
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||
) -> None:
|
||||
"""Test add_includes with an empty directory doesn't fail."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create empty directory
|
||||
empty_dir = tmp_path / "empty"
|
||||
empty_dir.mkdir()
|
||||
|
||||
with patch("esphome.core.config.cg"):
|
||||
# Should not raise any errors
|
||||
await config.add_includes([str(empty_dir)])
|
||||
|
||||
# No files to copy from empty directory
|
||||
mock_copy_file_if_changed.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||
async def test_add_includes_preserves_directory_structure_unix(
|
||||
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||
) -> None:
|
||||
"""Test that add_includes preserves relative directory structure on Unix."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create nested directory structure
|
||||
lib_dir = tmp_path / "lib"
|
||||
lib_dir.mkdir()
|
||||
|
||||
src_dir = lib_dir / "src"
|
||||
src_dir.mkdir()
|
||||
(src_dir / "core.h").write_text("#define CORE")
|
||||
|
||||
utils_dir = lib_dir / "utils"
|
||||
utils_dir.mkdir()
|
||||
(utils_dir / "helper.h").write_text("#define HELPER")
|
||||
|
||||
with patch("esphome.core.config.cg"):
|
||||
await config.add_includes([str(lib_dir)])
|
||||
|
||||
# Verify copy_file_if_changed was called with correct paths
|
||||
calls = mock_copy_file_if_changed.call_args_list
|
||||
dest_paths = [call[0][1] for call in calls]
|
||||
|
||||
# Check that relative paths are preserved
|
||||
assert any("lib/src/core.h" in path for path in dest_paths)
|
||||
assert any("lib/utils/helper.h" in path for path in dest_paths)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||
async def test_add_includes_preserves_directory_structure_windows(
|
||||
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||
) -> None:
|
||||
"""Test that add_includes preserves relative directory structure on Windows."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create nested directory structure
|
||||
lib_dir = tmp_path / "lib"
|
||||
lib_dir.mkdir()
|
||||
|
||||
src_dir = lib_dir / "src"
|
||||
src_dir.mkdir()
|
||||
(src_dir / "core.h").write_text("#define CORE")
|
||||
|
||||
utils_dir = lib_dir / "utils"
|
||||
utils_dir.mkdir()
|
||||
(utils_dir / "helper.h").write_text("#define HELPER")
|
||||
|
||||
with patch("esphome.core.config.cg"):
|
||||
await config.add_includes([str(lib_dir)])
|
||||
|
||||
# Verify copy_file_if_changed was called with correct paths
|
||||
calls = mock_copy_file_if_changed.call_args_list
|
||||
dest_paths = [call[0][1] for call in calls]
|
||||
|
||||
# Check that relative paths are preserved
|
||||
assert any("lib\\src\\core.h" in path for path in dest_paths)
|
||||
assert any("lib\\utils\\helper.h" in path for path in dest_paths)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_includes_overwrites_existing_files(
|
||||
tmp_path: Path, mock_copy_file_if_changed: Mock
|
||||
) -> None:
|
||||
"""Test that add_includes overwrites existing files in build directory."""
|
||||
CORE.config_path = str(tmp_path / "config.yaml")
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
os.makedirs(CORE.build_path, exist_ok=True)
|
||||
|
||||
# Create include file
|
||||
include_file = tmp_path / "header.h"
|
||||
include_file.write_text("#define NEW_VALUE 42")
|
||||
|
||||
with patch("esphome.core.config.cg"):
|
||||
await config.add_includes([str(include_file)])
|
||||
|
||||
# Verify copy_file_if_changed was called (it handles overwriting)
|
||||
# Note: add_includes adds files to a src/ subdirectory
|
||||
mock_copy_file_if_changed.assert_called_once_with(
|
||||
str(include_file), str(Path(CORE.build_path) / "src" / "header.h")
|
||||
)
|
||||
|
@@ -1,3 +1,6 @@
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
from hypothesis import given
|
||||
import pytest
|
||||
from strategies import mac_addr_strings
|
||||
@@ -577,3 +580,83 @@ class TestEsphomeCore:
|
||||
|
||||
assert target.is_esp32 is False
|
||||
assert target.is_esp8266 is True
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||
def test_data_dir_default_unix(self, target):
|
||||
"""Test data_dir returns .esphome in config directory by default on Unix."""
|
||||
target.config_path = "/home/user/config.yaml"
|
||||
assert target.data_dir == "/home/user/.esphome"
|
||||
|
||||
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||
def test_data_dir_default_windows(self, target):
|
||||
"""Test data_dir returns .esphome in config directory by default on Windows."""
|
||||
target.config_path = "D:\\home\\user\\config.yaml"
|
||||
assert target.data_dir == "D:\\home\\user\\.esphome"
|
||||
|
||||
def test_data_dir_ha_addon(self, target):
|
||||
"""Test data_dir returns /data when running as Home Assistant addon."""
|
||||
target.config_path = "/config/test.yaml"
|
||||
|
||||
with patch.dict(os.environ, {"ESPHOME_IS_HA_ADDON": "true"}):
|
||||
assert target.data_dir == "/data"
|
||||
|
||||
def test_data_dir_env_override(self, target):
|
||||
"""Test data_dir uses ESPHOME_DATA_DIR environment variable when set."""
|
||||
target.config_path = "/home/user/config.yaml"
|
||||
|
||||
with patch.dict(os.environ, {"ESPHOME_DATA_DIR": "/custom/data/path"}):
|
||||
assert target.data_dir == "/custom/data/path"
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||
def test_data_dir_priority_unix(self, target):
|
||||
"""Test data_dir priority on Unix: HA addon > env var > default."""
|
||||
target.config_path = "/config/test.yaml"
|
||||
expected_default = "/config/.esphome"
|
||||
|
||||
# Test HA addon takes priority over env var
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||
):
|
||||
assert target.data_dir == "/data"
|
||||
|
||||
# Test env var is used when not HA addon
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||
):
|
||||
assert target.data_dir == "/custom/path"
|
||||
|
||||
# Test default when neither is set
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
# Ensure these env vars are not set
|
||||
os.environ.pop("ESPHOME_IS_HA_ADDON", None)
|
||||
os.environ.pop("ESPHOME_DATA_DIR", None)
|
||||
assert target.data_dir == expected_default
|
||||
|
||||
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||
def test_data_dir_priority_windows(self, target):
|
||||
"""Test data_dir priority on Windows: HA addon > env var > default."""
|
||||
target.config_path = "D:\\config\\test.yaml"
|
||||
expected_default = "D:\\config\\.esphome"
|
||||
|
||||
# Test HA addon takes priority over env var
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||
):
|
||||
assert target.data_dir == "/data"
|
||||
|
||||
# Test env var is used when not HA addon
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"},
|
||||
):
|
||||
assert target.data_dir == "/custom/path"
|
||||
|
||||
# Test default when neither is set
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
# Ensure these env vars are not set
|
||||
os.environ.pop("ESPHOME_IS_HA_ADDON", None)
|
||||
os.environ.pop("ESPHOME_DATA_DIR", None)
|
||||
assert target.data_dir == expected_default
|
||||
|
@@ -1,5 +1,8 @@
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
import socket
|
||||
import stat
|
||||
from unittest.mock import patch
|
||||
|
||||
from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr
|
||||
@@ -554,6 +557,239 @@ def test_addr_preference_ipv6_link_local_with_scope() -> None:
|
||||
assert helpers.addr_preference_(addr_info) == 1 # Has scope, so it's usable
|
||||
|
||||
|
||||
def test_mkdir_p(tmp_path: Path) -> None:
|
||||
"""Test mkdir_p creates directories recursively."""
|
||||
# Test creating nested directories
|
||||
nested_path = tmp_path / "level1" / "level2" / "level3"
|
||||
helpers.mkdir_p(nested_path)
|
||||
assert nested_path.exists()
|
||||
assert nested_path.is_dir()
|
||||
|
||||
# Test that mkdir_p is idempotent (doesn't fail if directory exists)
|
||||
helpers.mkdir_p(nested_path)
|
||||
assert nested_path.exists()
|
||||
|
||||
# Test with empty path (should do nothing)
|
||||
helpers.mkdir_p("")
|
||||
|
||||
# Test with existing directory
|
||||
existing_dir = tmp_path / "existing"
|
||||
existing_dir.mkdir()
|
||||
helpers.mkdir_p(existing_dir)
|
||||
assert existing_dir.exists()
|
||||
|
||||
|
||||
def test_mkdir_p_file_exists_error(tmp_path: Path) -> None:
|
||||
"""Test mkdir_p raises error when path is a file."""
|
||||
# Create a file
|
||||
file_path = tmp_path / "test_file.txt"
|
||||
file_path.write_text("test content")
|
||||
|
||||
# Try to create directory with same name as existing file
|
||||
with pytest.raises(EsphomeError, match=r"Error creating directories"):
|
||||
helpers.mkdir_p(file_path)
|
||||
|
||||
|
||||
def test_mkdir_p_with_existing_file_raises_error(tmp_path: Path) -> None:
|
||||
"""Test mkdir_p raises error when trying to create dir over existing file."""
|
||||
# Create a file where we want to create a directory
|
||||
file_path = tmp_path / "existing_file"
|
||||
file_path.write_text("content")
|
||||
|
||||
# Try to create a directory with a path that goes through the file
|
||||
dir_path = file_path / "subdir"
|
||||
|
||||
with pytest.raises(EsphomeError, match=r"Error creating directories"):
|
||||
helpers.mkdir_p(dir_path)
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||
def test_read_file_unix(tmp_path: Path) -> None:
|
||||
"""Test read_file reads file content correctly on Unix."""
|
||||
# Test reading regular file
|
||||
test_file = tmp_path / "test.txt"
|
||||
expected_content = "Test content\nLine 2\n"
|
||||
test_file.write_text(expected_content)
|
||||
|
||||
content = helpers.read_file(test_file)
|
||||
assert content == expected_content
|
||||
|
||||
# Test reading file with UTF-8 characters
|
||||
utf8_file = tmp_path / "utf8.txt"
|
||||
utf8_content = "Hello 世界 🌍"
|
||||
utf8_file.write_text(utf8_content, encoding="utf-8")
|
||||
|
||||
content = helpers.read_file(utf8_file)
|
||||
assert content == utf8_content
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||
def test_read_file_windows(tmp_path: Path) -> None:
|
||||
"""Test read_file reads file content correctly on Windows."""
|
||||
# Test reading regular file
|
||||
test_file = tmp_path / "test.txt"
|
||||
expected_content = "Test content\nLine 2\n"
|
||||
test_file.write_text(expected_content)
|
||||
|
||||
content = helpers.read_file(test_file)
|
||||
# On Windows, text mode reading converts \n to \r\n
|
||||
assert content == expected_content.replace("\n", "\r\n")
|
||||
|
||||
# Test reading file with UTF-8 characters
|
||||
utf8_file = tmp_path / "utf8.txt"
|
||||
utf8_content = "Hello 世界 🌍"
|
||||
utf8_file.write_text(utf8_content, encoding="utf-8")
|
||||
|
||||
content = helpers.read_file(utf8_file)
|
||||
assert content == utf8_content
|
||||
|
||||
|
||||
def test_read_file_not_found() -> None:
|
||||
"""Test read_file raises error for non-existent file."""
|
||||
with pytest.raises(EsphomeError, match=r"Error reading file"):
|
||||
helpers.read_file("/nonexistent/file.txt")
|
||||
|
||||
|
||||
def test_read_file_unicode_decode_error(tmp_path: Path) -> None:
|
||||
"""Test read_file raises error for invalid UTF-8."""
|
||||
test_file = tmp_path / "invalid.txt"
|
||||
# Write invalid UTF-8 bytes
|
||||
test_file.write_bytes(b"\xff\xfe")
|
||||
|
||||
with pytest.raises(EsphomeError, match=r"Error reading file"):
|
||||
helpers.read_file(test_file)
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
|
||||
def test_write_file_unix(tmp_path: Path) -> None:
|
||||
"""Test write_file writes content correctly on Unix."""
|
||||
# Test writing string content
|
||||
test_file = tmp_path / "test.txt"
|
||||
content = "Test content\nLine 2"
|
||||
helpers.write_file(test_file, content)
|
||||
|
||||
assert test_file.read_text() == content
|
||||
# Check file permissions
|
||||
assert oct(test_file.stat().st_mode)[-3:] == "644"
|
||||
|
||||
# Test overwriting existing file
|
||||
new_content = "New content"
|
||||
helpers.write_file(test_file, new_content)
|
||||
assert test_file.read_text() == new_content
|
||||
|
||||
# Test writing to nested directories (should create them)
|
||||
nested_file = tmp_path / "dir1" / "dir2" / "file.txt"
|
||||
helpers.write_file(nested_file, content)
|
||||
assert nested_file.read_text() == content
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||
def test_write_file_windows(tmp_path: Path) -> None:
|
||||
"""Test write_file writes content correctly on Windows."""
|
||||
# Test writing string content
|
||||
test_file = tmp_path / "test.txt"
|
||||
content = "Test content\nLine 2"
|
||||
helpers.write_file(test_file, content)
|
||||
|
||||
assert test_file.read_text() == content
|
||||
# Windows doesn't have Unix-style 644 permissions
|
||||
|
||||
# Test overwriting existing file
|
||||
new_content = "New content"
|
||||
helpers.write_file(test_file, new_content)
|
||||
assert test_file.read_text() == new_content
|
||||
|
||||
# Test writing to nested directories (should create them)
|
||||
nested_file = tmp_path / "dir1" / "dir2" / "file.txt"
|
||||
helpers.write_file(nested_file, content)
|
||||
assert nested_file.read_text() == content
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
|
||||
def test_write_file_to_non_writable_directory_unix(tmp_path: Path) -> None:
|
||||
"""Test write_file raises error when directory is not writable on Unix."""
|
||||
# Create a directory and make it read-only
|
||||
read_only_dir = tmp_path / "readonly"
|
||||
read_only_dir.mkdir()
|
||||
test_file = read_only_dir / "test.txt"
|
||||
|
||||
# Make directory read-only (no write permission)
|
||||
read_only_dir.chmod(0o555)
|
||||
|
||||
try:
|
||||
with pytest.raises(EsphomeError, match=r"Could not write file"):
|
||||
helpers.write_file(test_file, "content")
|
||||
finally:
|
||||
# Restore write permissions for cleanup
|
||||
read_only_dir.chmod(0o755)
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
|
||||
def test_write_file_to_non_writable_directory_windows(tmp_path: Path) -> None:
|
||||
"""Test write_file error handling on Windows."""
|
||||
# Windows handles permissions differently - test a different error case
|
||||
# Try to write to a file path that contains an existing file as a directory component
|
||||
existing_file = tmp_path / "file.txt"
|
||||
existing_file.write_text("content")
|
||||
|
||||
# Try to write to a path that treats the file as a directory
|
||||
invalid_path = existing_file / "subdir" / "test.txt"
|
||||
|
||||
with pytest.raises(EsphomeError, match=r"Could not write file"):
|
||||
helpers.write_file(invalid_path, "content")
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
|
||||
def test_write_file_with_permission_bits_unix(tmp_path: Path) -> None:
|
||||
"""Test that write_file sets correct permissions on Unix."""
|
||||
test_file = tmp_path / "test.txt"
|
||||
helpers.write_file(test_file, "content")
|
||||
|
||||
# Check that file has 644 permissions
|
||||
file_mode = test_file.stat().st_mode
|
||||
assert stat.S_IMODE(file_mode) == 0o644
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
|
||||
def test_copy_file_if_changed_permission_recovery_unix(tmp_path: Path) -> None:
|
||||
"""Test copy_file_if_changed handles permission errors correctly on Unix."""
|
||||
# Test with read-only destination file
|
||||
src = tmp_path / "source.txt"
|
||||
dst = tmp_path / "dest.txt"
|
||||
src.write_text("new content")
|
||||
dst.write_text("old content")
|
||||
dst.chmod(0o444) # Make destination read-only
|
||||
|
||||
try:
|
||||
# Should handle permission error by deleting and retrying
|
||||
helpers.copy_file_if_changed(src, dst)
|
||||
assert dst.read_text() == "new content"
|
||||
finally:
|
||||
# Restore write permissions for cleanup
|
||||
if dst.exists():
|
||||
dst.chmod(0o644)
|
||||
|
||||
|
||||
def test_copy_file_if_changed_creates_directories(tmp_path: Path) -> None:
|
||||
"""Test copy_file_if_changed creates missing directories."""
|
||||
src = tmp_path / "source.txt"
|
||||
dst = tmp_path / "subdir" / "nested" / "dest.txt"
|
||||
src.write_text("content")
|
||||
|
||||
helpers.copy_file_if_changed(src, dst)
|
||||
assert dst.exists()
|
||||
assert dst.read_text() == "content"
|
||||
|
||||
|
||||
def test_copy_file_if_changed_nonexistent_source(tmp_path: Path) -> None:
|
||||
"""Test copy_file_if_changed with non-existent source."""
|
||||
src = tmp_path / "nonexistent.txt"
|
||||
dst = tmp_path / "dest.txt"
|
||||
|
||||
with pytest.raises(EsphomeError, match=r"Error copying file"):
|
||||
helpers.copy_file_if_changed(src, dst)
|
||||
|
||||
|
||||
def test_resolve_ip_address_sorting() -> None:
|
||||
"""Test that results are sorted by preference."""
|
||||
# Create multiple address infos with different preferences
|
||||
|
@@ -1,10 +1,16 @@
|
||||
"""Tests for platformio_api.py path functions."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
import shutil
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import platformio_api
|
||||
from esphome.core import CORE
|
||||
from esphome.core import CORE, EsphomeError
|
||||
|
||||
|
||||
def test_idedata_firmware_elf_path(setup_core: Path) -> None:
|
||||
@@ -104,7 +110,9 @@ def test_flash_image_dataclass() -> None:
|
||||
assert image.offset == "0x10000"
|
||||
|
||||
|
||||
def test_load_idedata_returns_dict(setup_core: Path) -> None:
|
||||
def test_load_idedata_returns_dict(
|
||||
setup_core: Path, mock_run_platformio_cli_run
|
||||
) -> None:
|
||||
"""Test _load_idedata returns parsed idedata dict when successful."""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
CORE.name = "test"
|
||||
@@ -118,12 +126,511 @@ def test_load_idedata_returns_dict(setup_core: Path) -> None:
|
||||
idedata_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
idedata_path.write_text('{"prog_path": "/test/firmware.elf"}')
|
||||
|
||||
with patch("esphome.platformio_api.run_platformio_cli_run") as mock_run:
|
||||
mock_run.return_value = '{"prog_path": "/test/firmware.elf"}'
|
||||
mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}'
|
||||
|
||||
config = {"name": "test"}
|
||||
result = platformio_api._load_idedata(config)
|
||||
config = {"name": "test"}
|
||||
result = platformio_api._load_idedata(config)
|
||||
|
||||
assert result is not None
|
||||
assert isinstance(result, dict)
|
||||
assert result["prog_path"] == "/test/firmware.elf"
|
||||
|
||||
|
||||
def test_load_idedata_uses_cache_when_valid(
|
||||
setup_core: Path, mock_run_platformio_cli_run: Mock
|
||||
) -> None:
|
||||
"""Test _load_idedata uses cached data when unchanged."""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
CORE.name = "test"
|
||||
|
||||
# Create platformio.ini
|
||||
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
|
||||
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
|
||||
platformio_ini.write_text("content")
|
||||
|
||||
# Create idedata cache file that's newer
|
||||
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
|
||||
idedata_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
idedata_path.write_text('{"prog_path": "/cached/firmware.elf"}')
|
||||
|
||||
# Make idedata newer than platformio.ini
|
||||
platformio_ini_mtime = platformio_ini.stat().st_mtime
|
||||
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
|
||||
|
||||
config = {"name": "test"}
|
||||
result = platformio_api._load_idedata(config)
|
||||
|
||||
# Should not call _run_idedata since cache is valid
|
||||
mock_run_platformio_cli_run.assert_not_called()
|
||||
|
||||
assert result["prog_path"] == "/cached/firmware.elf"
|
||||
|
||||
|
||||
def test_load_idedata_regenerates_when_platformio_ini_newer(
|
||||
setup_core: Path, mock_run_platformio_cli_run: Mock
|
||||
) -> None:
|
||||
"""Test _load_idedata regenerates when platformio.ini is newer."""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
CORE.name = "test"
|
||||
|
||||
# Create idedata cache file first
|
||||
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
|
||||
idedata_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
idedata_path.write_text('{"prog_path": "/old/firmware.elf"}')
|
||||
|
||||
# Create platformio.ini that's newer
|
||||
idedata_mtime = idedata_path.stat().st_mtime
|
||||
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
|
||||
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
|
||||
platformio_ini.write_text("content")
|
||||
# Make platformio.ini newer than idedata
|
||||
os.utime(platformio_ini, (idedata_mtime + 1, idedata_mtime + 1))
|
||||
|
||||
# Mock platformio to return new data
|
||||
new_data = {"prog_path": "/new/firmware.elf"}
|
||||
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
|
||||
|
||||
config = {"name": "test"}
|
||||
result = platformio_api._load_idedata(config)
|
||||
|
||||
# Should call _run_idedata since platformio.ini is newer
|
||||
mock_run_platformio_cli_run.assert_called_once()
|
||||
|
||||
assert result["prog_path"] == "/new/firmware.elf"
|
||||
|
||||
|
||||
def test_load_idedata_regenerates_on_corrupted_cache(
|
||||
setup_core: Path, mock_run_platformio_cli_run: Mock
|
||||
) -> None:
|
||||
"""Test _load_idedata regenerates when cache file is corrupted."""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
CORE.name = "test"
|
||||
|
||||
# Create platformio.ini
|
||||
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
|
||||
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
|
||||
platformio_ini.write_text("content")
|
||||
|
||||
# Create corrupted idedata cache file
|
||||
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
|
||||
idedata_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
idedata_path.write_text('{"prog_path": invalid json')
|
||||
|
||||
# Make idedata newer so it would be used if valid
|
||||
platformio_ini_mtime = platformio_ini.stat().st_mtime
|
||||
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
|
||||
|
||||
# Mock platformio to return new data
|
||||
new_data = {"prog_path": "/new/firmware.elf"}
|
||||
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
|
||||
|
||||
config = {"name": "test"}
|
||||
result = platformio_api._load_idedata(config)
|
||||
|
||||
# Should call _run_idedata since cache is corrupted
|
||||
mock_run_platformio_cli_run.assert_called_once()
|
||||
|
||||
assert result["prog_path"] == "/new/firmware.elf"
|
||||
|
||||
|
||||
def test_run_idedata_parses_json_from_output(
|
||||
setup_core: Path, mock_run_platformio_cli_run: Mock
|
||||
) -> None:
|
||||
"""Test _run_idedata extracts JSON from platformio output."""
|
||||
config = {"name": "test"}
|
||||
|
||||
expected_data = {
|
||||
"prog_path": "/path/to/firmware.elf",
|
||||
"cc_path": "/path/to/gcc",
|
||||
"extra": {"flash_images": []},
|
||||
}
|
||||
|
||||
# Simulate platformio output with JSON embedded
|
||||
mock_run_platformio_cli_run.return_value = (
|
||||
f"Some preamble\n{json.dumps(expected_data)}\nSome postamble"
|
||||
)
|
||||
|
||||
result = platformio_api._run_idedata(config)
|
||||
|
||||
assert result == expected_data
|
||||
|
||||
|
||||
def test_run_idedata_raises_on_no_json(
|
||||
setup_core: Path, mock_run_platformio_cli_run: Mock
|
||||
) -> None:
|
||||
"""Test _run_idedata raises EsphomeError when no JSON found."""
|
||||
config = {"name": "test"}
|
||||
|
||||
mock_run_platformio_cli_run.return_value = "No JSON in this output"
|
||||
|
||||
with pytest.raises(EsphomeError):
|
||||
platformio_api._run_idedata(config)
|
||||
|
||||
|
||||
def test_run_idedata_raises_on_invalid_json(
|
||||
setup_core: Path, mock_run_platformio_cli_run: Mock
|
||||
) -> None:
|
||||
"""Test _run_idedata raises on malformed JSON."""
|
||||
config = {"name": "test"}
|
||||
mock_run_platformio_cli_run.return_value = '{"invalid": json"}'
|
||||
|
||||
# The ValueError from json.loads is re-raised
|
||||
with pytest.raises(ValueError):
|
||||
platformio_api._run_idedata(config)
|
||||
|
||||
|
||||
def test_run_platformio_cli_sets_environment_variables(
|
||||
setup_core: Path, mock_run_external_command: Mock
|
||||
) -> None:
|
||||
"""Test run_platformio_cli sets correct environment variables."""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
|
||||
with patch.dict(os.environ, {}, clear=False):
|
||||
mock_run_external_command.return_value = 0
|
||||
platformio_api.run_platformio_cli("test", "arg")
|
||||
|
||||
# Check environment variables were set
|
||||
assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true"
|
||||
assert (
|
||||
setup_core / "build" / "test"
|
||||
in Path(os.environ["PLATFORMIO_BUILD_DIR"]).parents
|
||||
or Path(os.environ["PLATFORMIO_BUILD_DIR"]) == setup_core / "build" / "test"
|
||||
)
|
||||
assert "PLATFORMIO_LIBDEPS_DIR" in os.environ
|
||||
assert "PYTHONWARNINGS" in os.environ
|
||||
|
||||
# Check command was called correctly
|
||||
mock_run_external_command.assert_called_once()
|
||||
args = mock_run_external_command.call_args[0]
|
||||
assert "platformio" in args
|
||||
assert "test" in args
|
||||
assert "arg" in args
|
||||
|
||||
|
||||
def test_run_platformio_cli_run_builds_command(
|
||||
setup_core: Path, mock_run_platformio_cli: Mock
|
||||
) -> None:
|
||||
"""Test run_platformio_cli_run builds correct command."""
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
mock_run_platformio_cli.return_value = 0
|
||||
|
||||
config = {"name": "test"}
|
||||
platformio_api.run_platformio_cli_run(config, True, "extra", "args")
|
||||
|
||||
mock_run_platformio_cli.assert_called_once_with(
|
||||
"run", "-d", CORE.build_path, "-v", "extra", "args"
|
||||
)
|
||||
|
||||
|
||||
def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None:
|
||||
"""Test run_compile with process limit."""
|
||||
from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME
|
||||
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}}
|
||||
mock_run_platformio_cli_run.return_value = 0
|
||||
|
||||
platformio_api.run_compile(config, verbose=True)
|
||||
|
||||
mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4")
|
||||
|
||||
|
||||
def test_get_idedata_caches_result(
|
||||
setup_core: Path, mock_run_platformio_cli_run: Mock
|
||||
) -> None:
|
||||
"""Test get_idedata caches result in CORE.data."""
|
||||
from esphome.const import KEY_CORE
|
||||
|
||||
CORE.build_path = str(setup_core / "build" / "test")
|
||||
CORE.name = "test"
|
||||
CORE.data[KEY_CORE] = {}
|
||||
|
||||
# Create platformio.ini to avoid regeneration
|
||||
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
|
||||
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
|
||||
platformio_ini.write_text("content")
|
||||
|
||||
# Mock platformio to return data
|
||||
idedata = {"prog_path": "/test/firmware.elf"}
|
||||
mock_run_platformio_cli_run.return_value = json.dumps(idedata)
|
||||
|
||||
config = {"name": "test"}
|
||||
|
||||
# First call should load and cache
|
||||
result1 = platformio_api.get_idedata(config)
|
||||
mock_run_platformio_cli_run.assert_called_once()
|
||||
|
||||
# Second call should use cache from CORE.data
|
||||
result2 = platformio_api.get_idedata(config)
|
||||
mock_run_platformio_cli_run.assert_called_once() # Still only called once
|
||||
|
||||
assert result1 is result2
|
||||
assert isinstance(result1, platformio_api.IDEData)
|
||||
assert result1.firmware_elf_path == "/test/firmware.elf"
|
||||
|
||||
|
||||
def test_idedata_addr2line_path_windows(setup_core: Path) -> None:
|
||||
"""Test IDEData.addr2line_path on Windows."""
|
||||
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"}
|
||||
idedata = platformio_api.IDEData(raw_data)
|
||||
|
||||
result = idedata.addr2line_path
|
||||
assert result == "C:\\tools\\addr2line.exe"
|
||||
|
||||
|
||||
def test_idedata_addr2line_path_unix(setup_core: Path) -> None:
|
||||
"""Test IDEData.addr2line_path on Unix."""
|
||||
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"}
|
||||
idedata = platformio_api.IDEData(raw_data)
|
||||
|
||||
result = idedata.addr2line_path
|
||||
assert result == "/usr/bin/addr2line"
|
||||
|
||||
|
||||
def test_patch_structhash(setup_core: Path) -> None:
|
||||
"""Test patch_structhash monkey patches platformio functions."""
|
||||
# Create simple namespace objects to act as modules
|
||||
mock_cli = SimpleNamespace()
|
||||
mock_helpers = SimpleNamespace()
|
||||
mock_run = SimpleNamespace(cli=mock_cli, helpers=mock_helpers)
|
||||
|
||||
# Mock platformio modules
|
||||
with patch.dict(
|
||||
"sys.modules",
|
||||
{
|
||||
"platformio.run.cli": mock_cli,
|
||||
"platformio.run.helpers": mock_helpers,
|
||||
"platformio.run": mock_run,
|
||||
"platformio.project.helpers": MagicMock(),
|
||||
"platformio.fs": MagicMock(),
|
||||
"platformio": MagicMock(),
|
||||
},
|
||||
):
|
||||
# Call patch_structhash
|
||||
platformio_api.patch_structhash()
|
||||
|
||||
# Verify both modules had clean_build_dir patched
|
||||
# Check that clean_build_dir was set on both modules
|
||||
assert hasattr(mock_cli, "clean_build_dir")
|
||||
assert hasattr(mock_helpers, "clean_build_dir")
|
||||
|
||||
# Verify they got the same function assigned
|
||||
assert mock_cli.clean_build_dir is mock_helpers.clean_build_dir
|
||||
|
||||
# Verify it's a real function (not a Mock)
|
||||
assert callable(mock_cli.clean_build_dir)
|
||||
assert mock_cli.clean_build_dir.__name__ == "patched_clean_build_dir"
|
||||
|
||||
|
||||
def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None:
|
||||
"""Test patched_clean_build_dir removes build dir when platformio.ini is newer."""
|
||||
build_dir = setup_core / "build"
|
||||
build_dir.mkdir()
|
||||
platformio_ini = setup_core / "platformio.ini"
|
||||
platformio_ini.write_text("config")
|
||||
|
||||
# Make platformio.ini newer than build_dir
|
||||
build_mtime = build_dir.stat().st_mtime
|
||||
os.utime(platformio_ini, (build_mtime + 1, build_mtime + 1))
|
||||
|
||||
# Track if directory was removed
|
||||
removed_paths: list[str] = []
|
||||
|
||||
def track_rmtree(path: str) -> None:
|
||||
removed_paths.append(path)
|
||||
shutil.rmtree(path)
|
||||
|
||||
# Create mock modules that patch_structhash expects
|
||||
mock_cli = SimpleNamespace()
|
||||
mock_helpers = SimpleNamespace()
|
||||
mock_project_helpers = MagicMock()
|
||||
mock_project_helpers.get_project_dir.return_value = str(setup_core)
|
||||
mock_fs = SimpleNamespace(rmtree=track_rmtree)
|
||||
|
||||
with patch.dict(
|
||||
"sys.modules",
|
||||
{
|
||||
"platformio": SimpleNamespace(fs=mock_fs),
|
||||
"platformio.fs": mock_fs,
|
||||
"platformio.project.helpers": mock_project_helpers,
|
||||
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
|
||||
"platformio.run.cli": mock_cli,
|
||||
"platformio.run.helpers": mock_helpers,
|
||||
},
|
||||
):
|
||||
# Call patch_structhash to install the patched function
|
||||
platformio_api.patch_structhash()
|
||||
|
||||
# Call the patched function
|
||||
mock_helpers.clean_build_dir(str(build_dir), [])
|
||||
|
||||
# Verify directory was removed and recreated
|
||||
assert len(removed_paths) == 1
|
||||
assert removed_paths[0] == str(build_dir)
|
||||
assert build_dir.exists() # makedirs recreated it
|
||||
|
||||
|
||||
def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None:
|
||||
"""Test patched_clean_build_dir keeps build dir when it's up to date."""
|
||||
build_dir = setup_core / "build"
|
||||
build_dir.mkdir()
|
||||
test_file = build_dir / "test.txt"
|
||||
test_file.write_text("test content")
|
||||
|
||||
platformio_ini = setup_core / "platformio.ini"
|
||||
platformio_ini.write_text("config")
|
||||
|
||||
# Make build_dir newer than platformio.ini
|
||||
ini_mtime = platformio_ini.stat().st_mtime
|
||||
os.utime(build_dir, (ini_mtime + 1, ini_mtime + 1))
|
||||
|
||||
# Track if rmtree is called
|
||||
removed_paths: list[str] = []
|
||||
|
||||
def track_rmtree(path: str) -> None:
|
||||
removed_paths.append(path)
|
||||
|
||||
# Create mock modules
|
||||
mock_cli = SimpleNamespace()
|
||||
mock_helpers = SimpleNamespace()
|
||||
mock_project_helpers = MagicMock()
|
||||
mock_project_helpers.get_project_dir.return_value = str(setup_core)
|
||||
mock_fs = SimpleNamespace(rmtree=track_rmtree)
|
||||
|
||||
with patch.dict(
|
||||
"sys.modules",
|
||||
{
|
||||
"platformio": SimpleNamespace(fs=mock_fs),
|
||||
"platformio.fs": mock_fs,
|
||||
"platformio.project.helpers": mock_project_helpers,
|
||||
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
|
||||
"platformio.run.cli": mock_cli,
|
||||
"platformio.run.helpers": mock_helpers,
|
||||
},
|
||||
):
|
||||
# Call patch_structhash to install the patched function
|
||||
platformio_api.patch_structhash()
|
||||
|
||||
# Call the patched function
|
||||
mock_helpers.clean_build_dir(str(build_dir), [])
|
||||
|
||||
# Verify rmtree was NOT called
|
||||
assert len(removed_paths) == 0
|
||||
|
||||
# Verify directory and file still exist
|
||||
assert build_dir.exists()
|
||||
assert test_file.exists()
|
||||
assert test_file.read_text() == "test content"
|
||||
|
||||
|
||||
def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None:
|
||||
"""Test patched_clean_build_dir creates build dir when it doesn't exist."""
|
||||
build_dir = setup_core / "build"
|
||||
platformio_ini = setup_core / "platformio.ini"
|
||||
platformio_ini.write_text("config")
|
||||
|
||||
# Ensure build_dir doesn't exist
|
||||
assert not build_dir.exists()
|
||||
|
||||
# Track if rmtree is called
|
||||
removed_paths: list[str] = []
|
||||
|
||||
def track_rmtree(path: str) -> None:
|
||||
removed_paths.append(path)
|
||||
|
||||
# Create mock modules
|
||||
mock_cli = SimpleNamespace()
|
||||
mock_helpers = SimpleNamespace()
|
||||
mock_project_helpers = MagicMock()
|
||||
mock_project_helpers.get_project_dir.return_value = str(setup_core)
|
||||
mock_fs = SimpleNamespace(rmtree=track_rmtree)
|
||||
|
||||
with patch.dict(
|
||||
"sys.modules",
|
||||
{
|
||||
"platformio": SimpleNamespace(fs=mock_fs),
|
||||
"platformio.fs": mock_fs,
|
||||
"platformio.project.helpers": mock_project_helpers,
|
||||
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
|
||||
"platformio.run.cli": mock_cli,
|
||||
"platformio.run.helpers": mock_helpers,
|
||||
},
|
||||
):
|
||||
# Call patch_structhash to install the patched function
|
||||
platformio_api.patch_structhash()
|
||||
|
||||
# Call the patched function
|
||||
mock_helpers.clean_build_dir(str(build_dir), [])
|
||||
|
||||
# Verify rmtree was NOT called
|
||||
assert len(removed_paths) == 0
|
||||
|
||||
# Verify directory was created
|
||||
assert build_dir.exists()
|
||||
|
||||
|
||||
def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None:
|
||||
"""Test process_stacktrace handles ESP8266 exceptions."""
|
||||
config = {"name": "test"}
|
||||
|
||||
# Test exception type parsing
|
||||
line = "Exception (28):"
|
||||
backtrace_state = False
|
||||
|
||||
result = platformio_api.process_stacktrace(config, line, backtrace_state)
|
||||
|
||||
assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text
|
||||
assert result is False
|
||||
|
||||
|
||||
def test_process_stacktrace_esp8266_backtrace(
|
||||
setup_core: Path, mock_decode_pc: Mock
|
||||
) -> None:
|
||||
"""Test process_stacktrace handles ESP8266 multi-line backtrace."""
|
||||
config = {"name": "test"}
|
||||
|
||||
# Start of backtrace
|
||||
line1 = ">>>stack>>>"
|
||||
state = platformio_api.process_stacktrace(config, line1, False)
|
||||
assert state is True
|
||||
|
||||
# Backtrace content with addresses
|
||||
line2 = "40201234 40205678"
|
||||
state = platformio_api.process_stacktrace(config, line2, state)
|
||||
assert state is True
|
||||
assert mock_decode_pc.call_count == 2
|
||||
|
||||
# End of backtrace
|
||||
line3 = "<<<stack<<<"
|
||||
state = platformio_api.process_stacktrace(config, line3, state)
|
||||
assert state is False
|
||||
|
||||
|
||||
def test_process_stacktrace_esp32_backtrace(
|
||||
setup_core: Path, mock_decode_pc: Mock
|
||||
) -> None:
|
||||
"""Test process_stacktrace handles ESP32 single-line backtrace."""
|
||||
config = {"name": "test"}
|
||||
|
||||
line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678"
|
||||
state = platformio_api.process_stacktrace(config, line, False)
|
||||
|
||||
# Should decode both addresses
|
||||
assert mock_decode_pc.call_count == 2
|
||||
mock_decode_pc.assert_any_call(config, "40081234")
|
||||
mock_decode_pc.assert_any_call(config, "40085678")
|
||||
assert state is False
|
||||
|
||||
|
||||
def test_process_stacktrace_bad_alloc(
|
||||
setup_core: Path, mock_decode_pc: Mock, caplog
|
||||
) -> None:
|
||||
"""Test process_stacktrace handles bad alloc messages."""
|
||||
config = {"name": "test"}
|
||||
|
||||
line = "last failed alloc call: 40201234(512)"
|
||||
state = platformio_api.process_stacktrace(config, line, False)
|
||||
|
||||
assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
|
||||
mock_decode_pc.assert_called_once_with(config, "40201234")
|
||||
assert state is False
|
||||
|
@@ -1,12 +1,15 @@
|
||||
"""Tests for storage_json.py path functions."""
|
||||
|
||||
from datetime import datetime
|
||||
import json
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import storage_json
|
||||
from esphome.const import CONF_DISABLED, CONF_MDNS
|
||||
from esphome.core import CORE
|
||||
|
||||
|
||||
@@ -115,7 +118,9 @@ def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None:
|
||||
assert storage.firmware_bin_path == "/path/to/firmware.bin"
|
||||
|
||||
|
||||
def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) -> None:
|
||||
def test_storage_json_save_creates_directory(
|
||||
setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock
|
||||
) -> None:
|
||||
"""Test StorageJSON.save creates storage directory if it doesn't exist."""
|
||||
storage_dir = tmp_path / "new_data" / "storage"
|
||||
storage_file = storage_dir / "test.json"
|
||||
@@ -139,11 +144,10 @@ def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) -
|
||||
no_mdns=False,
|
||||
)
|
||||
|
||||
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
|
||||
storage.save(str(storage_file))
|
||||
mock_write.assert_called_once()
|
||||
call_args = mock_write.call_args[0]
|
||||
assert call_args[0] == str(storage_file)
|
||||
storage.save(str(storage_file))
|
||||
mock_write_file_if_changed.assert_called_once()
|
||||
call_args = mock_write_file_if_changed.call_args[0]
|
||||
assert call_args[0] == str(storage_file)
|
||||
|
||||
|
||||
def test_storage_json_from_wizard(setup_core: Path) -> None:
|
||||
@@ -180,3 +184,477 @@ def test_storage_paths_with_ha_addon(mock_is_ha_addon: bool, tmp_path: Path) ->
|
||||
result = storage_json.esphome_storage_path()
|
||||
expected = str(Path("/data") / "esphome.json")
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_storage_json_as_dict() -> None:
|
||||
"""Test StorageJSON.as_dict returns correct dictionary."""
|
||||
storage = storage_json.StorageJSON(
|
||||
storage_version=1,
|
||||
name="test_device",
|
||||
friendly_name="Test Device",
|
||||
comment="Test comment",
|
||||
esphome_version="2024.1.0",
|
||||
src_version=1,
|
||||
address="192.168.1.100",
|
||||
web_port=80,
|
||||
target_platform="ESP32",
|
||||
build_path="/path/to/build",
|
||||
firmware_bin_path="/path/to/firmware.bin",
|
||||
loaded_integrations={"wifi", "api", "ota"},
|
||||
loaded_platforms={"sensor", "binary_sensor"},
|
||||
no_mdns=True,
|
||||
framework="arduino",
|
||||
core_platform="esp32",
|
||||
)
|
||||
|
||||
result = storage.as_dict()
|
||||
|
||||
assert result["storage_version"] == 1
|
||||
assert result["name"] == "test_device"
|
||||
assert result["friendly_name"] == "Test Device"
|
||||
assert result["comment"] == "Test comment"
|
||||
assert result["esphome_version"] == "2024.1.0"
|
||||
assert result["src_version"] == 1
|
||||
assert result["address"] == "192.168.1.100"
|
||||
assert result["web_port"] == 80
|
||||
assert result["esp_platform"] == "ESP32"
|
||||
assert result["build_path"] == "/path/to/build"
|
||||
assert result["firmware_bin_path"] == "/path/to/firmware.bin"
|
||||
assert "api" in result["loaded_integrations"]
|
||||
assert "wifi" in result["loaded_integrations"]
|
||||
assert "ota" in result["loaded_integrations"]
|
||||
assert result["loaded_integrations"] == sorted(
|
||||
["wifi", "api", "ota"]
|
||||
) # Should be sorted
|
||||
assert "sensor" in result["loaded_platforms"]
|
||||
assert result["loaded_platforms"] == sorted(
|
||||
["sensor", "binary_sensor"]
|
||||
) # Should be sorted
|
||||
assert result["no_mdns"] is True
|
||||
assert result["framework"] == "arduino"
|
||||
assert result["core_platform"] == "esp32"
|
||||
|
||||
|
||||
def test_storage_json_to_json() -> None:
|
||||
"""Test StorageJSON.to_json returns valid JSON string."""
|
||||
storage = storage_json.StorageJSON(
|
||||
storage_version=1,
|
||||
name="test",
|
||||
friendly_name="Test",
|
||||
comment=None,
|
||||
esphome_version="2024.1.0",
|
||||
src_version=None,
|
||||
address="test.local",
|
||||
web_port=None,
|
||||
target_platform="ESP8266",
|
||||
build_path=None,
|
||||
firmware_bin_path=None,
|
||||
loaded_integrations=set(),
|
||||
loaded_platforms=set(),
|
||||
no_mdns=False,
|
||||
)
|
||||
|
||||
json_str = storage.to_json()
|
||||
|
||||
# Should be valid JSON
|
||||
parsed = json.loads(json_str)
|
||||
assert parsed["name"] == "test"
|
||||
assert parsed["storage_version"] == 1
|
||||
|
||||
# Should end with newline
|
||||
assert json_str.endswith("\n")
|
||||
|
||||
|
||||
def test_storage_json_save(tmp_path: Path) -> None:
|
||||
"""Test StorageJSON.save writes file correctly."""
|
||||
storage = storage_json.StorageJSON(
|
||||
storage_version=1,
|
||||
name="test",
|
||||
friendly_name="Test",
|
||||
comment=None,
|
||||
esphome_version="2024.1.0",
|
||||
src_version=None,
|
||||
address="test.local",
|
||||
web_port=None,
|
||||
target_platform="ESP32",
|
||||
build_path=None,
|
||||
firmware_bin_path=None,
|
||||
loaded_integrations=set(),
|
||||
loaded_platforms=set(),
|
||||
no_mdns=False,
|
||||
)
|
||||
|
||||
save_path = tmp_path / "test.json"
|
||||
|
||||
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
|
||||
storage.save(str(save_path))
|
||||
mock_write.assert_called_once_with(str(save_path), storage.to_json())
|
||||
|
||||
|
||||
def test_storage_json_from_esphome_core(setup_core: Path) -> None:
|
||||
"""Test StorageJSON.from_esphome_core creates correct storage object."""
|
||||
# Mock CORE object
|
||||
mock_core = MagicMock()
|
||||
mock_core.name = "my_device"
|
||||
mock_core.friendly_name = "My Device"
|
||||
mock_core.comment = "A test device"
|
||||
mock_core.address = "192.168.1.50"
|
||||
mock_core.web_port = 8080
|
||||
mock_core.target_platform = "esp32"
|
||||
mock_core.is_esp32 = True
|
||||
mock_core.build_path = "/build/my_device"
|
||||
mock_core.firmware_bin = "/build/my_device/firmware.bin"
|
||||
mock_core.loaded_integrations = {"wifi", "api"}
|
||||
mock_core.loaded_platforms = {"sensor"}
|
||||
mock_core.config = {CONF_MDNS: {CONF_DISABLED: True}}
|
||||
mock_core.target_framework = "esp-idf"
|
||||
|
||||
with patch("esphome.components.esp32.get_esp32_variant") as mock_variant:
|
||||
mock_variant.return_value = "ESP32-C3"
|
||||
|
||||
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
|
||||
|
||||
assert result.name == "my_device"
|
||||
assert result.friendly_name == "My Device"
|
||||
assert result.comment == "A test device"
|
||||
assert result.address == "192.168.1.50"
|
||||
assert result.web_port == 8080
|
||||
assert result.target_platform == "ESP32-C3"
|
||||
assert result.build_path == "/build/my_device"
|
||||
assert result.firmware_bin_path == "/build/my_device/firmware.bin"
|
||||
assert result.loaded_integrations == {"wifi", "api"}
|
||||
assert result.loaded_platforms == {"sensor"}
|
||||
assert result.no_mdns is True
|
||||
assert result.framework == "esp-idf"
|
||||
assert result.core_platform == "esp32"
|
||||
|
||||
|
||||
def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None:
|
||||
"""Test from_esphome_core with mDNS enabled."""
|
||||
mock_core = MagicMock()
|
||||
mock_core.name = "test"
|
||||
mock_core.friendly_name = "Test"
|
||||
mock_core.comment = None
|
||||
mock_core.address = "test.local"
|
||||
mock_core.web_port = None
|
||||
mock_core.target_platform = "esp8266"
|
||||
mock_core.is_esp32 = False
|
||||
mock_core.build_path = "/build"
|
||||
mock_core.firmware_bin = "/build/firmware.bin"
|
||||
mock_core.loaded_integrations = set()
|
||||
mock_core.loaded_platforms = set()
|
||||
mock_core.config = {} # No MDNS config means enabled
|
||||
mock_core.target_framework = "arduino"
|
||||
|
||||
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
|
||||
|
||||
assert result.no_mdns is False
|
||||
|
||||
|
||||
def test_storage_json_load_valid_file(tmp_path: Path) -> None:
|
||||
"""Test StorageJSON.load with valid JSON file."""
|
||||
storage_data = {
|
||||
"storage_version": 1,
|
||||
"name": "loaded_device",
|
||||
"friendly_name": "Loaded Device",
|
||||
"comment": "Loaded from file",
|
||||
"esphome_version": "2024.1.0",
|
||||
"src_version": 2,
|
||||
"address": "10.0.0.1",
|
||||
"web_port": 8080,
|
||||
"esp_platform": "ESP32",
|
||||
"build_path": "/loaded/build",
|
||||
"firmware_bin_path": "/loaded/firmware.bin",
|
||||
"loaded_integrations": ["wifi", "api"],
|
||||
"loaded_platforms": ["sensor"],
|
||||
"no_mdns": True,
|
||||
"framework": "arduino",
|
||||
"core_platform": "esp32",
|
||||
}
|
||||
|
||||
file_path = tmp_path / "storage.json"
|
||||
file_path.write_text(json.dumps(storage_data))
|
||||
|
||||
result = storage_json.StorageJSON.load(str(file_path))
|
||||
|
||||
assert result is not None
|
||||
assert result.name == "loaded_device"
|
||||
assert result.friendly_name == "Loaded Device"
|
||||
assert result.comment == "Loaded from file"
|
||||
assert result.esphome_version == "2024.1.0"
|
||||
assert result.src_version == 2
|
||||
assert result.address == "10.0.0.1"
|
||||
assert result.web_port == 8080
|
||||
assert result.target_platform == "ESP32"
|
||||
assert result.build_path == "/loaded/build"
|
||||
assert result.firmware_bin_path == "/loaded/firmware.bin"
|
||||
assert result.loaded_integrations == {"wifi", "api"}
|
||||
assert result.loaded_platforms == {"sensor"}
|
||||
assert result.no_mdns is True
|
||||
assert result.framework == "arduino"
|
||||
assert result.core_platform == "esp32"
|
||||
|
||||
|
||||
def test_storage_json_load_invalid_file(tmp_path: Path) -> None:
|
||||
"""Test StorageJSON.load with invalid JSON file."""
|
||||
file_path = tmp_path / "invalid.json"
|
||||
file_path.write_text("not valid json{")
|
||||
|
||||
result = storage_json.StorageJSON.load(str(file_path))
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_storage_json_load_nonexistent_file() -> None:
|
||||
"""Test StorageJSON.load with non-existent file."""
|
||||
result = storage_json.StorageJSON.load("/nonexistent/file.json")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_storage_json_equality() -> None:
|
||||
"""Test StorageJSON equality comparison."""
|
||||
storage1 = storage_json.StorageJSON(
|
||||
storage_version=1,
|
||||
name="test",
|
||||
friendly_name="Test",
|
||||
comment=None,
|
||||
esphome_version="2024.1.0",
|
||||
src_version=1,
|
||||
address="test.local",
|
||||
web_port=80,
|
||||
target_platform="ESP32",
|
||||
build_path="/build",
|
||||
firmware_bin_path="/firmware.bin",
|
||||
loaded_integrations={"wifi"},
|
||||
loaded_platforms=set(),
|
||||
no_mdns=False,
|
||||
)
|
||||
|
||||
storage2 = storage_json.StorageJSON(
|
||||
storage_version=1,
|
||||
name="test",
|
||||
friendly_name="Test",
|
||||
comment=None,
|
||||
esphome_version="2024.1.0",
|
||||
src_version=1,
|
||||
address="test.local",
|
||||
web_port=80,
|
||||
target_platform="ESP32",
|
||||
build_path="/build",
|
||||
firmware_bin_path="/firmware.bin",
|
||||
loaded_integrations={"wifi"},
|
||||
loaded_platforms=set(),
|
||||
no_mdns=False,
|
||||
)
|
||||
|
||||
storage3 = storage_json.StorageJSON(
|
||||
storage_version=1,
|
||||
name="different", # Different name
|
||||
friendly_name="Test",
|
||||
comment=None,
|
||||
esphome_version="2024.1.0",
|
||||
src_version=1,
|
||||
address="test.local",
|
||||
web_port=80,
|
||||
target_platform="ESP32",
|
||||
build_path="/build",
|
||||
firmware_bin_path="/firmware.bin",
|
||||
loaded_integrations={"wifi"},
|
||||
loaded_platforms=set(),
|
||||
no_mdns=False,
|
||||
)
|
||||
|
||||
assert storage1 == storage2
|
||||
assert storage1 != storage3
|
||||
assert storage1 != "not a storage object"
|
||||
|
||||
|
||||
def test_esphome_storage_json_as_dict() -> None:
|
||||
"""Test EsphomeStorageJSON.as_dict returns correct dictionary."""
|
||||
storage = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="secret123",
|
||||
last_update_check="2024-01-15T10:30:00",
|
||||
remote_version="2024.1.1",
|
||||
)
|
||||
|
||||
result = storage.as_dict()
|
||||
|
||||
assert result["storage_version"] == 1
|
||||
assert result["cookie_secret"] == "secret123"
|
||||
assert result["last_update_check"] == "2024-01-15T10:30:00"
|
||||
assert result["remote_version"] == "2024.1.1"
|
||||
|
||||
|
||||
def test_esphome_storage_json_last_update_check_property() -> None:
|
||||
"""Test EsphomeStorageJSON.last_update_check property."""
|
||||
storage = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="secret",
|
||||
last_update_check="2024-01-15T10:30:00",
|
||||
remote_version=None,
|
||||
)
|
||||
|
||||
# Test getter
|
||||
result = storage.last_update_check
|
||||
assert isinstance(result, datetime)
|
||||
assert result.year == 2024
|
||||
assert result.month == 1
|
||||
assert result.day == 15
|
||||
assert result.hour == 10
|
||||
assert result.minute == 30
|
||||
|
||||
# Test setter
|
||||
new_date = datetime(2024, 2, 20, 15, 45, 30)
|
||||
storage.last_update_check = new_date
|
||||
assert storage.last_update_check_str == "2024-02-20T15:45:30"
|
||||
|
||||
|
||||
def test_esphome_storage_json_last_update_check_invalid() -> None:
|
||||
"""Test EsphomeStorageJSON.last_update_check with invalid date."""
|
||||
storage = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="secret",
|
||||
last_update_check="invalid date",
|
||||
remote_version=None,
|
||||
)
|
||||
|
||||
result = storage.last_update_check
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_esphome_storage_json_to_json() -> None:
|
||||
"""Test EsphomeStorageJSON.to_json returns valid JSON string."""
|
||||
storage = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="mysecret",
|
||||
last_update_check="2024-01-15T10:30:00",
|
||||
remote_version="2024.1.1",
|
||||
)
|
||||
|
||||
json_str = storage.to_json()
|
||||
|
||||
# Should be valid JSON
|
||||
parsed = json.loads(json_str)
|
||||
assert parsed["cookie_secret"] == "mysecret"
|
||||
assert parsed["storage_version"] == 1
|
||||
|
||||
# Should end with newline
|
||||
assert json_str.endswith("\n")
|
||||
|
||||
|
||||
def test_esphome_storage_json_save(tmp_path: Path) -> None:
|
||||
"""Test EsphomeStorageJSON.save writes file correctly."""
|
||||
storage = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="secret",
|
||||
last_update_check=None,
|
||||
remote_version=None,
|
||||
)
|
||||
|
||||
save_path = tmp_path / "esphome.json"
|
||||
|
||||
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
|
||||
storage.save(str(save_path))
|
||||
mock_write.assert_called_once_with(str(save_path), storage.to_json())
|
||||
|
||||
|
||||
def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None:
|
||||
"""Test EsphomeStorageJSON.load with valid JSON file."""
|
||||
storage_data = {
|
||||
"storage_version": 1,
|
||||
"cookie_secret": "loaded_secret",
|
||||
"last_update_check": "2024-01-20T14:30:00",
|
||||
"remote_version": "2024.1.2",
|
||||
}
|
||||
|
||||
file_path = tmp_path / "esphome.json"
|
||||
file_path.write_text(json.dumps(storage_data))
|
||||
|
||||
result = storage_json.EsphomeStorageJSON.load(str(file_path))
|
||||
|
||||
assert result is not None
|
||||
assert result.storage_version == 1
|
||||
assert result.cookie_secret == "loaded_secret"
|
||||
assert result.last_update_check_str == "2024-01-20T14:30:00"
|
||||
assert result.remote_version == "2024.1.2"
|
||||
|
||||
|
||||
def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None:
|
||||
"""Test EsphomeStorageJSON.load with invalid JSON file."""
|
||||
file_path = tmp_path / "invalid.json"
|
||||
file_path.write_text("not valid json{")
|
||||
|
||||
result = storage_json.EsphomeStorageJSON.load(str(file_path))
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_esphome_storage_json_load_nonexistent_file() -> None:
|
||||
"""Test EsphomeStorageJSON.load with non-existent file."""
|
||||
result = storage_json.EsphomeStorageJSON.load("/nonexistent/file.json")
|
||||
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_esphome_storage_json_get_default() -> None:
|
||||
"""Test EsphomeStorageJSON.get_default creates default storage."""
|
||||
with patch("esphome.storage_json.os.urandom") as mock_urandom:
|
||||
# Mock urandom to return predictable bytes
|
||||
mock_urandom.return_value = b"test" * 16 # 64 bytes
|
||||
|
||||
result = storage_json.EsphomeStorageJSON.get_default()
|
||||
|
||||
assert result.storage_version == 1
|
||||
assert len(result.cookie_secret) == 128 # 64 bytes hex = 128 chars
|
||||
assert result.last_update_check is None
|
||||
assert result.remote_version is None
|
||||
|
||||
|
||||
def test_esphome_storage_json_equality() -> None:
|
||||
"""Test EsphomeStorageJSON equality comparison."""
|
||||
storage1 = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="secret",
|
||||
last_update_check="2024-01-15T10:30:00",
|
||||
remote_version="2024.1.1",
|
||||
)
|
||||
|
||||
storage2 = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="secret",
|
||||
last_update_check="2024-01-15T10:30:00",
|
||||
remote_version="2024.1.1",
|
||||
)
|
||||
|
||||
storage3 = storage_json.EsphomeStorageJSON(
|
||||
storage_version=1,
|
||||
cookie_secret="different", # Different secret
|
||||
last_update_check="2024-01-15T10:30:00",
|
||||
remote_version="2024.1.1",
|
||||
)
|
||||
|
||||
assert storage1 == storage2
|
||||
assert storage1 != storage3
|
||||
assert storage1 != "not a storage object"
|
||||
|
||||
|
||||
def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None:
|
||||
"""Test loading storage with legacy esphomeyaml_version field."""
|
||||
storage_data = {
|
||||
"storage_version": 1,
|
||||
"name": "legacy_device",
|
||||
"friendly_name": "Legacy Device",
|
||||
"esphomeyaml_version": "1.14.0", # Legacy field name
|
||||
"address": "legacy.local",
|
||||
"esp_platform": "ESP8266",
|
||||
}
|
||||
|
||||
file_path = tmp_path / "legacy.json"
|
||||
file_path.write_text(json.dumps(storage_data))
|
||||
|
||||
result = storage_json.StorageJSON.load(str(file_path))
|
||||
|
||||
assert result is not None
|
||||
assert result.esphome_version == "1.14.0" # Should map to esphome_version
|
||||
|
@@ -1,5 +1,7 @@
|
||||
"""Tests for esphome.util module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
@@ -308,3 +310,85 @@ def test_filter_yaml_files_case_sensitive() -> None:
|
||||
assert "/path/to/config.YAML" not in result
|
||||
assert "/path/to/config.YML" not in result
|
||||
assert "/path/to/config.Yaml" not in result
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("input_str", "expected"),
|
||||
[
|
||||
# Empty string
|
||||
("", "''"),
|
||||
# Simple strings that don't need quoting
|
||||
("hello", "hello"),
|
||||
("test123", "test123"),
|
||||
("file.txt", "file.txt"),
|
||||
("/path/to/file", "/path/to/file"),
|
||||
("user@host", "user@host"),
|
||||
("value:123", "value:123"),
|
||||
("item,list", "item,list"),
|
||||
("path-with-dash", "path-with-dash"),
|
||||
# Strings that need quoting
|
||||
("hello world", "'hello world'"),
|
||||
("test\ttab", "'test\ttab'"),
|
||||
("line\nbreak", "'line\nbreak'"),
|
||||
("semicolon;here", "'semicolon;here'"),
|
||||
("pipe|symbol", "'pipe|symbol'"),
|
||||
("redirect>file", "'redirect>file'"),
|
||||
("redirect<file", "'redirect<file'"),
|
||||
("background&", "'background&'"),
|
||||
("dollar$sign", "'dollar$sign'"),
|
||||
("backtick`cmd", "'backtick`cmd'"),
|
||||
('double"quote', "'double\"quote'"),
|
||||
("backslash\\path", "'backslash\\path'"),
|
||||
("question?mark", "'question?mark'"),
|
||||
("asterisk*wild", "'asterisk*wild'"),
|
||||
("bracket[test]", "'bracket[test]'"),
|
||||
("paren(test)", "'paren(test)'"),
|
||||
("curly{brace}", "'curly{brace}'"),
|
||||
# Single quotes in string (special escaping)
|
||||
("it's", "'it'\"'\"'s'"),
|
||||
("don't", "'don'\"'\"'t'"),
|
||||
("'quoted'", "''\"'\"'quoted'\"'\"''"),
|
||||
# Complex combinations
|
||||
("test 'with' quotes", "'test '\"'\"'with'\"'\"' quotes'"),
|
||||
("path/to/file's.txt", "'path/to/file'\"'\"'s.txt'"),
|
||||
],
|
||||
)
|
||||
def test_shlex_quote(input_str: str, expected: str) -> None:
|
||||
"""Test shlex_quote properly escapes shell arguments."""
|
||||
assert util.shlex_quote(input_str) == expected
|
||||
|
||||
|
||||
def test_shlex_quote_safe_characters() -> None:
|
||||
"""Test that safe characters are not quoted."""
|
||||
# These characters are considered safe and shouldn't be quoted
|
||||
safe_chars = (
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@%+=:,./-_"
|
||||
)
|
||||
for char in safe_chars:
|
||||
assert util.shlex_quote(char) == char
|
||||
assert util.shlex_quote(f"test{char}test") == f"test{char}test"
|
||||
|
||||
|
||||
def test_shlex_quote_unsafe_characters() -> None:
|
||||
"""Test that unsafe characters trigger quoting."""
|
||||
# These characters should trigger quoting
|
||||
unsafe_chars = ' \t\n;|>&<$`"\\?*[](){}!#~^'
|
||||
for char in unsafe_chars:
|
||||
result = util.shlex_quote(f"test{char}test")
|
||||
assert result.startswith("'")
|
||||
assert result.endswith("'")
|
||||
|
||||
|
||||
def test_shlex_quote_edge_cases() -> None:
|
||||
"""Test edge cases for shlex_quote."""
|
||||
# Multiple single quotes
|
||||
assert util.shlex_quote("'''") == "''\"'\"''\"'\"''\"'\"''"
|
||||
|
||||
# Mixed quotes
|
||||
assert util.shlex_quote('"\'"') == "'\"'\"'\"'\"'"
|
||||
|
||||
# Only whitespace
|
||||
assert util.shlex_quote(" ") == "' '"
|
||||
assert util.shlex_quote("\t") == "'\t'"
|
||||
assert util.shlex_quote("\n") == "'\n'"
|
||||
assert util.shlex_quote(" ") == "' '"
|
||||
|
Reference in New Issue
Block a user