1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-18 19:22:22 +01:00

Merge pull request #10752 from esphome/bump-2025.9.0b3

2025.9.0b3
This commit is contained in:
Jesse Hills
2025-09-17 00:15:06 +12:00
committed by GitHub
18 changed files with 2185 additions and 17 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version # could be handy for archiving the generated documentation or if some version
# control system is used. # control system is used.
PROJECT_NUMBER = 2025.9.0b2 PROJECT_NUMBER = 2025.9.0b3
# Using the PROJECT_BRIEF tag one can provide an optional one line description # Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a # for a project that appears at the top of each page and should give viewer a

View File

@@ -113,7 +113,7 @@ void ADE7880::update() {
if (this->channel_a_ != nullptr) { if (this->channel_a_ != nullptr) {
auto *chan = this->channel_a_; auto *chan = this->channel_a_;
this->update_sensor_from_s24zp_register16_(chan->current, AIRMS, [](float val) { return val / 100000.0f; }); this->update_sensor_from_s24zp_register16_(chan->current, AIRMS, [](float val) { return val / 100000.0f; });
this->update_sensor_from_s24zp_register16_(chan->voltage, BVRMS, [](float val) { return val / 10000.0f; }); this->update_sensor_from_s24zp_register16_(chan->voltage, AVRMS, [](float val) { return val / 10000.0f; });
this->update_sensor_from_s24zp_register16_(chan->active_power, AWATT, [](float val) { return val / 100.0f; }); this->update_sensor_from_s24zp_register16_(chan->active_power, AWATT, [](float val) { return val / 100.0f; });
this->update_sensor_from_s24zp_register16_(chan->apparent_power, AVA, [](float val) { return val / 100.0f; }); this->update_sensor_from_s24zp_register16_(chan->apparent_power, AVA, [](float val) { return val / 100.0f; });
this->update_sensor_from_s16_register16_(chan->power_factor, APF, this->update_sensor_from_s16_register16_(chan->power_factor, APF,

View File

@@ -77,6 +77,13 @@ ETHERNET_TYPES = {
"DM9051": EthernetType.ETHERNET_TYPE_DM9051, "DM9051": EthernetType.ETHERNET_TYPE_DM9051,
} }
# PHY types that need compile-time defines for conditional compilation
_PHY_TYPE_TO_DEFINE = {
"KSZ8081": "USE_ETHERNET_KSZ8081",
"KSZ8081RNA": "USE_ETHERNET_KSZ8081",
# Add other PHY types here only if they need conditional compilation
}
SPI_ETHERNET_TYPES = ["W5500", "DM9051"] SPI_ETHERNET_TYPES = ["W5500", "DM9051"]
SPI_ETHERNET_DEFAULT_POLLING_INTERVAL = TimePeriodMilliseconds(milliseconds=10) SPI_ETHERNET_DEFAULT_POLLING_INTERVAL = TimePeriodMilliseconds(milliseconds=10)
@@ -345,6 +352,10 @@ async def to_code(config):
if CONF_MANUAL_IP in config: if CONF_MANUAL_IP in config:
cg.add(var.set_manual_ip(manual_ip(config[CONF_MANUAL_IP]))) cg.add(var.set_manual_ip(manual_ip(config[CONF_MANUAL_IP])))
# Add compile-time define for PHY types with specific code
if phy_define := _PHY_TYPE_TO_DEFINE.get(config[CONF_TYPE]):
cg.add_define(phy_define)
cg.add_define("USE_ETHERNET") cg.add_define("USE_ETHERNET")
# Disable WiFi when using Ethernet to save memory # Disable WiFi when using Ethernet to save memory

View File

@@ -229,10 +229,12 @@ void EthernetComponent::setup() {
ESPHL_ERROR_CHECK(err, "ETH driver install error"); ESPHL_ERROR_CHECK(err, "ETH driver install error");
#ifndef USE_ETHERNET_SPI #ifndef USE_ETHERNET_SPI
#ifdef USE_ETHERNET_KSZ8081
if (this->type_ == ETHERNET_TYPE_KSZ8081RNA && this->clk_mode_ == EMAC_CLK_OUT) { if (this->type_ == ETHERNET_TYPE_KSZ8081RNA && this->clk_mode_ == EMAC_CLK_OUT) {
// KSZ8081RNA default is incorrect. It expects a 25MHz clock instead of the 50MHz we provide. // KSZ8081RNA default is incorrect. It expects a 25MHz clock instead of the 50MHz we provide.
this->ksz8081_set_clock_reference_(mac); this->ksz8081_set_clock_reference_(mac);
} }
#endif // USE_ETHERNET_KSZ8081
for (const auto &phy_register : this->phy_registers_) { for (const auto &phy_register : this->phy_registers_) {
this->write_phy_register_(mac, phy_register); this->write_phy_register_(mac, phy_register);
@@ -721,6 +723,7 @@ bool EthernetComponent::powerdown() {
#ifndef USE_ETHERNET_SPI #ifndef USE_ETHERNET_SPI
#ifdef USE_ETHERNET_KSZ8081
constexpr uint8_t KSZ80XX_PC2R_REG_ADDR = 0x1F; constexpr uint8_t KSZ80XX_PC2R_REG_ADDR = 0x1F;
void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) { void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) {
@@ -749,6 +752,7 @@ void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) {
ESP_LOGVV(TAG, "KSZ8081 PHY Control 2: %s", format_hex_pretty((u_int8_t *) &phy_control_2, 2).c_str()); ESP_LOGVV(TAG, "KSZ8081 PHY Control 2: %s", format_hex_pretty((u_int8_t *) &phy_control_2, 2).c_str());
} }
} }
#endif // USE_ETHERNET_KSZ8081
void EthernetComponent::write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data) { void EthernetComponent::write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data) {
esp_err_t err; esp_err_t err;

View File

@@ -104,8 +104,10 @@ class EthernetComponent : public Component {
void start_connect_(); void start_connect_();
void finish_connect_(); void finish_connect_();
void dump_connect_params_(); void dump_connect_params_();
#ifdef USE_ETHERNET_KSZ8081
/// @brief Set `RMII Reference Clock Select` bit for KSZ8081. /// @brief Set `RMII Reference Clock Select` bit for KSZ8081.
void ksz8081_set_clock_reference_(esp_eth_mac_t *mac); void ksz8081_set_clock_reference_(esp_eth_mac_t *mac);
#endif
/// @brief Set arbitratry PHY registers from config. /// @brief Set arbitratry PHY registers from config.
void write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data); void write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data);

View File

@@ -491,7 +491,7 @@ bool MQTTClientComponent::publish(const std::string &topic, const std::string &p
bool MQTTClientComponent::publish(const std::string &topic, const char *payload, size_t payload_length, uint8_t qos, bool MQTTClientComponent::publish(const std::string &topic, const char *payload, size_t payload_length, uint8_t qos,
bool retain) { bool retain) {
return publish({.topic = topic, .payload = payload, .qos = qos, .retain = retain}); return publish({.topic = topic, .payload = std::string(payload, payload_length), .qos = qos, .retain = retain});
} }
bool MQTTClientComponent::publish(const MQTTMessage &message) { bool MQTTClientComponent::publish(const MQTTMessage &message) {

View File

@@ -28,12 +28,12 @@ bool Select::has_option(const std::string &option) const { return this->index_of
bool Select::has_index(size_t index) const { return index < this->size(); } bool Select::has_index(size_t index) const { return index < this->size(); }
size_t Select::size() const { size_t Select::size() const {
auto options = traits.get_options(); const auto &options = traits.get_options();
return options.size(); return options.size();
} }
optional<size_t> Select::index_of(const std::string &option) const { optional<size_t> Select::index_of(const std::string &option) const {
auto options = traits.get_options(); const auto &options = traits.get_options();
auto it = std::find(options.begin(), options.end(), option); auto it = std::find(options.begin(), options.end(), option);
if (it == options.end()) { if (it == options.end()) {
return {}; return {};
@@ -51,7 +51,7 @@ optional<size_t> Select::active_index() const {
optional<std::string> Select::at(size_t index) const { optional<std::string> Select::at(size_t index) const {
if (this->has_index(index)) { if (this->has_index(index)) {
auto options = traits.get_options(); const auto &options = traits.get_options();
return options.at(index); return options.at(index);
} else { } else {
return {}; return {};

View File

@@ -45,7 +45,7 @@ void SelectCall::perform() {
auto *parent = this->parent_; auto *parent = this->parent_;
const auto *name = parent->get_name().c_str(); const auto *name = parent->get_name().c_str();
const auto &traits = parent->traits; const auto &traits = parent->traits;
auto options = traits.get_options(); const auto &options = traits.get_options();
if (this->operation_ == SELECT_OP_NONE) { if (this->operation_ == SELECT_OP_NONE) {
ESP_LOGW(TAG, "'%s' - SelectCall performed without selecting an operation", name); ESP_LOGW(TAG, "'%s' - SelectCall performed without selecting an operation", name);

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum from esphome.enum import StrEnum
__version__ = "2025.9.0b2" __version__ = "2025.9.0b3"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_" ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = ( VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -175,6 +175,7 @@
#ifdef USE_ARDUINO #ifdef USE_ARDUINO
#define USE_ARDUINO_VERSION_CODE VERSION_CODE(3, 2, 1) #define USE_ARDUINO_VERSION_CODE VERSION_CODE(3, 2, 1)
#define USE_ETHERNET #define USE_ETHERNET
#define USE_ETHERNET_KSZ8081
#endif #endif
#ifdef USE_ESP_IDF #ifdef USE_ESP_IDF

View File

@@ -1038,12 +1038,9 @@ class ArchiveRequestHandler(BaseHandler):
shutil.move(config_file, os.path.join(archive_path, configuration)) shutil.move(config_file, os.path.join(archive_path, configuration))
storage_json = StorageJSON.load(storage_path) storage_json = StorageJSON.load(storage_path)
if storage_json is not None: if storage_json is not None and storage_json.build_path:
# Delete build folder (if exists) # Delete build folder (if exists)
name = storage_json.name shutil.rmtree(storage_json.build_path, ignore_errors=True)
build_folder = os.path.join(settings.config_dir, name)
if build_folder is not None:
shutil.rmtree(build_folder, os.path.join(archive_path, name))
class UnArchiveRequestHandler(BaseHandler): class UnArchiveRequestHandler(BaseHandler):

View File

@@ -589,7 +589,7 @@ async def test_archive_request_handler_post(
mock_ext_storage_path: MagicMock, mock_ext_storage_path: MagicMock,
tmp_path: Path, tmp_path: Path,
) -> None: ) -> None:
"""Test ArchiveRequestHandler.post method.""" """Test ArchiveRequestHandler.post method without storage_json."""
# Set up temp directories # Set up temp directories
config_dir = Path(get_fixture_path("conf")) config_dir = Path(get_fixture_path("conf"))
@@ -616,6 +616,97 @@ async def test_archive_request_handler_post(
).read_text() == "esphome:\n name: test_archive\n" ).read_text() == "esphome:\n name: test_archive\n"
@pytest.mark.asyncio
async def test_archive_handler_with_build_folder(
dashboard: DashboardTestHelper,
mock_archive_storage_path: MagicMock,
mock_ext_storage_path: MagicMock,
mock_dashboard_settings: MagicMock,
mock_storage_json: MagicMock,
tmp_path: Path,
) -> None:
"""Test ArchiveRequestHandler.post with storage_json and build folder."""
config_dir = tmp_path / "config"
config_dir.mkdir()
archive_dir = tmp_path / "archive"
archive_dir.mkdir()
build_dir = tmp_path / "build"
build_dir.mkdir()
configuration = "test_device.yaml"
test_config = config_dir / configuration
test_config.write_text("esphome:\n name: test_device\n")
build_folder = build_dir / "test_device"
build_folder.mkdir()
(build_folder / "firmware.bin").write_text("binary content")
(build_folder / ".pioenvs").mkdir()
mock_dashboard_settings.config_dir = str(config_dir)
mock_dashboard_settings.rel_path.return_value = str(test_config)
mock_archive_storage_path.return_value = str(archive_dir)
mock_storage = MagicMock()
mock_storage.name = "test_device"
mock_storage.build_path = str(build_folder)
mock_storage_json.load.return_value = mock_storage
response = await dashboard.fetch(
"/archive",
method="POST",
body=f"configuration={configuration}",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert response.code == 200
assert not test_config.exists()
assert (archive_dir / configuration).exists()
assert not build_folder.exists()
assert not (archive_dir / "test_device").exists()
@pytest.mark.asyncio
async def test_archive_handler_no_build_folder(
dashboard: DashboardTestHelper,
mock_archive_storage_path: MagicMock,
mock_ext_storage_path: MagicMock,
mock_dashboard_settings: MagicMock,
mock_storage_json: MagicMock,
tmp_path: Path,
) -> None:
"""Test ArchiveRequestHandler.post with storage_json but no build folder."""
config_dir = tmp_path / "config"
config_dir.mkdir()
archive_dir = tmp_path / "archive"
archive_dir.mkdir()
configuration = "test_device.yaml"
test_config = config_dir / configuration
test_config.write_text("esphome:\n name: test_device\n")
mock_dashboard_settings.config_dir = str(config_dir)
mock_dashboard_settings.rel_path.return_value = str(test_config)
mock_archive_storage_path.return_value = str(archive_dir)
mock_storage = MagicMock()
mock_storage.name = "test_device"
mock_storage.build_path = None
mock_storage_json.load.return_value = mock_storage
response = await dashboard.fetch(
"/archive",
method="POST",
body=f"configuration={configuration}",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert response.code == 200
assert not test_config.exists()
assert (archive_dir / configuration).exists()
assert not (archive_dir / "test_device").exists()
@pytest.mark.skipif(os.name == "nt", reason="Unix sockets are not supported on Windows") @pytest.mark.skipif(os.name == "nt", reason="Unix sockets are not supported on Windows")
@pytest.mark.usefixtures("mock_trash_storage_path", "mock_archive_storage_path") @pytest.mark.usefixtures("mock_trash_storage_path", "mock_archive_storage_path")
def test_start_web_server_with_unix_socket(tmp_path: Path) -> None: def test_start_web_server_with_unix_socket(tmp_path: Path) -> None:

View File

@@ -9,8 +9,10 @@ not be part of a unit test suite.
""" """
from collections.abc import Generator
from pathlib import Path from pathlib import Path
import sys import sys
from unittest.mock import Mock, patch
import pytest import pytest
@@ -36,3 +38,52 @@ def fixture_path() -> Path:
Location of all fixture files. Location of all fixture files.
""" """
return here / "fixtures" return here / "fixtures"
@pytest.fixture
def setup_core(tmp_path: Path) -> Path:
"""Set up CORE with test paths."""
CORE.config_path = str(tmp_path / "test.yaml")
return tmp_path
@pytest.fixture
def mock_write_file_if_changed() -> Generator[Mock, None, None]:
"""Mock write_file_if_changed for storage_json."""
with patch("esphome.storage_json.write_file_if_changed") as mock:
yield mock
@pytest.fixture
def mock_copy_file_if_changed() -> Generator[Mock, None, None]:
"""Mock copy_file_if_changed for core.config."""
with patch("esphome.core.config.copy_file_if_changed") as mock:
yield mock
@pytest.fixture
def mock_run_platformio_cli() -> Generator[Mock, None, None]:
"""Mock run_platformio_cli for platformio_api."""
with patch("esphome.platformio_api.run_platformio_cli") as mock:
yield mock
@pytest.fixture
def mock_run_platformio_cli_run() -> Generator[Mock, None, None]:
"""Mock run_platformio_cli_run for platformio_api."""
with patch("esphome.platformio_api.run_platformio_cli_run") as mock:
yield mock
@pytest.fixture
def mock_decode_pc() -> Generator[Mock, None, None]:
"""Mock _decode_pc for platformio_api."""
with patch("esphome.platformio_api._decode_pc") as mock:
yield mock
@pytest.fixture
def mock_run_external_command() -> Generator[Mock, None, None]:
"""Mock run_external_command for platformio_api."""
with patch("esphome.platformio_api.run_external_command") as mock:
yield mock

View File

@@ -1,15 +1,34 @@
"""Unit tests for core config functionality including areas and devices.""" """Unit tests for core config functionality including areas and devices."""
from collections.abc import Callable from collections.abc import Callable
import os
from pathlib import Path from pathlib import Path
import types
from typing import Any from typing import Any
from unittest.mock import MagicMock, Mock, patch
import pytest import pytest
from esphome import config_validation as cv, core from esphome import config_validation as cv, core
from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES from esphome.const import (
from esphome.core import config CONF_AREA,
from esphome.core.config import Area, validate_area_config CONF_AREAS,
CONF_BUILD_PATH,
CONF_DEVICES,
CONF_ESPHOME,
CONF_NAME,
CONF_NAME_ADD_MAC_SUFFIX,
KEY_CORE,
)
from esphome.core import CORE, config
from esphome.core.config import (
Area,
preload_core_config,
valid_include,
valid_project_name,
validate_area_config,
validate_hostname,
)
from .common import load_config_from_fixture from .common import load_config_from_fixture
@@ -245,3 +264,316 @@ def test_add_platform_defines_priority() -> None:
f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than " f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than "
f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)" f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)"
) )
def test_valid_include_with_angle_brackets() -> None:
"""Test valid_include accepts angle bracket includes."""
assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>"
def test_valid_include_with_valid_file(tmp_path: Path) -> None:
"""Test valid_include accepts valid include files."""
CORE.config_path = str(tmp_path / "test.yaml")
include_file = tmp_path / "include.h"
include_file.touch()
assert valid_include(str(include_file)) == str(include_file)
def test_valid_include_with_valid_directory(tmp_path: Path) -> None:
"""Test valid_include accepts valid directories."""
CORE.config_path = str(tmp_path / "test.yaml")
include_dir = tmp_path / "includes"
include_dir.mkdir()
assert valid_include(str(include_dir)) == str(include_dir)
def test_valid_include_invalid_extension(tmp_path: Path) -> None:
"""Test valid_include rejects files with invalid extensions."""
CORE.config_path = str(tmp_path / "test.yaml")
invalid_file = tmp_path / "file.txt"
invalid_file.touch()
with pytest.raises(cv.Invalid, match="Include has invalid file extension"):
valid_include(str(invalid_file))
def test_valid_project_name_valid() -> None:
"""Test valid_project_name accepts valid project names."""
assert valid_project_name("esphome.my_project") == "esphome.my_project"
def test_valid_project_name_no_namespace() -> None:
"""Test valid_project_name rejects names without namespace."""
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
valid_project_name("my_project")
def test_valid_project_name_multiple_dots() -> None:
"""Test valid_project_name rejects names with multiple dots."""
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
valid_project_name("esphome.my.project")
def test_validate_hostname_valid() -> None:
"""Test validate_hostname accepts valid hostnames."""
config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False}
assert validate_hostname(config) == config
def test_validate_hostname_too_long() -> None:
"""Test validate_hostname rejects hostnames that are too long."""
config = {
CONF_NAME: "a" * 32, # 32 chars, max is 31
CONF_NAME_ADD_MAC_SUFFIX: False,
}
with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"):
validate_hostname(config)
def test_validate_hostname_too_long_with_mac_suffix() -> None:
"""Test validate_hostname accounts for MAC suffix length."""
config = {
CONF_NAME: "a" * 25, # 25 chars, max is 24 with MAC suffix
CONF_NAME_ADD_MAC_SUFFIX: True,
}
with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"):
validate_hostname(config)
def test_validate_hostname_with_underscore(caplog) -> None:
"""Test validate_hostname warns about underscores."""
config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False}
assert validate_hostname(config) == config
assert (
"Using the '_' (underscore) character in the hostname is discouraged"
in caplog.text
)
def test_preload_core_config_basic(setup_core: Path) -> None:
"""Test preload_core_config sets basic CORE attributes."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"esp32": {},
}
result = {}
platform = preload_core_config(config, result)
assert CORE.name == "test_device"
assert platform == "esp32"
assert KEY_CORE in CORE.data
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
# Verify default build path is "build/<device_name>"
build_path = config[CONF_ESPHOME][CONF_BUILD_PATH]
assert build_path.endswith(os.path.join("build", "test_device"))
def test_preload_core_config_with_build_path(setup_core: Path) -> None:
"""Test preload_core_config uses provided build path."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
CONF_BUILD_PATH: "/custom/build/path",
},
"esp8266": {},
}
result = {}
platform = preload_core_config(config, result)
assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path"
assert platform == "esp8266"
def test_preload_core_config_env_build_path(setup_core: Path) -> None:
"""Test preload_core_config uses ESPHOME_BUILD_PATH env var."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"rp2040": {},
}
result = {}
with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}):
platform = preload_core_config(config, result)
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH]
# Verify it uses the env var path with device name appended
build_path = config[CONF_ESPHOME][CONF_BUILD_PATH]
expected_path = os.path.join("/env/build", "test_device")
assert build_path == expected_path or build_path == expected_path.replace(
"/", os.sep
)
assert platform == "rp2040"
def test_preload_core_config_no_platform(setup_core: Path) -> None:
"""Test preload_core_config raises when no platform is specified."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
}
result = {}
# Mock _is_target_platform to avoid expensive component loading
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
# Return True for known platforms
mock_is_platform.side_effect = lambda name: name in [
"esp32",
"esp8266",
"rp2040",
]
with pytest.raises(cv.Invalid, match="Platform missing"):
preload_core_config(config, result)
def test_preload_core_config_multiple_platforms(setup_core: Path) -> None:
"""Test preload_core_config raises when multiple platforms are specified."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"esp32": {},
"esp8266": {},
}
result = {}
# Mock _is_target_platform to avoid expensive component loading
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
# Return True for known platforms
mock_is_platform.side_effect = lambda name: name in [
"esp32",
"esp8266",
"rp2040",
]
with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"):
preload_core_config(config, result)
def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
"""Test include_file adds include statement for header files."""
src_file = tmp_path / "source.h"
src_file.write_text("// Header content")
CORE.build_path = str(tmp_path / "build")
with patch("esphome.core.config.cg") as mock_cg:
# Mock RawStatement to capture the text
mock_raw_statement = MagicMock()
mock_raw_statement.text = ""
def raw_statement_side_effect(text):
mock_raw_statement.text = text
return mock_raw_statement
mock_cg.RawStatement.side_effect = raw_statement_side_effect
config.include_file(str(src_file), "test.h")
mock_copy_file_if_changed.assert_called_once()
mock_cg.add_global.assert_called_once()
# Check that include statement was added
assert '#include "test.h"' in mock_raw_statement.text
def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
"""Test include_file does not add include for cpp files."""
src_file = tmp_path / "source.cpp"
src_file.write_text("// CPP content")
CORE.build_path = str(tmp_path / "build")
with patch("esphome.core.config.cg") as mock_cg:
config.include_file(str(src_file), "test.cpp")
mock_copy_file_if_changed.assert_called_once()
# Should not add include statement for .cpp files
mock_cg.add_global.assert_not_called()
def test_get_usable_cpu_count() -> None:
"""Test get_usable_cpu_count returns CPU count."""
count = config.get_usable_cpu_count()
assert isinstance(count, int)
assert count > 0
def test_get_usable_cpu_count_with_process_cpu_count() -> None:
"""Test get_usable_cpu_count uses process_cpu_count when available."""
# Test with process_cpu_count (Python 3.13+)
# Create a mock os module with process_cpu_count
mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4)
with patch("esphome.core.config.os", mock_os):
# When process_cpu_count exists, it should be used
count = config.get_usable_cpu_count()
assert count == 8
# Test fallback to cpu_count when process_cpu_count not available
mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4)
with patch("esphome.core.config.os", mock_os_no_process):
count = config.get_usable_cpu_count()
assert count == 4
def test_list_target_platforms(tmp_path: Path) -> None:
"""Test _list_target_platforms returns available platforms."""
# Create mock components directory structure
components_dir = tmp_path / "components"
components_dir.mkdir()
# Create platform and non-platform directories with __init__.py
platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"]
non_platforms = ["sensor"]
for component in platforms + non_platforms:
component_dir = components_dir / component
component_dir.mkdir()
(component_dir / "__init__.py").touch()
# Create a file (not a directory)
(components_dir / "README.md").touch()
# Create a directory without __init__.py
(components_dir / "no_init").mkdir()
# Mock Path(__file__).parents[1] to return our tmp_path
with patch("esphome.core.config.Path") as mock_path:
mock_file_path = MagicMock()
mock_file_path.parents = [MagicMock(), tmp_path]
mock_path.return_value = mock_file_path
platforms = config._list_target_platforms()
assert isinstance(platforms, list)
# Should include platform components
assert "esp32" in platforms
assert "esp8266" in platforms
assert "rp2040" in platforms
assert "libretiny" in platforms
assert "host" in platforms
# Should not include non-platform components
assert "sensor" not in platforms
assert "README.md" not in platforms
assert "no_init" not in platforms
def test_is_target_platform() -> None:
"""Test _is_target_platform identifies valid platforms."""
assert config._is_target_platform("esp32") is True
assert config._is_target_platform("esp8266") is True
assert config._is_target_platform("rp2040") is True
assert config._is_target_platform("invalid_platform") is False
assert config._is_target_platform("api") is False # Component but not platform

View File

@@ -0,0 +1,187 @@
"""Tests for config_validation.py path-related functions."""
from pathlib import Path
import pytest
import voluptuous as vol
from esphome import config_validation as cv
def test_directory_valid_path(setup_core: Path) -> None:
"""Test directory validator with valid directory."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
result = cv.directory("test_directory")
assert result == "test_directory"
def test_directory_absolute_path(setup_core: Path) -> None:
"""Test directory validator with absolute path."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
result = cv.directory(str(test_dir))
assert result == str(test_dir)
def test_directory_nonexistent_path(setup_core: Path) -> None:
"""Test directory validator raises error for non-existent directory."""
with pytest.raises(
vol.Invalid, match="Could not find directory.*nonexistent_directory"
):
cv.directory("nonexistent_directory")
def test_directory_file_instead_of_directory(setup_core: Path) -> None:
"""Test directory validator raises error when path is a file."""
test_file = setup_core / "test_file.txt"
test_file.write_text("content")
with pytest.raises(vol.Invalid, match="is not a directory"):
cv.directory("test_file.txt")
def test_directory_with_parent_directory(setup_core: Path) -> None:
"""Test directory validator with nested directory structure."""
nested_dir = setup_core / "parent" / "child" / "grandchild"
nested_dir.mkdir(parents=True)
result = cv.directory("parent/child/grandchild")
assert result == "parent/child/grandchild"
def test_file_valid_path(setup_core: Path) -> None:
"""Test file_ validator with valid file."""
test_file = setup_core / "test_file.yaml"
test_file.write_text("test content")
result = cv.file_("test_file.yaml")
assert result == "test_file.yaml"
def test_file_absolute_path(setup_core: Path) -> None:
"""Test file_ validator with absolute path."""
test_file = setup_core / "test_file.yaml"
test_file.write_text("test content")
result = cv.file_(str(test_file))
assert result == str(test_file)
def test_file_nonexistent_path(setup_core: Path) -> None:
"""Test file_ validator raises error for non-existent file."""
with pytest.raises(vol.Invalid, match="Could not find file.*nonexistent_file.yaml"):
cv.file_("nonexistent_file.yaml")
def test_file_directory_instead_of_file(setup_core: Path) -> None:
"""Test file_ validator raises error when path is a directory."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
with pytest.raises(vol.Invalid, match="is not a file"):
cv.file_("test_directory")
def test_file_with_parent_directory(setup_core: Path) -> None:
"""Test file_ validator with file in nested directory."""
nested_dir = setup_core / "configs" / "sensors"
nested_dir.mkdir(parents=True)
test_file = nested_dir / "temperature.yaml"
test_file.write_text("sensor config")
result = cv.file_("configs/sensors/temperature.yaml")
assert result == "configs/sensors/temperature.yaml"
def test_directory_handles_trailing_slash(setup_core: Path) -> None:
"""Test directory validator handles trailing slashes correctly."""
test_dir = setup_core / "test_dir"
test_dir.mkdir()
result = cv.directory("test_dir/")
assert result == "test_dir/"
result = cv.directory("test_dir")
assert result == "test_dir"
def test_file_handles_various_extensions(setup_core: Path) -> None:
"""Test file_ validator works with different file extensions."""
yaml_file = setup_core / "config.yaml"
yaml_file.write_text("yaml content")
assert cv.file_("config.yaml") == "config.yaml"
yml_file = setup_core / "config.yml"
yml_file.write_text("yml content")
assert cv.file_("config.yml") == "config.yml"
txt_file = setup_core / "readme.txt"
txt_file.write_text("text content")
assert cv.file_("readme.txt") == "readme.txt"
no_ext_file = setup_core / "LICENSE"
no_ext_file.write_text("license content")
assert cv.file_("LICENSE") == "LICENSE"
def test_directory_with_symlink(setup_core: Path) -> None:
"""Test directory validator follows symlinks."""
actual_dir = setup_core / "actual_directory"
actual_dir.mkdir()
symlink_dir = setup_core / "symlink_directory"
symlink_dir.symlink_to(actual_dir)
result = cv.directory("symlink_directory")
assert result == "symlink_directory"
def test_file_with_symlink(setup_core: Path) -> None:
"""Test file_ validator follows symlinks."""
actual_file = setup_core / "actual_file.txt"
actual_file.write_text("content")
symlink_file = setup_core / "symlink_file.txt"
symlink_file.symlink_to(actual_file)
result = cv.file_("symlink_file.txt")
assert result == "symlink_file.txt"
def test_directory_error_shows_full_path(setup_core: Path) -> None:
"""Test directory validator error message includes full path."""
with pytest.raises(vol.Invalid, match=".*missing_dir.*full path:.*"):
cv.directory("missing_dir")
def test_file_error_shows_full_path(setup_core: Path) -> None:
"""Test file_ validator error message includes full path."""
with pytest.raises(vol.Invalid, match=".*missing_file.yaml.*full path:.*"):
cv.file_("missing_file.yaml")
def test_directory_with_spaces_in_name(setup_core: Path) -> None:
"""Test directory validator handles spaces in directory names."""
dir_with_spaces = setup_core / "my test directory"
dir_with_spaces.mkdir()
result = cv.directory("my test directory")
assert result == "my test directory"
def test_file_with_spaces_in_name(setup_core: Path) -> None:
"""Test file_ validator handles spaces in file names."""
file_with_spaces = setup_core / "my test file.yaml"
file_with_spaces.write_text("content")
result = cv.file_("my test file.yaml")
assert result == "my test file.yaml"

View File

@@ -0,0 +1,196 @@
"""Tests for external_files.py functions."""
from pathlib import Path
import time
from unittest.mock import MagicMock, patch
import pytest
import requests
from esphome import external_files
from esphome.config_validation import Invalid
from esphome.core import CORE, TimePeriod
def test_compute_local_file_dir(setup_core: Path) -> None:
"""Test compute_local_file_dir creates and returns correct path."""
domain = "font"
result = external_files.compute_local_file_dir(domain)
assert isinstance(result, Path)
assert result == Path(CORE.data_dir) / domain
assert result.exists()
assert result.is_dir()
def test_compute_local_file_dir_nested(setup_core: Path) -> None:
"""Test compute_local_file_dir works with nested domains."""
domain = "images/icons"
result = external_files.compute_local_file_dir(domain)
assert result == Path(CORE.data_dir) / "images" / "icons"
assert result.exists()
assert result.is_dir()
def test_is_file_recent_with_recent_file(setup_core: Path) -> None:
"""Test is_file_recent returns True for recently created file."""
test_file = setup_core / "recent.txt"
test_file.write_text("content")
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is True
def test_is_file_recent_with_old_file(setup_core: Path) -> None:
"""Test is_file_recent returns False for old file."""
test_file = setup_core / "old.txt"
test_file.write_text("content")
old_time = time.time() - 7200
with patch("os.path.getctime", return_value=old_time):
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
def test_is_file_recent_nonexistent_file(setup_core: Path) -> None:
"""Test is_file_recent returns False for non-existent file."""
test_file = setup_core / "nonexistent.txt"
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
def test_is_file_recent_with_zero_refresh(setup_core: Path) -> None:
"""Test is_file_recent with zero refresh period returns False."""
test_file = setup_core / "test.txt"
test_file.write_text("content")
# Mock getctime to return a time 10 seconds ago
with patch("os.path.getctime", return_value=time.time() - 10):
refresh = TimePeriod(seconds=0)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_not_modified(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed returns False when file not modified."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 304
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is False
mock_head.assert_called_once()
call_args = mock_head.call_args
headers = call_args[1]["headers"]
assert external_files.IF_MODIFIED_SINCE in headers
assert external_files.CACHE_CONTROL in headers
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_modified(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed returns True when file modified."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 200
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is True
def test_has_remote_file_changed_no_local_file(setup_core: Path) -> None:
"""Test has_remote_file_changed returns True when local file doesn't exist."""
test_file = setup_core / "nonexistent.txt"
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is True
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_network_error(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed handles network errors gracefully."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_head.side_effect = requests.exceptions.RequestException("Network error")
url = "https://example.com/file.txt"
with pytest.raises(Invalid, match="Could not check if.*Network error"):
external_files.has_remote_file_changed(url, str(test_file))
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_timeout(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed respects timeout."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 304
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
external_files.has_remote_file_changed(url, str(test_file))
call_args = mock_head.call_args
assert call_args[1]["timeout"] == external_files.NETWORK_TIMEOUT
def test_compute_local_file_dir_creates_parent_dirs(setup_core: Path) -> None:
"""Test compute_local_file_dir creates parent directories."""
domain = "level1/level2/level3/level4"
result = external_files.compute_local_file_dir(domain)
assert result.exists()
assert result.is_dir()
assert result.parent.name == "level3"
assert result.parent.parent.name == "level2"
assert result.parent.parent.parent.name == "level1"
def test_is_file_recent_handles_float_seconds(setup_core: Path) -> None:
"""Test is_file_recent works with float seconds in TimePeriod."""
test_file = setup_core / "test.txt"
test_file.write_text("content")
refresh = TimePeriod(seconds=3600.5)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is True

View File

@@ -0,0 +1,636 @@
"""Tests for platformio_api.py path functions."""
import json
import os
from pathlib import Path
import shutil
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import platformio_api
from esphome.core import CORE, EsphomeError
def test_idedata_firmware_elf_path(setup_core: Path) -> None:
"""Test IDEData.firmware_elf_path returns correct path."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {"prog_path": "/path/to/firmware.elf"}
idedata = platformio_api.IDEData(raw_data)
assert idedata.firmware_elf_path == "/path/to/firmware.elf"
def test_idedata_firmware_bin_path(setup_core: Path) -> None:
"""Test IDEData.firmware_bin_path returns Path with .bin extension."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
prog_path = str(Path("/path/to/firmware.elf"))
raw_data = {"prog_path": prog_path}
idedata = platformio_api.IDEData(raw_data)
result = idedata.firmware_bin_path
assert isinstance(result, str)
expected = str(Path("/path/to/firmware.bin"))
assert result == expected
assert result.endswith(".bin")
def test_idedata_firmware_bin_path_preserves_directory(setup_core: Path) -> None:
"""Test firmware_bin_path preserves the directory structure."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
prog_path = str(Path("/complex/path/to/build/firmware.elf"))
raw_data = {"prog_path": prog_path}
idedata = platformio_api.IDEData(raw_data)
result = idedata.firmware_bin_path
expected = str(Path("/complex/path/to/build/firmware.bin"))
assert result == expected
def test_idedata_extra_flash_images(setup_core: Path) -> None:
"""Test IDEData.extra_flash_images returns list of FlashImage objects."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {
"prog_path": "/path/to/firmware.elf",
"extra": {
"flash_images": [
{"path": "/path/to/bootloader.bin", "offset": "0x1000"},
{"path": "/path/to/partition.bin", "offset": "0x8000"},
]
},
}
idedata = platformio_api.IDEData(raw_data)
images = idedata.extra_flash_images
assert len(images) == 2
assert all(isinstance(img, platformio_api.FlashImage) for img in images)
assert images[0].path == "/path/to/bootloader.bin"
assert images[0].offset == "0x1000"
assert images[1].path == "/path/to/partition.bin"
assert images[1].offset == "0x8000"
def test_idedata_extra_flash_images_empty(setup_core: Path) -> None:
"""Test extra_flash_images returns empty list when no extra images."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {"prog_path": "/path/to/firmware.elf", "extra": {"flash_images": []}}
idedata = platformio_api.IDEData(raw_data)
images = idedata.extra_flash_images
assert images == []
def test_idedata_cc_path(setup_core: Path) -> None:
"""Test IDEData.cc_path returns compiler path."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {
"prog_path": "/path/to/firmware.elf",
"cc_path": "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc",
}
idedata = platformio_api.IDEData(raw_data)
assert (
idedata.cc_path
== "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc"
)
def test_flash_image_dataclass() -> None:
"""Test FlashImage dataclass stores path and offset correctly."""
image = platformio_api.FlashImage(path="/path/to/image.bin", offset="0x10000")
assert image.path == "/path/to/image.bin"
assert image.offset == "0x10000"
def test_load_idedata_returns_dict(
setup_core: Path, mock_run_platformio_cli_run
) -> None:
"""Test _load_idedata returns parsed idedata dict when successful."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create required files
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.touch()
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/test/firmware.elf"}')
mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}'
config = {"name": "test"}
result = platformio_api._load_idedata(config)
assert result is not None
assert isinstance(result, dict)
assert result["prog_path"] == "/test/firmware.elf"
def test_load_idedata_uses_cache_when_valid(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata uses cached data when unchanged."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create platformio.ini
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Create idedata cache file that's newer
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/cached/firmware.elf"}')
# Make idedata newer than platformio.ini
platformio_ini_mtime = platformio_ini.stat().st_mtime
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should not call _run_idedata since cache is valid
mock_run_platformio_cli_run.assert_not_called()
assert result["prog_path"] == "/cached/firmware.elf"
def test_load_idedata_regenerates_when_platformio_ini_newer(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata regenerates when platformio.ini is newer."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create idedata cache file first
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/old/firmware.elf"}')
# Create platformio.ini that's newer
idedata_mtime = idedata_path.stat().st_mtime
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Make platformio.ini newer than idedata
os.utime(platformio_ini, (idedata_mtime + 1, idedata_mtime + 1))
# Mock platformio to return new data
new_data = {"prog_path": "/new/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should call _run_idedata since platformio.ini is newer
mock_run_platformio_cli_run.assert_called_once()
assert result["prog_path"] == "/new/firmware.elf"
def test_load_idedata_regenerates_on_corrupted_cache(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata regenerates when cache file is corrupted."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create platformio.ini
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Create corrupted idedata cache file
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": invalid json')
# Make idedata newer so it would be used if valid
platformio_ini_mtime = platformio_ini.stat().st_mtime
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
# Mock platformio to return new data
new_data = {"prog_path": "/new/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should call _run_idedata since cache is corrupted
mock_run_platformio_cli_run.assert_called_once()
assert result["prog_path"] == "/new/firmware.elf"
def test_run_idedata_parses_json_from_output(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata extracts JSON from platformio output."""
config = {"name": "test"}
expected_data = {
"prog_path": "/path/to/firmware.elf",
"cc_path": "/path/to/gcc",
"extra": {"flash_images": []},
}
# Simulate platformio output with JSON embedded
mock_run_platformio_cli_run.return_value = (
f"Some preamble\n{json.dumps(expected_data)}\nSome postamble"
)
result = platformio_api._run_idedata(config)
assert result == expected_data
def test_run_idedata_raises_on_no_json(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata raises EsphomeError when no JSON found."""
config = {"name": "test"}
mock_run_platformio_cli_run.return_value = "No JSON in this output"
with pytest.raises(EsphomeError):
platformio_api._run_idedata(config)
def test_run_idedata_raises_on_invalid_json(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata raises on malformed JSON."""
config = {"name": "test"}
mock_run_platformio_cli_run.return_value = '{"invalid": json"}'
# The ValueError from json.loads is re-raised
with pytest.raises(ValueError):
platformio_api._run_idedata(config)
def test_run_platformio_cli_sets_environment_variables(
setup_core: Path, mock_run_external_command: Mock
) -> None:
"""Test run_platformio_cli sets correct environment variables."""
CORE.build_path = str(setup_core / "build" / "test")
with patch.dict(os.environ, {}, clear=False):
mock_run_external_command.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
# Check environment variables were set
assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true"
assert (
setup_core / "build" / "test"
in Path(os.environ["PLATFORMIO_BUILD_DIR"]).parents
or Path(os.environ["PLATFORMIO_BUILD_DIR"]) == setup_core / "build" / "test"
)
assert "PLATFORMIO_LIBDEPS_DIR" in os.environ
assert "PYTHONWARNINGS" in os.environ
# Check command was called correctly
mock_run_external_command.assert_called_once()
args = mock_run_external_command.call_args[0]
assert "platformio" in args
assert "test" in args
assert "arg" in args
def test_run_platformio_cli_run_builds_command(
setup_core: Path, mock_run_platformio_cli: Mock
) -> None:
"""Test run_platformio_cli_run builds correct command."""
CORE.build_path = str(setup_core / "build" / "test")
mock_run_platformio_cli.return_value = 0
config = {"name": "test"}
platformio_api.run_platformio_cli_run(config, True, "extra", "args")
mock_run_platformio_cli.assert_called_once_with(
"run", "-d", CORE.build_path, "-v", "extra", "args"
)
def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None:
"""Test run_compile with process limit."""
from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME
CORE.build_path = str(setup_core / "build" / "test")
config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}}
mock_run_platformio_cli_run.return_value = 0
platformio_api.run_compile(config, verbose=True)
mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4")
def test_get_idedata_caches_result(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test get_idedata caches result in CORE.data."""
from esphome.const import KEY_CORE
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
CORE.data[KEY_CORE] = {}
# Create platformio.ini to avoid regeneration
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Mock platformio to return data
idedata = {"prog_path": "/test/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(idedata)
config = {"name": "test"}
# First call should load and cache
result1 = platformio_api.get_idedata(config)
mock_run_platformio_cli_run.assert_called_once()
# Second call should use cache from CORE.data
result2 = platformio_api.get_idedata(config)
mock_run_platformio_cli_run.assert_called_once() # Still only called once
assert result1 is result2
assert isinstance(result1, platformio_api.IDEData)
assert result1.firmware_elf_path == "/test/firmware.elf"
def test_idedata_addr2line_path_windows(setup_core: Path) -> None:
"""Test IDEData.addr2line_path on Windows."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.addr2line_path
assert result == "C:\\tools\\addr2line.exe"
def test_idedata_addr2line_path_unix(setup_core: Path) -> None:
"""Test IDEData.addr2line_path on Unix."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.addr2line_path
assert result == "/usr/bin/addr2line"
def test_patch_structhash(setup_core: Path) -> None:
"""Test patch_structhash monkey patches platformio functions."""
# Create simple namespace objects to act as modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_run = SimpleNamespace(cli=mock_cli, helpers=mock_helpers)
# Mock platformio modules
with patch.dict(
"sys.modules",
{
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
"platformio.run": mock_run,
"platformio.project.helpers": MagicMock(),
"platformio.fs": MagicMock(),
"platformio": MagicMock(),
},
):
# Call patch_structhash
platformio_api.patch_structhash()
# Verify both modules had clean_build_dir patched
# Check that clean_build_dir was set on both modules
assert hasattr(mock_cli, "clean_build_dir")
assert hasattr(mock_helpers, "clean_build_dir")
# Verify they got the same function assigned
assert mock_cli.clean_build_dir is mock_helpers.clean_build_dir
# Verify it's a real function (not a Mock)
assert callable(mock_cli.clean_build_dir)
assert mock_cli.clean_build_dir.__name__ == "patched_clean_build_dir"
def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None:
"""Test patched_clean_build_dir removes build dir when platformio.ini is newer."""
build_dir = setup_core / "build"
build_dir.mkdir()
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Make platformio.ini newer than build_dir
build_mtime = build_dir.stat().st_mtime
os.utime(platformio_ini, (build_mtime + 1, build_mtime + 1))
# Track if directory was removed
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
shutil.rmtree(path)
# Create mock modules that patch_structhash expects
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify directory was removed and recreated
assert len(removed_paths) == 1
assert removed_paths[0] == str(build_dir)
assert build_dir.exists() # makedirs recreated it
def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None:
"""Test patched_clean_build_dir keeps build dir when it's up to date."""
build_dir = setup_core / "build"
build_dir.mkdir()
test_file = build_dir / "test.txt"
test_file.write_text("test content")
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Make build_dir newer than platformio.ini
ini_mtime = platformio_ini.stat().st_mtime
os.utime(build_dir, (ini_mtime + 1, ini_mtime + 1))
# Track if rmtree is called
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
# Create mock modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify rmtree was NOT called
assert len(removed_paths) == 0
# Verify directory and file still exist
assert build_dir.exists()
assert test_file.exists()
assert test_file.read_text() == "test content"
def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None:
"""Test patched_clean_build_dir creates build dir when it doesn't exist."""
build_dir = setup_core / "build"
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Ensure build_dir doesn't exist
assert not build_dir.exists()
# Track if rmtree is called
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
# Create mock modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify rmtree was NOT called
assert len(removed_paths) == 0
# Verify directory was created
assert build_dir.exists()
def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None:
"""Test process_stacktrace handles ESP8266 exceptions."""
config = {"name": "test"}
# Test exception type parsing
line = "Exception (28):"
backtrace_state = False
result = platformio_api.process_stacktrace(config, line, backtrace_state)
assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text
assert result is False
def test_process_stacktrace_esp8266_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP8266 multi-line backtrace."""
config = {"name": "test"}
# Start of backtrace
line1 = ">>>stack>>>"
state = platformio_api.process_stacktrace(config, line1, False)
assert state is True
# Backtrace content with addresses
line2 = "40201234 40205678"
state = platformio_api.process_stacktrace(config, line2, state)
assert state is True
assert mock_decode_pc.call_count == 2
# End of backtrace
line3 = "<<<stack<<<"
state = platformio_api.process_stacktrace(config, line3, state)
assert state is False
def test_process_stacktrace_esp32_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP32 single-line backtrace."""
config = {"name": "test"}
line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678"
state = platformio_api.process_stacktrace(config, line, False)
# Should decode both addresses
assert mock_decode_pc.call_count == 2
mock_decode_pc.assert_any_call(config, "40081234")
mock_decode_pc.assert_any_call(config, "40085678")
assert state is False
def test_process_stacktrace_bad_alloc(
setup_core: Path, mock_decode_pc: Mock, caplog
) -> None:
"""Test process_stacktrace handles bad alloc messages."""
config = {"name": "test"}
line = "last failed alloc call: 40201234(512)"
state = platformio_api.process_stacktrace(config, line, False)
assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
mock_decode_pc.assert_called_once_with(config, "40201234")
assert state is False

View File

@@ -0,0 +1,660 @@
"""Tests for storage_json.py path functions."""
from datetime import datetime
import json
from pathlib import Path
import sys
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import storage_json
from esphome.const import CONF_DISABLED, CONF_MDNS
from esphome.core import CORE
def test_storage_path(setup_core: Path) -> None:
"""Test storage_path returns correct path for current config."""
CORE.config_path = str(setup_core / "my_device.yaml")
result = storage_json.storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "my_device.yaml.json")
assert result == expected
def test_ext_storage_path(setup_core: Path) -> None:
"""Test ext_storage_path returns correct path for given filename."""
result = storage_json.ext_storage_path("other_device.yaml")
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "other_device.yaml.json")
assert result == expected
def test_ext_storage_path_handles_various_extensions(setup_core: Path) -> None:
"""Test ext_storage_path works with different file extensions."""
result_yml = storage_json.ext_storage_path("device.yml")
assert result_yml.endswith("device.yml.json")
result_no_ext = storage_json.ext_storage_path("device")
assert result_no_ext.endswith("device.json")
result_path = storage_json.ext_storage_path("my/device.yaml")
assert result_path.endswith("device.yaml.json")
def test_esphome_storage_path(setup_core: Path) -> None:
"""Test esphome_storage_path returns correct path."""
result = storage_json.esphome_storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "esphome.json")
assert result == expected
def test_ignored_devices_storage_path(setup_core: Path) -> None:
"""Test ignored_devices_storage_path returns correct path."""
result = storage_json.ignored_devices_storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "ignored-devices.json")
assert result == expected
def test_trash_storage_path(setup_core: Path) -> None:
"""Test trash_storage_path returns correct path."""
CORE.config_path = str(setup_core / "configs" / "device.yaml")
result = storage_json.trash_storage_path()
expected = str(setup_core / "configs" / "trash")
assert result == expected
def test_archive_storage_path(setup_core: Path) -> None:
"""Test archive_storage_path returns correct path."""
CORE.config_path = str(setup_core / "configs" / "device.yaml")
result = storage_json.archive_storage_path()
expected = str(setup_core / "configs" / "archive")
assert result == expected
def test_storage_path_with_subdirectory(setup_core: Path) -> None:
"""Test storage paths work correctly when config is in subdirectory."""
subdir = setup_core / "configs" / "basement"
subdir.mkdir(parents=True, exist_ok=True)
CORE.config_path = str(subdir / "sensor.yaml")
result = storage_json.storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "sensor.yaml.json")
assert result == expected
def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None:
"""Test StorageJSON firmware_bin_path property."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test_device",
friendly_name="Test Device",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="192.168.1.100",
web_port=80,
target_platform="ESP32",
build_path="build/test_device",
firmware_bin_path="/path/to/firmware.bin",
loaded_integrations={"wifi", "api"},
loaded_platforms=set(),
no_mdns=False,
)
assert storage.firmware_bin_path == "/path/to/firmware.bin"
def test_storage_json_save_creates_directory(
setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock
) -> None:
"""Test StorageJSON.save creates storage directory if it doesn't exist."""
storage_dir = tmp_path / "new_data" / "storage"
storage_file = storage_dir / "test.json"
assert not storage_dir.exists()
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP8266",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
storage.save(str(storage_file))
mock_write_file_if_changed.assert_called_once()
call_args = mock_write_file_if_changed.call_args[0]
assert call_args[0] == str(storage_file)
def test_storage_json_from_wizard(setup_core: Path) -> None:
"""Test StorageJSON.from_wizard creates correct storage object."""
storage = storage_json.StorageJSON.from_wizard(
name="my_device",
friendly_name="My Device",
address="my_device.local",
platform="ESP32",
)
assert storage.name == "my_device"
assert storage.friendly_name == "My Device"
assert storage.address == "my_device.local"
assert storage.target_platform == "ESP32"
assert storage.build_path is None
assert storage.firmware_bin_path is None
@pytest.mark.skipif(sys.platform == "win32", reason="HA addons don't run on Windows")
@patch("esphome.core.is_ha_addon")
def test_storage_paths_with_ha_addon(mock_is_ha_addon: bool, tmp_path: Path) -> None:
"""Test storage paths when running as Home Assistant addon."""
mock_is_ha_addon.return_value = True
CORE.config_path = str(tmp_path / "test.yaml")
result = storage_json.storage_path()
# When is_ha_addon is True, CORE.data_dir returns "/data"
# This is the standard mount point for HA addon containers
expected = str(Path("/data") / "storage" / "test.yaml.json")
assert result == expected
result = storage_json.esphome_storage_path()
expected = str(Path("/data") / "esphome.json")
assert result == expected
def test_storage_json_as_dict() -> None:
"""Test StorageJSON.as_dict returns correct dictionary."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test_device",
friendly_name="Test Device",
comment="Test comment",
esphome_version="2024.1.0",
src_version=1,
address="192.168.1.100",
web_port=80,
target_platform="ESP32",
build_path="/path/to/build",
firmware_bin_path="/path/to/firmware.bin",
loaded_integrations={"wifi", "api", "ota"},
loaded_platforms={"sensor", "binary_sensor"},
no_mdns=True,
framework="arduino",
core_platform="esp32",
)
result = storage.as_dict()
assert result["storage_version"] == 1
assert result["name"] == "test_device"
assert result["friendly_name"] == "Test Device"
assert result["comment"] == "Test comment"
assert result["esphome_version"] == "2024.1.0"
assert result["src_version"] == 1
assert result["address"] == "192.168.1.100"
assert result["web_port"] == 80
assert result["esp_platform"] == "ESP32"
assert result["build_path"] == "/path/to/build"
assert result["firmware_bin_path"] == "/path/to/firmware.bin"
assert "api" in result["loaded_integrations"]
assert "wifi" in result["loaded_integrations"]
assert "ota" in result["loaded_integrations"]
assert result["loaded_integrations"] == sorted(
["wifi", "api", "ota"]
) # Should be sorted
assert "sensor" in result["loaded_platforms"]
assert result["loaded_platforms"] == sorted(
["sensor", "binary_sensor"]
) # Should be sorted
assert result["no_mdns"] is True
assert result["framework"] == "arduino"
assert result["core_platform"] == "esp32"
def test_storage_json_to_json() -> None:
"""Test StorageJSON.to_json returns valid JSON string."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP8266",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
json_str = storage.to_json()
# Should be valid JSON
parsed = json.loads(json_str)
assert parsed["name"] == "test"
assert parsed["storage_version"] == 1
# Should end with newline
assert json_str.endswith("\n")
def test_storage_json_save(tmp_path: Path) -> None:
"""Test StorageJSON.save writes file correctly."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP32",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
save_path = tmp_path / "test.json"
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(save_path))
mock_write.assert_called_once_with(str(save_path), storage.to_json())
def test_storage_json_from_esphome_core(setup_core: Path) -> None:
"""Test StorageJSON.from_esphome_core creates correct storage object."""
# Mock CORE object
mock_core = MagicMock()
mock_core.name = "my_device"
mock_core.friendly_name = "My Device"
mock_core.comment = "A test device"
mock_core.address = "192.168.1.50"
mock_core.web_port = 8080
mock_core.target_platform = "esp32"
mock_core.is_esp32 = True
mock_core.build_path = "/build/my_device"
mock_core.firmware_bin = "/build/my_device/firmware.bin"
mock_core.loaded_integrations = {"wifi", "api"}
mock_core.loaded_platforms = {"sensor"}
mock_core.config = {CONF_MDNS: {CONF_DISABLED: True}}
mock_core.target_framework = "esp-idf"
with patch("esphome.components.esp32.get_esp32_variant") as mock_variant:
mock_variant.return_value = "ESP32-C3"
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
assert result.name == "my_device"
assert result.friendly_name == "My Device"
assert result.comment == "A test device"
assert result.address == "192.168.1.50"
assert result.web_port == 8080
assert result.target_platform == "ESP32-C3"
assert result.build_path == "/build/my_device"
assert result.firmware_bin_path == "/build/my_device/firmware.bin"
assert result.loaded_integrations == {"wifi", "api"}
assert result.loaded_platforms == {"sensor"}
assert result.no_mdns is True
assert result.framework == "esp-idf"
assert result.core_platform == "esp32"
def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None:
"""Test from_esphome_core with mDNS enabled."""
mock_core = MagicMock()
mock_core.name = "test"
mock_core.friendly_name = "Test"
mock_core.comment = None
mock_core.address = "test.local"
mock_core.web_port = None
mock_core.target_platform = "esp8266"
mock_core.is_esp32 = False
mock_core.build_path = "/build"
mock_core.firmware_bin = "/build/firmware.bin"
mock_core.loaded_integrations = set()
mock_core.loaded_platforms = set()
mock_core.config = {} # No MDNS config means enabled
mock_core.target_framework = "arduino"
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
assert result.no_mdns is False
def test_storage_json_load_valid_file(tmp_path: Path) -> None:
"""Test StorageJSON.load with valid JSON file."""
storage_data = {
"storage_version": 1,
"name": "loaded_device",
"friendly_name": "Loaded Device",
"comment": "Loaded from file",
"esphome_version": "2024.1.0",
"src_version": 2,
"address": "10.0.0.1",
"web_port": 8080,
"esp_platform": "ESP32",
"build_path": "/loaded/build",
"firmware_bin_path": "/loaded/firmware.bin",
"loaded_integrations": ["wifi", "api"],
"loaded_platforms": ["sensor"],
"no_mdns": True,
"framework": "arduino",
"core_platform": "esp32",
}
file_path = tmp_path / "storage.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.StorageJSON.load(str(file_path))
assert result is not None
assert result.name == "loaded_device"
assert result.friendly_name == "Loaded Device"
assert result.comment == "Loaded from file"
assert result.esphome_version == "2024.1.0"
assert result.src_version == 2
assert result.address == "10.0.0.1"
assert result.web_port == 8080
assert result.target_platform == "ESP32"
assert result.build_path == "/loaded/build"
assert result.firmware_bin_path == "/loaded/firmware.bin"
assert result.loaded_integrations == {"wifi", "api"}
assert result.loaded_platforms == {"sensor"}
assert result.no_mdns is True
assert result.framework == "arduino"
assert result.core_platform == "esp32"
def test_storage_json_load_invalid_file(tmp_path: Path) -> None:
"""Test StorageJSON.load with invalid JSON file."""
file_path = tmp_path / "invalid.json"
file_path.write_text("not valid json{")
result = storage_json.StorageJSON.load(str(file_path))
assert result is None
def test_storage_json_load_nonexistent_file() -> None:
"""Test StorageJSON.load with non-existent file."""
result = storage_json.StorageJSON.load("/nonexistent/file.json")
assert result is None
def test_storage_json_equality() -> None:
"""Test StorageJSON equality comparison."""
storage1 = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
storage2 = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
storage3 = storage_json.StorageJSON(
storage_version=1,
name="different", # Different name
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
assert storage1 == storage2
assert storage1 != storage3
assert storage1 != "not a storage object"
def test_esphome_storage_json_as_dict() -> None:
"""Test EsphomeStorageJSON.as_dict returns correct dictionary."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret123",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
result = storage.as_dict()
assert result["storage_version"] == 1
assert result["cookie_secret"] == "secret123"
assert result["last_update_check"] == "2024-01-15T10:30:00"
assert result["remote_version"] == "2024.1.1"
def test_esphome_storage_json_last_update_check_property() -> None:
"""Test EsphomeStorageJSON.last_update_check property."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version=None,
)
# Test getter
result = storage.last_update_check
assert isinstance(result, datetime)
assert result.year == 2024
assert result.month == 1
assert result.day == 15
assert result.hour == 10
assert result.minute == 30
# Test setter
new_date = datetime(2024, 2, 20, 15, 45, 30)
storage.last_update_check = new_date
assert storage.last_update_check_str == "2024-02-20T15:45:30"
def test_esphome_storage_json_last_update_check_invalid() -> None:
"""Test EsphomeStorageJSON.last_update_check with invalid date."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="invalid date",
remote_version=None,
)
result = storage.last_update_check
assert result is None
def test_esphome_storage_json_to_json() -> None:
"""Test EsphomeStorageJSON.to_json returns valid JSON string."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="mysecret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
json_str = storage.to_json()
# Should be valid JSON
parsed = json.loads(json_str)
assert parsed["cookie_secret"] == "mysecret"
assert parsed["storage_version"] == 1
# Should end with newline
assert json_str.endswith("\n")
def test_esphome_storage_json_save(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.save writes file correctly."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check=None,
remote_version=None,
)
save_path = tmp_path / "esphome.json"
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(save_path))
mock_write.assert_called_once_with(str(save_path), storage.to_json())
def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.load with valid JSON file."""
storage_data = {
"storage_version": 1,
"cookie_secret": "loaded_secret",
"last_update_check": "2024-01-20T14:30:00",
"remote_version": "2024.1.2",
}
file_path = tmp_path / "esphome.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.EsphomeStorageJSON.load(str(file_path))
assert result is not None
assert result.storage_version == 1
assert result.cookie_secret == "loaded_secret"
assert result.last_update_check_str == "2024-01-20T14:30:00"
assert result.remote_version == "2024.1.2"
def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.load with invalid JSON file."""
file_path = tmp_path / "invalid.json"
file_path.write_text("not valid json{")
result = storage_json.EsphomeStorageJSON.load(str(file_path))
assert result is None
def test_esphome_storage_json_load_nonexistent_file() -> None:
"""Test EsphomeStorageJSON.load with non-existent file."""
result = storage_json.EsphomeStorageJSON.load("/nonexistent/file.json")
assert result is None
def test_esphome_storage_json_get_default() -> None:
"""Test EsphomeStorageJSON.get_default creates default storage."""
with patch("esphome.storage_json.os.urandom") as mock_urandom:
# Mock urandom to return predictable bytes
mock_urandom.return_value = b"test" * 16 # 64 bytes
result = storage_json.EsphomeStorageJSON.get_default()
assert result.storage_version == 1
assert len(result.cookie_secret) == 128 # 64 bytes hex = 128 chars
assert result.last_update_check is None
assert result.remote_version is None
def test_esphome_storage_json_equality() -> None:
"""Test EsphomeStorageJSON equality comparison."""
storage1 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
storage2 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
storage3 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="different", # Different secret
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
assert storage1 == storage2
assert storage1 != storage3
assert storage1 != "not a storage object"
def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None:
"""Test loading storage with legacy esphomeyaml_version field."""
storage_data = {
"storage_version": 1,
"name": "legacy_device",
"friendly_name": "Legacy Device",
"esphomeyaml_version": "1.14.0", # Legacy field name
"address": "legacy.local",
"esp_platform": "ESP8266",
}
file_path = tmp_path / "legacy.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.StorageJSON.load(str(file_path))
assert result is not None
assert result.esphome_version == "1.14.0" # Should map to esphome_version