mirror of
https://github.com/esphome/esphome.git
synced 2025-11-01 15:41:52 +00:00
Compare commits
16 Commits
2025.9.0b2
...
2025.9.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2df232706 | ||
|
|
404e679e66 | ||
|
|
8d401ad05a | ||
|
|
e542816f7d | ||
|
|
12cadf0a04 | ||
|
|
adc3d3127d | ||
|
|
61ab682099 | ||
|
|
c05b7cca5e | ||
|
|
6ac395da6d | ||
|
|
54616ae1b4 | ||
|
|
e33dcda907 | ||
|
|
04c1b90e57 | ||
|
|
ddb8fedef7 | ||
|
|
04f4f79cb4 | ||
|
|
8890071360 | ||
|
|
4b3a997a8e |
2
Doxyfile
2
Doxyfile
@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
|
||||
# could be handy for archiving the generated documentation or if some version
|
||||
# control system is used.
|
||||
|
||||
PROJECT_NUMBER = 2025.9.0b2
|
||||
PROJECT_NUMBER = 2025.9.0
|
||||
|
||||
# Using the PROJECT_BRIEF tag one can provide an optional one line description
|
||||
# for a project that appears at the top of each page and should give viewer a
|
||||
|
||||
@@ -113,7 +113,7 @@ void ADE7880::update() {
|
||||
if (this->channel_a_ != nullptr) {
|
||||
auto *chan = this->channel_a_;
|
||||
this->update_sensor_from_s24zp_register16_(chan->current, AIRMS, [](float val) { return val / 100000.0f; });
|
||||
this->update_sensor_from_s24zp_register16_(chan->voltage, BVRMS, [](float val) { return val / 10000.0f; });
|
||||
this->update_sensor_from_s24zp_register16_(chan->voltage, AVRMS, [](float val) { return val / 10000.0f; });
|
||||
this->update_sensor_from_s24zp_register16_(chan->active_power, AWATT, [](float val) { return val / 100.0f; });
|
||||
this->update_sensor_from_s24zp_register16_(chan->apparent_power, AVA, [](float val) { return val / 100.0f; });
|
||||
this->update_sensor_from_s16_register16_(chan->power_factor, APF,
|
||||
|
||||
@@ -77,6 +77,13 @@ ETHERNET_TYPES = {
|
||||
"DM9051": EthernetType.ETHERNET_TYPE_DM9051,
|
||||
}
|
||||
|
||||
# PHY types that need compile-time defines for conditional compilation
|
||||
_PHY_TYPE_TO_DEFINE = {
|
||||
"KSZ8081": "USE_ETHERNET_KSZ8081",
|
||||
"KSZ8081RNA": "USE_ETHERNET_KSZ8081",
|
||||
# Add other PHY types here only if they need conditional compilation
|
||||
}
|
||||
|
||||
SPI_ETHERNET_TYPES = ["W5500", "DM9051"]
|
||||
SPI_ETHERNET_DEFAULT_POLLING_INTERVAL = TimePeriodMilliseconds(milliseconds=10)
|
||||
|
||||
@@ -345,6 +352,10 @@ async def to_code(config):
|
||||
if CONF_MANUAL_IP in config:
|
||||
cg.add(var.set_manual_ip(manual_ip(config[CONF_MANUAL_IP])))
|
||||
|
||||
# Add compile-time define for PHY types with specific code
|
||||
if phy_define := _PHY_TYPE_TO_DEFINE.get(config[CONF_TYPE]):
|
||||
cg.add_define(phy_define)
|
||||
|
||||
cg.add_define("USE_ETHERNET")
|
||||
|
||||
# Disable WiFi when using Ethernet to save memory
|
||||
|
||||
@@ -229,10 +229,12 @@ void EthernetComponent::setup() {
|
||||
ESPHL_ERROR_CHECK(err, "ETH driver install error");
|
||||
|
||||
#ifndef USE_ETHERNET_SPI
|
||||
#ifdef USE_ETHERNET_KSZ8081
|
||||
if (this->type_ == ETHERNET_TYPE_KSZ8081RNA && this->clk_mode_ == EMAC_CLK_OUT) {
|
||||
// KSZ8081RNA default is incorrect. It expects a 25MHz clock instead of the 50MHz we provide.
|
||||
this->ksz8081_set_clock_reference_(mac);
|
||||
}
|
||||
#endif // USE_ETHERNET_KSZ8081
|
||||
|
||||
for (const auto &phy_register : this->phy_registers_) {
|
||||
this->write_phy_register_(mac, phy_register);
|
||||
@@ -721,6 +723,7 @@ bool EthernetComponent::powerdown() {
|
||||
|
||||
#ifndef USE_ETHERNET_SPI
|
||||
|
||||
#ifdef USE_ETHERNET_KSZ8081
|
||||
constexpr uint8_t KSZ80XX_PC2R_REG_ADDR = 0x1F;
|
||||
|
||||
void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) {
|
||||
@@ -749,6 +752,7 @@ void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) {
|
||||
ESP_LOGVV(TAG, "KSZ8081 PHY Control 2: %s", format_hex_pretty((u_int8_t *) &phy_control_2, 2).c_str());
|
||||
}
|
||||
}
|
||||
#endif // USE_ETHERNET_KSZ8081
|
||||
|
||||
void EthernetComponent::write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data) {
|
||||
esp_err_t err;
|
||||
|
||||
@@ -104,8 +104,10 @@ class EthernetComponent : public Component {
|
||||
void start_connect_();
|
||||
void finish_connect_();
|
||||
void dump_connect_params_();
|
||||
#ifdef USE_ETHERNET_KSZ8081
|
||||
/// @brief Set `RMII Reference Clock Select` bit for KSZ8081.
|
||||
void ksz8081_set_clock_reference_(esp_eth_mac_t *mac);
|
||||
#endif
|
||||
/// @brief Set arbitratry PHY registers from config.
|
||||
void write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data);
|
||||
|
||||
|
||||
@@ -491,7 +491,7 @@ bool MQTTClientComponent::publish(const std::string &topic, const std::string &p
|
||||
|
||||
bool MQTTClientComponent::publish(const std::string &topic, const char *payload, size_t payload_length, uint8_t qos,
|
||||
bool retain) {
|
||||
return publish({.topic = topic, .payload = payload, .qos = qos, .retain = retain});
|
||||
return publish({.topic = topic, .payload = std::string(payload, payload_length), .qos = qos, .retain = retain});
|
||||
}
|
||||
|
||||
bool MQTTClientComponent::publish(const MQTTMessage &message) {
|
||||
|
||||
@@ -28,12 +28,12 @@ bool Select::has_option(const std::string &option) const { return this->index_of
|
||||
bool Select::has_index(size_t index) const { return index < this->size(); }
|
||||
|
||||
size_t Select::size() const {
|
||||
auto options = traits.get_options();
|
||||
const auto &options = traits.get_options();
|
||||
return options.size();
|
||||
}
|
||||
|
||||
optional<size_t> Select::index_of(const std::string &option) const {
|
||||
auto options = traits.get_options();
|
||||
const auto &options = traits.get_options();
|
||||
auto it = std::find(options.begin(), options.end(), option);
|
||||
if (it == options.end()) {
|
||||
return {};
|
||||
@@ -51,7 +51,7 @@ optional<size_t> Select::active_index() const {
|
||||
|
||||
optional<std::string> Select::at(size_t index) const {
|
||||
if (this->has_index(index)) {
|
||||
auto options = traits.get_options();
|
||||
const auto &options = traits.get_options();
|
||||
return options.at(index);
|
||||
} else {
|
||||
return {};
|
||||
|
||||
@@ -45,7 +45,7 @@ void SelectCall::perform() {
|
||||
auto *parent = this->parent_;
|
||||
const auto *name = parent->get_name().c_str();
|
||||
const auto &traits = parent->traits;
|
||||
auto options = traits.get_options();
|
||||
const auto &options = traits.get_options();
|
||||
|
||||
if (this->operation_ == SELECT_OP_NONE) {
|
||||
ESP_LOGW(TAG, "'%s' - SelectCall performed without selecting an operation", name);
|
||||
|
||||
@@ -4,7 +4,7 @@ from enum import Enum
|
||||
|
||||
from esphome.enum import StrEnum
|
||||
|
||||
__version__ = "2025.9.0b2"
|
||||
__version__ = "2025.9.0"
|
||||
|
||||
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
|
||||
VALID_SUBSTITUTIONS_CHARACTERS = (
|
||||
|
||||
@@ -175,6 +175,7 @@
|
||||
#ifdef USE_ARDUINO
|
||||
#define USE_ARDUINO_VERSION_CODE VERSION_CODE(3, 2, 1)
|
||||
#define USE_ETHERNET
|
||||
#define USE_ETHERNET_KSZ8081
|
||||
#endif
|
||||
|
||||
#ifdef USE_ESP_IDF
|
||||
|
||||
@@ -1038,12 +1038,9 @@ class ArchiveRequestHandler(BaseHandler):
|
||||
shutil.move(config_file, os.path.join(archive_path, configuration))
|
||||
|
||||
storage_json = StorageJSON.load(storage_path)
|
||||
if storage_json is not None:
|
||||
if storage_json is not None and storage_json.build_path:
|
||||
# Delete build folder (if exists)
|
||||
name = storage_json.name
|
||||
build_folder = os.path.join(settings.config_dir, name)
|
||||
if build_folder is not None:
|
||||
shutil.rmtree(build_folder, os.path.join(archive_path, name))
|
||||
shutil.rmtree(storage_json.build_path, ignore_errors=True)
|
||||
|
||||
|
||||
class UnArchiveRequestHandler(BaseHandler):
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
from typing import Literal, NotRequired, TypedDict, Unpack
|
||||
import unicodedata
|
||||
|
||||
import voluptuous as vol
|
||||
@@ -103,11 +104,25 @@ HARDWARE_BASE_CONFIGS = {
|
||||
}
|
||||
|
||||
|
||||
def sanitize_double_quotes(value):
|
||||
def sanitize_double_quotes(value: str) -> str:
|
||||
return value.replace("\\", "\\\\").replace('"', '\\"')
|
||||
|
||||
|
||||
def wizard_file(**kwargs):
|
||||
class WizardFileKwargs(TypedDict):
|
||||
"""Keyword arguments for wizard_file function."""
|
||||
|
||||
name: str
|
||||
platform: Literal["ESP8266", "ESP32", "RP2040", "BK72XX", "LN882X", "RTL87XX"]
|
||||
board: str
|
||||
ssid: NotRequired[str]
|
||||
psk: NotRequired[str]
|
||||
password: NotRequired[str]
|
||||
ota_password: NotRequired[str]
|
||||
api_encryption_key: NotRequired[str]
|
||||
friendly_name: NotRequired[str]
|
||||
|
||||
|
||||
def wizard_file(**kwargs: Unpack[WizardFileKwargs]) -> str:
|
||||
letters = string.ascii_letters + string.digits
|
||||
ap_name_base = kwargs["name"].replace("_", " ").title()
|
||||
ap_name = f"{ap_name_base} Fallback Hotspot"
|
||||
@@ -180,7 +195,25 @@ captive_portal:
|
||||
return config
|
||||
|
||||
|
||||
def wizard_write(path, **kwargs):
|
||||
class WizardWriteKwargs(TypedDict):
|
||||
"""Keyword arguments for wizard_write function."""
|
||||
|
||||
name: str
|
||||
type: Literal["basic", "empty", "upload"]
|
||||
# Required for "basic" type
|
||||
board: NotRequired[str]
|
||||
platform: NotRequired[str]
|
||||
ssid: NotRequired[str]
|
||||
psk: NotRequired[str]
|
||||
password: NotRequired[str]
|
||||
ota_password: NotRequired[str]
|
||||
api_encryption_key: NotRequired[str]
|
||||
friendly_name: NotRequired[str]
|
||||
# Required for "upload" type
|
||||
file_text: NotRequired[str]
|
||||
|
||||
|
||||
def wizard_write(path: str, **kwargs: Unpack[WizardWriteKwargs]) -> bool:
|
||||
from esphome.components.bk72xx import boards as bk72xx_boards
|
||||
from esphome.components.esp32 import boards as esp32_boards
|
||||
from esphome.components.esp8266 import boards as esp8266_boards
|
||||
@@ -237,14 +270,14 @@ def wizard_write(path, **kwargs):
|
||||
|
||||
if get_bool_env(ENV_QUICKWIZARD):
|
||||
|
||||
def sleep(time):
|
||||
def sleep(time: float) -> None:
|
||||
pass
|
||||
|
||||
else:
|
||||
from time import sleep
|
||||
|
||||
|
||||
def safe_print_step(step, big):
|
||||
def safe_print_step(step: int, big: str) -> None:
|
||||
safe_print()
|
||||
safe_print()
|
||||
safe_print(f"============= STEP {step} =============")
|
||||
@@ -253,14 +286,14 @@ def safe_print_step(step, big):
|
||||
sleep(0.25)
|
||||
|
||||
|
||||
def default_input(text, default):
|
||||
def default_input(text: str, default: str) -> str:
|
||||
safe_print()
|
||||
safe_print(f"Press ENTER for default ({default})")
|
||||
return safe_input(text.format(default)) or default
|
||||
|
||||
|
||||
# From https://stackoverflow.com/a/518232/8924614
|
||||
def strip_accents(value):
|
||||
def strip_accents(value: str) -> str:
|
||||
return "".join(
|
||||
c
|
||||
for c in unicodedata.normalize("NFD", str(value))
|
||||
@@ -268,7 +301,7 @@ def strip_accents(value):
|
||||
)
|
||||
|
||||
|
||||
def wizard(path):
|
||||
def wizard(path: str) -> int:
|
||||
from esphome.components.bk72xx import boards as bk72xx_boards
|
||||
from esphome.components.esp32 import boards as esp32_boards
|
||||
from esphome.components.esp8266 import boards as esp8266_boards
|
||||
@@ -509,6 +542,7 @@ def wizard(path):
|
||||
ssid=ssid,
|
||||
psk=psk,
|
||||
password=password,
|
||||
type="basic",
|
||||
):
|
||||
return 1
|
||||
|
||||
|
||||
@@ -315,6 +315,19 @@ def clean_build():
|
||||
_LOGGER.info("Deleting %s", dependencies_lock)
|
||||
os.remove(dependencies_lock)
|
||||
|
||||
# Clean PlatformIO cache to resolve CMake compiler detection issues
|
||||
# This helps when toolchain paths change or get corrupted
|
||||
try:
|
||||
from platformio.project.helpers import get_project_cache_dir
|
||||
except ImportError:
|
||||
# PlatformIO is not available, skip cache cleaning
|
||||
pass
|
||||
else:
|
||||
cache_dir = get_project_cache_dir()
|
||||
if cache_dir and cache_dir.strip() and os.path.isdir(cache_dir):
|
||||
_LOGGER.info("Deleting PlatformIO cache %s", cache_dir)
|
||||
shutil.rmtree(cache_dir)
|
||||
|
||||
|
||||
GITIGNORE_CONTENT = """# Gitignore settings for ESPHome
|
||||
# This is an example and may include too much for your use-case.
|
||||
|
||||
@@ -589,7 +589,7 @@ async def test_archive_request_handler_post(
|
||||
mock_ext_storage_path: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test ArchiveRequestHandler.post method."""
|
||||
"""Test ArchiveRequestHandler.post method without storage_json."""
|
||||
|
||||
# Set up temp directories
|
||||
config_dir = Path(get_fixture_path("conf"))
|
||||
@@ -616,6 +616,97 @@ async def test_archive_request_handler_post(
|
||||
).read_text() == "esphome:\n name: test_archive\n"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_archive_handler_with_build_folder(
|
||||
dashboard: DashboardTestHelper,
|
||||
mock_archive_storage_path: MagicMock,
|
||||
mock_ext_storage_path: MagicMock,
|
||||
mock_dashboard_settings: MagicMock,
|
||||
mock_storage_json: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test ArchiveRequestHandler.post with storage_json and build folder."""
|
||||
config_dir = tmp_path / "config"
|
||||
config_dir.mkdir()
|
||||
archive_dir = tmp_path / "archive"
|
||||
archive_dir.mkdir()
|
||||
build_dir = tmp_path / "build"
|
||||
build_dir.mkdir()
|
||||
|
||||
configuration = "test_device.yaml"
|
||||
test_config = config_dir / configuration
|
||||
test_config.write_text("esphome:\n name: test_device\n")
|
||||
|
||||
build_folder = build_dir / "test_device"
|
||||
build_folder.mkdir()
|
||||
(build_folder / "firmware.bin").write_text("binary content")
|
||||
(build_folder / ".pioenvs").mkdir()
|
||||
|
||||
mock_dashboard_settings.config_dir = str(config_dir)
|
||||
mock_dashboard_settings.rel_path.return_value = str(test_config)
|
||||
mock_archive_storage_path.return_value = str(archive_dir)
|
||||
|
||||
mock_storage = MagicMock()
|
||||
mock_storage.name = "test_device"
|
||||
mock_storage.build_path = str(build_folder)
|
||||
mock_storage_json.load.return_value = mock_storage
|
||||
|
||||
response = await dashboard.fetch(
|
||||
"/archive",
|
||||
method="POST",
|
||||
body=f"configuration={configuration}",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
)
|
||||
assert response.code == 200
|
||||
|
||||
assert not test_config.exists()
|
||||
assert (archive_dir / configuration).exists()
|
||||
|
||||
assert not build_folder.exists()
|
||||
assert not (archive_dir / "test_device").exists()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_archive_handler_no_build_folder(
|
||||
dashboard: DashboardTestHelper,
|
||||
mock_archive_storage_path: MagicMock,
|
||||
mock_ext_storage_path: MagicMock,
|
||||
mock_dashboard_settings: MagicMock,
|
||||
mock_storage_json: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test ArchiveRequestHandler.post with storage_json but no build folder."""
|
||||
config_dir = tmp_path / "config"
|
||||
config_dir.mkdir()
|
||||
archive_dir = tmp_path / "archive"
|
||||
archive_dir.mkdir()
|
||||
|
||||
configuration = "test_device.yaml"
|
||||
test_config = config_dir / configuration
|
||||
test_config.write_text("esphome:\n name: test_device\n")
|
||||
|
||||
mock_dashboard_settings.config_dir = str(config_dir)
|
||||
mock_dashboard_settings.rel_path.return_value = str(test_config)
|
||||
mock_archive_storage_path.return_value = str(archive_dir)
|
||||
|
||||
mock_storage = MagicMock()
|
||||
mock_storage.name = "test_device"
|
||||
mock_storage.build_path = None
|
||||
mock_storage_json.load.return_value = mock_storage
|
||||
|
||||
response = await dashboard.fetch(
|
||||
"/archive",
|
||||
method="POST",
|
||||
body=f"configuration={configuration}",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
)
|
||||
assert response.code == 200
|
||||
|
||||
assert not test_config.exists()
|
||||
assert (archive_dir / configuration).exists()
|
||||
assert not (archive_dir / "test_device").exists()
|
||||
|
||||
|
||||
@pytest.mark.skipif(os.name == "nt", reason="Unix sockets are not supported on Windows")
|
||||
@pytest.mark.usefixtures("mock_trash_storage_path", "mock_archive_storage_path")
|
||||
def test_start_web_server_with_unix_socket(tmp_path: Path) -> None:
|
||||
|
||||
@@ -9,8 +9,10 @@ not be part of a unit test suite.
|
||||
|
||||
"""
|
||||
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -36,3 +38,52 @@ def fixture_path() -> Path:
|
||||
Location of all fixture files.
|
||||
"""
|
||||
return here / "fixtures"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def setup_core(tmp_path: Path) -> Path:
|
||||
"""Set up CORE with test paths."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
return tmp_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_write_file_if_changed() -> Generator[Mock, None, None]:
|
||||
"""Mock write_file_if_changed for storage_json."""
|
||||
with patch("esphome.storage_json.write_file_if_changed") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_copy_file_if_changed() -> Generator[Mock, None, None]:
|
||||
"""Mock copy_file_if_changed for core.config."""
|
||||
with patch("esphome.core.config.copy_file_if_changed") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_run_platformio_cli() -> Generator[Mock, None, None]:
|
||||
"""Mock run_platformio_cli for platformio_api."""
|
||||
with patch("esphome.platformio_api.run_platformio_cli") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_run_platformio_cli_run() -> Generator[Mock, None, None]:
|
||||
"""Mock run_platformio_cli_run for platformio_api."""
|
||||
with patch("esphome.platformio_api.run_platformio_cli_run") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_decode_pc() -> Generator[Mock, None, None]:
|
||||
"""Mock _decode_pc for platformio_api."""
|
||||
with patch("esphome.platformio_api._decode_pc") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_run_external_command() -> Generator[Mock, None, None]:
|
||||
"""Mock run_external_command for platformio_api."""
|
||||
with patch("esphome.platformio_api.run_external_command") as mock:
|
||||
yield mock
|
||||
|
||||
@@ -1,15 +1,34 @@
|
||||
"""Unit tests for core config functionality including areas and devices."""
|
||||
|
||||
from collections.abc import Callable
|
||||
import os
|
||||
from pathlib import Path
|
||||
import types
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import config_validation as cv, core
|
||||
from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES
|
||||
from esphome.core import config
|
||||
from esphome.core.config import Area, validate_area_config
|
||||
from esphome.const import (
|
||||
CONF_AREA,
|
||||
CONF_AREAS,
|
||||
CONF_BUILD_PATH,
|
||||
CONF_DEVICES,
|
||||
CONF_ESPHOME,
|
||||
CONF_NAME,
|
||||
CONF_NAME_ADD_MAC_SUFFIX,
|
||||
KEY_CORE,
|
||||
)
|
||||
from esphome.core import CORE, config
|
||||
from esphome.core.config import (
|
||||
Area,
|
||||
preload_core_config,
|
||||
valid_include,
|
||||
valid_project_name,
|
||||
validate_area_config,
|
||||
validate_hostname,
|
||||
)
|
||||
|
||||
from .common import load_config_from_fixture
|
||||
|
||||
@@ -245,3 +264,316 @@ def test_add_platform_defines_priority() -> None:
|
||||
f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than "
|
||||
f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)"
|
||||
)
|
||||
|
||||
|
||||
def test_valid_include_with_angle_brackets() -> None:
|
||||
"""Test valid_include accepts angle bracket includes."""
|
||||
assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>"
|
||||
|
||||
|
||||
def test_valid_include_with_valid_file(tmp_path: Path) -> None:
|
||||
"""Test valid_include accepts valid include files."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
include_file = tmp_path / "include.h"
|
||||
include_file.touch()
|
||||
|
||||
assert valid_include(str(include_file)) == str(include_file)
|
||||
|
||||
|
||||
def test_valid_include_with_valid_directory(tmp_path: Path) -> None:
|
||||
"""Test valid_include accepts valid directories."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
include_dir = tmp_path / "includes"
|
||||
include_dir.mkdir()
|
||||
|
||||
assert valid_include(str(include_dir)) == str(include_dir)
|
||||
|
||||
|
||||
def test_valid_include_invalid_extension(tmp_path: Path) -> None:
|
||||
"""Test valid_include rejects files with invalid extensions."""
|
||||
CORE.config_path = str(tmp_path / "test.yaml")
|
||||
invalid_file = tmp_path / "file.txt"
|
||||
invalid_file.touch()
|
||||
|
||||
with pytest.raises(cv.Invalid, match="Include has invalid file extension"):
|
||||
valid_include(str(invalid_file))
|
||||
|
||||
|
||||
def test_valid_project_name_valid() -> None:
|
||||
"""Test valid_project_name accepts valid project names."""
|
||||
assert valid_project_name("esphome.my_project") == "esphome.my_project"
|
||||
|
||||
|
||||
def test_valid_project_name_no_namespace() -> None:
|
||||
"""Test valid_project_name rejects names without namespace."""
|
||||
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
|
||||
valid_project_name("my_project")
|
||||
|
||||
|
||||
def test_valid_project_name_multiple_dots() -> None:
|
||||
"""Test valid_project_name rejects names with multiple dots."""
|
||||
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
|
||||
valid_project_name("esphome.my.project")
|
||||
|
||||
|
||||
def test_validate_hostname_valid() -> None:
|
||||
"""Test validate_hostname accepts valid hostnames."""
|
||||
config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False}
|
||||
assert validate_hostname(config) == config
|
||||
|
||||
|
||||
def test_validate_hostname_too_long() -> None:
|
||||
"""Test validate_hostname rejects hostnames that are too long."""
|
||||
config = {
|
||||
CONF_NAME: "a" * 32, # 32 chars, max is 31
|
||||
CONF_NAME_ADD_MAC_SUFFIX: False,
|
||||
}
|
||||
with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"):
|
||||
validate_hostname(config)
|
||||
|
||||
|
||||
def test_validate_hostname_too_long_with_mac_suffix() -> None:
|
||||
"""Test validate_hostname accounts for MAC suffix length."""
|
||||
config = {
|
||||
CONF_NAME: "a" * 25, # 25 chars, max is 24 with MAC suffix
|
||||
CONF_NAME_ADD_MAC_SUFFIX: True,
|
||||
}
|
||||
with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"):
|
||||
validate_hostname(config)
|
||||
|
||||
|
||||
def test_validate_hostname_with_underscore(caplog) -> None:
|
||||
"""Test validate_hostname warns about underscores."""
|
||||
config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False}
|
||||
assert validate_hostname(config) == config
|
||||
assert (
|
||||
"Using the '_' (underscore) character in the hostname is discouraged"
|
||||
in caplog.text
|
||||
)
|
||||
|
||||
|
||||
def test_preload_core_config_basic(setup_core: Path) -> None:
|
||||
"""Test preload_core_config sets basic CORE attributes."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
"esp32": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
platform = preload_core_config(config, result)
|
||||
|
||||
assert CORE.name == "test_device"
|
||||
assert platform == "esp32"
|
||||
assert KEY_CORE in CORE.data
|
||||
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
|
||||
# Verify default build path is "build/<device_name>"
|
||||
build_path = config[CONF_ESPHOME][CONF_BUILD_PATH]
|
||||
assert build_path.endswith(os.path.join("build", "test_device"))
|
||||
|
||||
|
||||
def test_preload_core_config_with_build_path(setup_core: Path) -> None:
|
||||
"""Test preload_core_config uses provided build path."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
CONF_BUILD_PATH: "/custom/build/path",
|
||||
},
|
||||
"esp8266": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
platform = preload_core_config(config, result)
|
||||
|
||||
assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path"
|
||||
assert platform == "esp8266"
|
||||
|
||||
|
||||
def test_preload_core_config_env_build_path(setup_core: Path) -> None:
|
||||
"""Test preload_core_config uses ESPHOME_BUILD_PATH env var."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
"rp2040": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}):
|
||||
platform = preload_core_config(config, result)
|
||||
|
||||
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
|
||||
assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH]
|
||||
# Verify it uses the env var path with device name appended
|
||||
build_path = config[CONF_ESPHOME][CONF_BUILD_PATH]
|
||||
expected_path = os.path.join("/env/build", "test_device")
|
||||
assert build_path == expected_path or build_path == expected_path.replace(
|
||||
"/", os.sep
|
||||
)
|
||||
assert platform == "rp2040"
|
||||
|
||||
|
||||
def test_preload_core_config_no_platform(setup_core: Path) -> None:
|
||||
"""Test preload_core_config raises when no platform is specified."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
}
|
||||
result = {}
|
||||
|
||||
# Mock _is_target_platform to avoid expensive component loading
|
||||
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
|
||||
# Return True for known platforms
|
||||
mock_is_platform.side_effect = lambda name: name in [
|
||||
"esp32",
|
||||
"esp8266",
|
||||
"rp2040",
|
||||
]
|
||||
|
||||
with pytest.raises(cv.Invalid, match="Platform missing"):
|
||||
preload_core_config(config, result)
|
||||
|
||||
|
||||
def test_preload_core_config_multiple_platforms(setup_core: Path) -> None:
|
||||
"""Test preload_core_config raises when multiple platforms are specified."""
|
||||
config = {
|
||||
CONF_ESPHOME: {
|
||||
CONF_NAME: "test_device",
|
||||
},
|
||||
"esp32": {},
|
||||
"esp8266": {},
|
||||
}
|
||||
result = {}
|
||||
|
||||
# Mock _is_target_platform to avoid expensive component loading
|
||||
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
|
||||
# Return True for known platforms
|
||||
mock_is_platform.side_effect = lambda name: name in [
|
||||
"esp32",
|
||||
"esp8266",
|
||||
"rp2040",
|
||||
]
|
||||
|
||||
with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"):
|
||||
preload_core_config(config, result)
|
||||
|
||||
|
||||
def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
|
||||
"""Test include_file adds include statement for header files."""
|
||||
src_file = tmp_path / "source.h"
|
||||
src_file.write_text("// Header content")
|
||||
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
|
||||
with patch("esphome.core.config.cg") as mock_cg:
|
||||
# Mock RawStatement to capture the text
|
||||
mock_raw_statement = MagicMock()
|
||||
mock_raw_statement.text = ""
|
||||
|
||||
def raw_statement_side_effect(text):
|
||||
mock_raw_statement.text = text
|
||||
return mock_raw_statement
|
||||
|
||||
mock_cg.RawStatement.side_effect = raw_statement_side_effect
|
||||
|
||||
config.include_file(str(src_file), "test.h")
|
||||
|
||||
mock_copy_file_if_changed.assert_called_once()
|
||||
mock_cg.add_global.assert_called_once()
|
||||
# Check that include statement was added
|
||||
assert '#include "test.h"' in mock_raw_statement.text
|
||||
|
||||
|
||||
def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
|
||||
"""Test include_file does not add include for cpp files."""
|
||||
src_file = tmp_path / "source.cpp"
|
||||
src_file.write_text("// CPP content")
|
||||
|
||||
CORE.build_path = str(tmp_path / "build")
|
||||
|
||||
with patch("esphome.core.config.cg") as mock_cg:
|
||||
config.include_file(str(src_file), "test.cpp")
|
||||
|
||||
mock_copy_file_if_changed.assert_called_once()
|
||||
# Should not add include statement for .cpp files
|
||||
mock_cg.add_global.assert_not_called()
|
||||
|
||||
|
||||
def test_get_usable_cpu_count() -> None:
|
||||
"""Test get_usable_cpu_count returns CPU count."""
|
||||
count = config.get_usable_cpu_count()
|
||||
assert isinstance(count, int)
|
||||
assert count > 0
|
||||
|
||||
|
||||
def test_get_usable_cpu_count_with_process_cpu_count() -> None:
|
||||
"""Test get_usable_cpu_count uses process_cpu_count when available."""
|
||||
# Test with process_cpu_count (Python 3.13+)
|
||||
# Create a mock os module with process_cpu_count
|
||||
|
||||
mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4)
|
||||
|
||||
with patch("esphome.core.config.os", mock_os):
|
||||
# When process_cpu_count exists, it should be used
|
||||
count = config.get_usable_cpu_count()
|
||||
assert count == 8
|
||||
|
||||
# Test fallback to cpu_count when process_cpu_count not available
|
||||
mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4)
|
||||
|
||||
with patch("esphome.core.config.os", mock_os_no_process):
|
||||
count = config.get_usable_cpu_count()
|
||||
assert count == 4
|
||||
|
||||
|
||||
def test_list_target_platforms(tmp_path: Path) -> None:
|
||||
"""Test _list_target_platforms returns available platforms."""
|
||||
# Create mock components directory structure
|
||||
components_dir = tmp_path / "components"
|
||||
components_dir.mkdir()
|
||||
|
||||
# Create platform and non-platform directories with __init__.py
|
||||
platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"]
|
||||
non_platforms = ["sensor"]
|
||||
|
||||
for component in platforms + non_platforms:
|
||||
component_dir = components_dir / component
|
||||
component_dir.mkdir()
|
||||
(component_dir / "__init__.py").touch()
|
||||
|
||||
# Create a file (not a directory)
|
||||
(components_dir / "README.md").touch()
|
||||
|
||||
# Create a directory without __init__.py
|
||||
(components_dir / "no_init").mkdir()
|
||||
|
||||
# Mock Path(__file__).parents[1] to return our tmp_path
|
||||
with patch("esphome.core.config.Path") as mock_path:
|
||||
mock_file_path = MagicMock()
|
||||
mock_file_path.parents = [MagicMock(), tmp_path]
|
||||
mock_path.return_value = mock_file_path
|
||||
|
||||
platforms = config._list_target_platforms()
|
||||
|
||||
assert isinstance(platforms, list)
|
||||
# Should include platform components
|
||||
assert "esp32" in platforms
|
||||
assert "esp8266" in platforms
|
||||
assert "rp2040" in platforms
|
||||
assert "libretiny" in platforms
|
||||
assert "host" in platforms
|
||||
# Should not include non-platform components
|
||||
assert "sensor" not in platforms
|
||||
assert "README.md" not in platforms
|
||||
assert "no_init" not in platforms
|
||||
|
||||
|
||||
def test_is_target_platform() -> None:
|
||||
"""Test _is_target_platform identifies valid platforms."""
|
||||
assert config._is_target_platform("esp32") is True
|
||||
assert config._is_target_platform("esp8266") is True
|
||||
assert config._is_target_platform("rp2040") is True
|
||||
assert config._is_target_platform("invalid_platform") is False
|
||||
assert config._is_target_platform("api") is False # Component but not platform
|
||||
|
||||
187
tests/unit_tests/test_config_validation_paths.py
Normal file
187
tests/unit_tests/test_config_validation_paths.py
Normal file
@@ -0,0 +1,187 @@
|
||||
"""Tests for config_validation.py path-related functions."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import voluptuous as vol
|
||||
|
||||
from esphome import config_validation as cv
|
||||
|
||||
|
||||
def test_directory_valid_path(setup_core: Path) -> None:
    """cv.directory accepts a relative path to an existing directory."""
    (setup_core / "test_directory").mkdir()
    assert cv.directory("test_directory") == "test_directory"
|
||||
|
||||
|
||||
def test_directory_absolute_path(setup_core: Path) -> None:
    """cv.directory accepts an absolute path to an existing directory."""
    dir_path = setup_core / "test_directory"
    dir_path.mkdir()
    absolute = str(dir_path)
    assert cv.directory(absolute) == absolute
|
||||
|
||||
|
||||
def test_directory_nonexistent_path(setup_core: Path) -> None:
    """cv.directory rejects a path that does not exist."""
    with pytest.raises(
        vol.Invalid, match="Could not find directory.*nonexistent_directory"
    ):
        cv.directory("nonexistent_directory")
|
||||
|
||||
|
||||
def test_directory_file_instead_of_directory(setup_core: Path) -> None:
    """cv.directory rejects a path that exists but is a regular file."""
    (setup_core / "test_file.txt").write_text("content")
    with pytest.raises(vol.Invalid, match="is not a directory"):
        cv.directory("test_file.txt")
|
||||
|
||||
|
||||
def test_directory_with_parent_directory(setup_core: Path) -> None:
    """cv.directory resolves directories nested several levels deep."""
    (setup_core / "parent" / "child" / "grandchild").mkdir(parents=True)
    assert cv.directory("parent/child/grandchild") == "parent/child/grandchild"
|
||||
|
||||
|
||||
def test_file_valid_path(setup_core: Path) -> None:
    """cv.file_ accepts a relative path to an existing file."""
    (setup_core / "test_file.yaml").write_text("test content")
    assert cv.file_("test_file.yaml") == "test_file.yaml"
|
||||
|
||||
|
||||
def test_file_absolute_path(setup_core: Path) -> None:
    """cv.file_ accepts an absolute path to an existing file."""
    file_path = setup_core / "test_file.yaml"
    file_path.write_text("test content")
    absolute = str(file_path)
    assert cv.file_(absolute) == absolute
|
||||
|
||||
|
||||
def test_file_nonexistent_path(setup_core: Path) -> None:
    """cv.file_ rejects a path that does not exist."""
    with pytest.raises(vol.Invalid, match="Could not find file.*nonexistent_file.yaml"):
        cv.file_("nonexistent_file.yaml")
|
||||
|
||||
|
||||
def test_file_directory_instead_of_file(setup_core: Path) -> None:
    """cv.file_ rejects a path that exists but is a directory."""
    (setup_core / "test_directory").mkdir()
    with pytest.raises(vol.Invalid, match="is not a file"):
        cv.file_("test_directory")
|
||||
|
||||
|
||||
def test_file_with_parent_directory(setup_core: Path) -> None:
    """cv.file_ resolves files inside nested directories."""
    nested = setup_core / "configs" / "sensors"
    nested.mkdir(parents=True)
    (nested / "temperature.yaml").write_text("sensor config")
    relative = "configs/sensors/temperature.yaml"
    assert cv.file_(relative) == relative
|
||||
|
||||
|
||||
def test_directory_handles_trailing_slash(setup_core: Path) -> None:
    """cv.directory accepts the same directory with and without a trailing slash."""
    (setup_core / "test_dir").mkdir()
    for candidate in ("test_dir/", "test_dir"):
        assert cv.directory(candidate) == candidate
|
||||
|
||||
|
||||
def test_file_handles_various_extensions(setup_core: Path) -> None:
    """cv.file_ is extension-agnostic: yaml, yml, txt and extension-less all pass."""
    fixtures = {
        "config.yaml": "yaml content",
        "config.yml": "yml content",
        "readme.txt": "text content",
        "LICENSE": "license content",
    }
    for name, body in fixtures.items():
        (setup_core / name).write_text(body)
        assert cv.file_(name) == name
|
||||
|
||||
|
||||
def test_directory_with_symlink(setup_core: Path) -> None:
    """cv.directory follows a symlink that points at a real directory."""
    target = setup_core / "actual_directory"
    target.mkdir()
    (setup_core / "symlink_directory").symlink_to(target)
    assert cv.directory("symlink_directory") == "symlink_directory"
|
||||
|
||||
|
||||
def test_file_with_symlink(setup_core: Path) -> None:
    """cv.file_ follows a symlink that points at a real file."""
    target = setup_core / "actual_file.txt"
    target.write_text("content")
    (setup_core / "symlink_file.txt").symlink_to(target)
    assert cv.file_("symlink_file.txt") == "symlink_file.txt"
|
||||
|
||||
|
||||
def test_directory_error_shows_full_path(setup_core: Path) -> None:
    """The cv.directory failure message includes the fully resolved path."""
    with pytest.raises(vol.Invalid, match=".*missing_dir.*full path:.*"):
        cv.directory("missing_dir")
|
||||
|
||||
|
||||
def test_file_error_shows_full_path(setup_core: Path) -> None:
    """The cv.file_ failure message includes the fully resolved path."""
    with pytest.raises(vol.Invalid, match=".*missing_file.yaml.*full path:.*"):
        cv.file_("missing_file.yaml")
|
||||
|
||||
|
||||
def test_directory_with_spaces_in_name(setup_core: Path) -> None:
    """cv.directory copes with whitespace inside directory names."""
    (setup_core / "my test directory").mkdir()
    assert cv.directory("my test directory") == "my test directory"
|
||||
|
||||
|
||||
def test_file_with_spaces_in_name(setup_core: Path) -> None:
    """cv.file_ copes with whitespace inside file names."""
    (setup_core / "my test file.yaml").write_text("content")
    assert cv.file_("my test file.yaml") == "my test file.yaml"
|
||||
196
tests/unit_tests/test_external_files.py
Normal file
196
tests/unit_tests/test_external_files.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""Tests for external_files.py functions."""
|
||||
|
||||
from pathlib import Path
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from esphome import external_files
|
||||
from esphome.config_validation import Invalid
|
||||
from esphome.core import CORE, TimePeriod
|
||||
|
||||
|
||||
def test_compute_local_file_dir(setup_core: Path) -> None:
    """compute_local_file_dir returns <data_dir>/<domain> and creates it on disk."""
    created = external_files.compute_local_file_dir("font")

    assert isinstance(created, Path)
    assert created == Path(CORE.data_dir) / "font"
    assert created.exists()
    assert created.is_dir()
|
||||
|
||||
|
||||
def test_compute_local_file_dir_nested(setup_core: Path) -> None:
    """compute_local_file_dir handles multi-segment domains like images/icons."""
    created = external_files.compute_local_file_dir("images/icons")

    assert created == Path(CORE.data_dir) / "images" / "icons"
    assert created.exists()
    assert created.is_dir()
|
||||
|
||||
|
||||
def test_is_file_recent_with_recent_file(setup_core: Path) -> None:
    """A freshly written file counts as recent within a one-hour window."""
    fresh = setup_core / "recent.txt"
    fresh.write_text("content")
    assert external_files.is_file_recent(str(fresh), TimePeriod(seconds=3600)) is True
|
||||
|
||||
|
||||
def test_is_file_recent_with_old_file(setup_core: Path) -> None:
    """A file created before the refresh window is not recent."""
    stale = setup_core / "old.txt"
    stale.write_text("content")

    # Pretend the file was created two hours ago, outside the one-hour window.
    with patch("os.path.getctime", return_value=time.time() - 7200):
        recent = external_files.is_file_recent(str(stale), TimePeriod(seconds=3600))

    assert recent is False
|
||||
|
||||
|
||||
def test_is_file_recent_nonexistent_file(setup_core: Path) -> None:
    """A path that does not exist is never considered recent."""
    missing = setup_core / "nonexistent.txt"
    assert external_files.is_file_recent(str(missing), TimePeriod(seconds=3600)) is False
|
||||
|
||||
|
||||
def test_is_file_recent_with_zero_refresh(setup_core: Path) -> None:
    """A zero-second refresh period makes every existing file stale."""
    target = setup_core / "test.txt"
    target.write_text("content")

    # File looks 10 seconds old; with refresh=0 it must be treated as stale.
    with patch("os.path.getctime", return_value=time.time() - 10):
        assert external_files.is_file_recent(str(target), TimePeriod(seconds=0)) is False
|
||||
|
||||
|
||||
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_not_modified(
    mock_head: MagicMock, setup_core: Path
) -> None:
    """A 304 Not Modified response means the cached copy is still valid."""
    cached = setup_core / "cached.txt"
    cached.write_text("cached content")
    mock_head.return_value = MagicMock(status_code=304)

    changed = external_files.has_remote_file_changed(
        "https://example.com/file.txt", str(cached)
    )

    assert changed is False
    mock_head.assert_called_once()
    # The conditional-request headers must have been sent.
    sent_headers = mock_head.call_args[1]["headers"]
    assert external_files.IF_MODIFIED_SINCE in sent_headers
    assert external_files.CACHE_CONTROL in sent_headers
|
||||
|
||||
|
||||
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_modified(
    mock_head: MagicMock, setup_core: Path
) -> None:
    """A 200 response means the remote file changed since it was cached."""
    cached = setup_core / "cached.txt"
    cached.write_text("cached content")
    mock_head.return_value = MagicMock(status_code=200)

    changed = external_files.has_remote_file_changed(
        "https://example.com/file.txt", str(cached)
    )

    assert changed is True
|
||||
|
||||
|
||||
def test_has_remote_file_changed_no_local_file(setup_core: Path) -> None:
    """Without a local copy the remote file is always considered changed."""
    missing = setup_core / "nonexistent.txt"
    changed = external_files.has_remote_file_changed(
        "https://example.com/file.txt", str(missing)
    )
    assert changed is True
|
||||
|
||||
|
||||
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_network_error(
    mock_head: MagicMock, setup_core: Path
) -> None:
    """Network failures surface as a config-validation Invalid error."""
    cached = setup_core / "cached.txt"
    cached.write_text("cached content")
    mock_head.side_effect = requests.exceptions.RequestException("Network error")

    with pytest.raises(Invalid, match="Could not check if.*Network error"):
        external_files.has_remote_file_changed(
            "https://example.com/file.txt", str(cached)
        )
|
||||
|
||||
|
||||
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_timeout(
    mock_head: MagicMock, setup_core: Path
) -> None:
    """The HEAD request is issued with the module-level network timeout."""
    cached = setup_core / "cached.txt"
    cached.write_text("cached content")
    mock_head.return_value = MagicMock(status_code=304)

    external_files.has_remote_file_changed("https://example.com/file.txt", str(cached))

    assert mock_head.call_args[1]["timeout"] == external_files.NETWORK_TIMEOUT
|
||||
|
||||
|
||||
def test_compute_local_file_dir_creates_parent_dirs(setup_core: Path) -> None:
    """Deeply nested domains get every intermediate directory created."""
    created = external_files.compute_local_file_dir("level1/level2/level3/level4")

    assert created.exists()
    assert created.is_dir()
    # Walk back up the tree and confirm each ancestor segment.
    assert created.parent.name == "level3"
    assert created.parent.parent.name == "level2"
    assert created.parent.parent.parent.name == "level1"
|
||||
|
||||
|
||||
def test_is_file_recent_handles_float_seconds(setup_core: Path) -> None:
    """TimePeriod with fractional seconds is accepted by is_file_recent."""
    target = setup_core / "test.txt"
    target.write_text("content")
    assert external_files.is_file_recent(str(target), TimePeriod(seconds=3600.5)) is True
|
||||
636
tests/unit_tests/test_platformio_api.py
Normal file
636
tests/unit_tests/test_platformio_api.py
Normal file
@@ -0,0 +1,636 @@
|
||||
"""Tests for platformio_api.py path functions."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import platformio_api
|
||||
from esphome.core import CORE, EsphomeError
|
||||
|
||||
|
||||
def test_idedata_firmware_elf_path(setup_core: Path) -> None:
    """firmware_elf_path mirrors prog_path from the raw idedata dict."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"
    idedata = platformio_api.IDEData({"prog_path": "/path/to/firmware.elf"})
    assert idedata.firmware_elf_path == "/path/to/firmware.elf"
|
||||
|
||||
|
||||
def test_idedata_firmware_bin_path(setup_core: Path) -> None:
    """firmware_bin_path swaps the .elf suffix for .bin and stays a str."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"
    elf = str(Path("/path/to/firmware.elf"))
    idedata = platformio_api.IDEData({"prog_path": elf})

    bin_path = idedata.firmware_bin_path

    assert isinstance(bin_path, str)
    assert bin_path == str(Path("/path/to/firmware.bin"))
    assert bin_path.endswith(".bin")
|
||||
|
||||
|
||||
def test_idedata_firmware_bin_path_preserves_directory(setup_core: Path) -> None:
    """Only the suffix changes — the directory part of prog_path is kept."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"
    elf = str(Path("/complex/path/to/build/firmware.elf"))
    idedata = platformio_api.IDEData({"prog_path": elf})
    assert idedata.firmware_bin_path == str(Path("/complex/path/to/build/firmware.bin"))
|
||||
|
||||
|
||||
def test_idedata_extra_flash_images(setup_core: Path) -> None:
    """extra_flash_images exposes each raw entry as a FlashImage object."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"
    raw = {
        "prog_path": "/path/to/firmware.elf",
        "extra": {
            "flash_images": [
                {"path": "/path/to/bootloader.bin", "offset": "0x1000"},
                {"path": "/path/to/partition.bin", "offset": "0x8000"},
            ]
        },
    }

    images = platformio_api.IDEData(raw).extra_flash_images

    assert len(images) == 2
    assert all(isinstance(entry, platformio_api.FlashImage) for entry in images)
    assert [(entry.path, entry.offset) for entry in images] == [
        ("/path/to/bootloader.bin", "0x1000"),
        ("/path/to/partition.bin", "0x8000"),
    ]
|
||||
|
||||
|
||||
def test_idedata_extra_flash_images_empty(setup_core: Path) -> None:
    """An empty flash_images list maps to an empty result list."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"
    raw = {"prog_path": "/path/to/firmware.elf", "extra": {"flash_images": []}}
    assert platformio_api.IDEData(raw).extra_flash_images == []
|
||||
|
||||
|
||||
def test_idedata_cc_path(setup_core: Path) -> None:
    """cc_path mirrors the compiler path from the raw idedata dict."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"
    gcc = "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc"
    idedata = platformio_api.IDEData(
        {"prog_path": "/path/to/firmware.elf", "cc_path": gcc}
    )
    assert idedata.cc_path == gcc
|
||||
|
||||
|
||||
def test_flash_image_dataclass() -> None:
    """FlashImage keeps its path and offset fields verbatim."""
    image = platformio_api.FlashImage(path="/path/to/image.bin", offset="0x10000")
    assert (image.path, image.offset) == ("/path/to/image.bin", "0x10000")
|
||||
|
||||
|
||||
def test_load_idedata_returns_dict(
    setup_core: Path, mock_run_platformio_cli_run
) -> None:
    """_load_idedata parses and returns the idedata dict on success."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"

    # Both platformio.ini and a cached idedata file must exist on disk.
    ini_path = setup_core / "build" / "test" / "platformio.ini"
    ini_path.parent.mkdir(parents=True, exist_ok=True)
    ini_path.touch()

    cache_path = setup_core / ".esphome" / "idedata" / "test.json"
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_path.write_text('{"prog_path": "/test/firmware.elf"}')

    mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}'

    result = platformio_api._load_idedata({"name": "test"})

    assert result is not None
    assert isinstance(result, dict)
    assert result["prog_path"] == "/test/firmware.elf"
|
||||
|
||||
|
||||
def test_load_idedata_uses_cache_when_valid(
    setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
    """When the cache is newer than platformio.ini, no regeneration happens."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"

    ini_path = setup_core / "build" / "test" / "platformio.ini"
    ini_path.parent.mkdir(parents=True, exist_ok=True)
    ini_path.write_text("content")

    cache_path = setup_core / ".esphome" / "idedata" / "test.json"
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_path.write_text('{"prog_path": "/cached/firmware.elf"}')

    # Bump the cache mtime past platformio.ini so the cache counts as fresh.
    ini_mtime = ini_path.stat().st_mtime
    os.utime(cache_path, (ini_mtime + 1, ini_mtime + 1))

    result = platformio_api._load_idedata({"name": "test"})

    # Fresh cache means platformio was never invoked.
    mock_run_platformio_cli_run.assert_not_called()
    assert result["prog_path"] == "/cached/firmware.elf"
|
||||
|
||||
|
||||
def test_load_idedata_regenerates_when_platformio_ini_newer(
    setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
    """A platformio.ini newer than the cache forces idedata regeneration."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"

    # Stale cache first...
    cache_path = setup_core / ".esphome" / "idedata" / "test.json"
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_path.write_text('{"prog_path": "/old/firmware.elf"}')

    # ...then a platformio.ini whose mtime is pushed past the cache.
    cache_mtime = cache_path.stat().st_mtime
    ini_path = setup_core / "build" / "test" / "platformio.ini"
    ini_path.parent.mkdir(parents=True, exist_ok=True)
    ini_path.write_text("content")
    os.utime(ini_path, (cache_mtime + 1, cache_mtime + 1))

    mock_run_platformio_cli_run.return_value = json.dumps(
        {"prog_path": "/new/firmware.elf"}
    )

    result = platformio_api._load_idedata({"name": "test"})

    # Stale cache means platformio was invoked to regenerate.
    mock_run_platformio_cli_run.assert_called_once()
    assert result["prog_path"] == "/new/firmware.elf"
|
||||
|
||||
|
||||
def test_load_idedata_regenerates_on_corrupted_cache(
    setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
    """Unparseable cached idedata is discarded and regenerated."""
    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"

    ini_path = setup_core / "build" / "test" / "platformio.ini"
    ini_path.parent.mkdir(parents=True, exist_ok=True)
    ini_path.write_text("content")

    # Corrupted cache file that would otherwise be used.
    cache_path = setup_core / ".esphome" / "idedata" / "test.json"
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_path.write_text('{"prog_path": invalid json')

    # Make the corrupted cache look fresh so only its contents disqualify it.
    ini_mtime = ini_path.stat().st_mtime
    os.utime(cache_path, (ini_mtime + 1, ini_mtime + 1))

    mock_run_platformio_cli_run.return_value = json.dumps(
        {"prog_path": "/new/firmware.elf"}
    )

    result = platformio_api._load_idedata({"name": "test"})

    # Corrupted cache means platformio was invoked to regenerate.
    mock_run_platformio_cli_run.assert_called_once()
    assert result["prog_path"] == "/new/firmware.elf"
|
||||
|
||||
|
||||
def test_run_idedata_parses_json_from_output(
    setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
    """_run_idedata extracts the JSON document embedded in platformio's output."""
    expected = {
        "prog_path": "/path/to/firmware.elf",
        "cc_path": "/path/to/gcc",
        "extra": {"flash_images": []},
    }
    # Surround the JSON with unrelated output to exercise the extraction.
    mock_run_platformio_cli_run.return_value = (
        f"Some preamble\n{json.dumps(expected)}\nSome postamble"
    )

    assert platformio_api._run_idedata({"name": "test"}) == expected
|
||||
|
||||
|
||||
def test_run_idedata_raises_on_no_json(
    setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
    """With no JSON in the output, _run_idedata raises EsphomeError."""
    mock_run_platformio_cli_run.return_value = "No JSON in this output"
    with pytest.raises(EsphomeError):
        platformio_api._run_idedata({"name": "test"})
|
||||
|
||||
|
||||
def test_run_idedata_raises_on_invalid_json(
    setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
    """Malformed JSON propagates the ValueError raised by json.loads."""
    mock_run_platformio_cli_run.return_value = '{"invalid": json"}'
    with pytest.raises(ValueError):
        platformio_api._run_idedata({"name": "test"})
|
||||
|
||||
|
||||
def test_run_platformio_cli_sets_environment_variables(
    setup_core: Path, mock_run_external_command: Mock
) -> None:
    """run_platformio_cli exports the expected env vars and forwards args."""
    CORE.build_path = str(setup_core / "build" / "test")
    build_dir = setup_core / "build" / "test"

    with patch.dict(os.environ, {}, clear=False):
        mock_run_external_command.return_value = 0
        platformio_api.run_platformio_cli("test", "arg")

        assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true"
        # PLATFORMIO_BUILD_DIR points at (or below) CORE's build path.
        pio_build_dir = Path(os.environ["PLATFORMIO_BUILD_DIR"])
        assert pio_build_dir == build_dir or build_dir in pio_build_dir.parents
        assert "PLATFORMIO_LIBDEPS_DIR" in os.environ
        assert "PYTHONWARNINGS" in os.environ

        # The external command got the platformio binary plus our args.
        mock_run_external_command.assert_called_once()
        forwarded = mock_run_external_command.call_args[0]
        for expected_arg in ("platformio", "test", "arg"):
            assert expected_arg in forwarded
|
||||
|
||||
|
||||
def test_run_platformio_cli_run_builds_command(
    setup_core: Path, mock_run_platformio_cli: Mock
) -> None:
    """run_platformio_cli_run assembles `run -d <build> -v <extra args>`."""
    CORE.build_path = str(setup_core / "build" / "test")
    mock_run_platformio_cli.return_value = 0

    platformio_api.run_platformio_cli_run({"name": "test"}, True, "extra", "args")

    mock_run_platformio_cli.assert_called_once_with(
        "run", "-d", CORE.build_path, "-v", "extra", "args"
    )
|
||||
|
||||
|
||||
def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None:
    """run_compile forwards the configured compile process limit as -jN."""
    from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME

    CORE.build_path = str(setup_core / "build" / "test")
    mock_run_platformio_cli_run.return_value = 0
    config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}}

    platformio_api.run_compile(config, verbose=True)

    mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4")
|
||||
|
||||
|
||||
def test_get_idedata_caches_result(
    setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
    """get_idedata loads once and then serves the same IDEData from CORE.data."""
    from esphome.const import KEY_CORE

    CORE.build_path = str(setup_core / "build" / "test")
    CORE.name = "test"
    CORE.data[KEY_CORE] = {}

    # platformio.ini must exist so loading does not regenerate mid-test.
    ini_path = setup_core / "build" / "test" / "platformio.ini"
    ini_path.parent.mkdir(parents=True, exist_ok=True)
    ini_path.write_text("content")

    mock_run_platformio_cli_run.return_value = json.dumps(
        {"prog_path": "/test/firmware.elf"}
    )
    config = {"name": "test"}

    first = platformio_api.get_idedata(config)
    mock_run_platformio_cli_run.assert_called_once()

    second = platformio_api.get_idedata(config)
    mock_run_platformio_cli_run.assert_called_once()  # still only one invocation

    assert first is second
    assert isinstance(first, platformio_api.IDEData)
    assert first.firmware_elf_path == "/test/firmware.elf"
|
||||
|
||||
|
||||
def test_idedata_addr2line_path_windows(setup_core: Path) -> None:
    """addr2line_path is derived from a Windows-style cc_path."""
    idedata = platformio_api.IDEData(
        {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"}
    )
    assert idedata.addr2line_path == "C:\\tools\\addr2line.exe"
|
||||
|
||||
|
||||
def test_idedata_addr2line_path_unix(setup_core: Path) -> None:
    """addr2line_path is derived from a Unix-style cc_path."""
    idedata = platformio_api.IDEData(
        {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"}
    )
    assert idedata.addr2line_path == "/usr/bin/addr2line"
|
||||
|
||||
|
||||
def test_patch_structhash(setup_core: Path) -> None:
    """patch_structhash installs the same patched clean_build_dir on both modules."""
    # Plain namespaces stand in for the platformio run modules.
    fake_cli = SimpleNamespace()
    fake_helpers = SimpleNamespace()
    fake_run = SimpleNamespace(cli=fake_cli, helpers=fake_helpers)

    with patch.dict(
        "sys.modules",
        {
            "platformio.run.cli": fake_cli,
            "platformio.run.helpers": fake_helpers,
            "platformio.run": fake_run,
            "platformio.project.helpers": MagicMock(),
            "platformio.fs": MagicMock(),
            "platformio": MagicMock(),
        },
    ):
        platformio_api.patch_structhash()

        # Both modules receive the identical patched function object.
        assert hasattr(fake_cli, "clean_build_dir")
        assert hasattr(fake_helpers, "clean_build_dir")
        assert fake_cli.clean_build_dir is fake_helpers.clean_build_dir

        # And it is a real function, not a Mock.
        assert callable(fake_cli.clean_build_dir)
        assert fake_cli.clean_build_dir.__name__ == "patched_clean_build_dir"
|
||||
|
||||
|
||||
def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None:
    """patched_clean_build_dir wipes the build dir when platformio.ini is newer."""
    build_dir = setup_core / "build"
    build_dir.mkdir()
    ini_path = setup_core / "platformio.ini"
    ini_path.write_text("config")

    # Push platformio.ini's mtime past the build dir so it looks outdated.
    build_mtime = build_dir.stat().st_mtime
    os.utime(ini_path, (build_mtime + 1, build_mtime + 1))

    # Record every rmtree call while still actually removing the tree.
    removed_paths: list[str] = []

    def track_rmtree(path: str) -> None:
        removed_paths.append(path)
        shutil.rmtree(path)

    fake_cli = SimpleNamespace()
    fake_helpers = SimpleNamespace()
    fake_project_helpers = MagicMock()
    fake_project_helpers.get_project_dir.return_value = str(setup_core)
    fake_fs = SimpleNamespace(rmtree=track_rmtree)

    with patch.dict(
        "sys.modules",
        {
            "platformio": SimpleNamespace(fs=fake_fs),
            "platformio.fs": fake_fs,
            "platformio.project.helpers": fake_project_helpers,
            "platformio.run": SimpleNamespace(cli=fake_cli, helpers=fake_helpers),
            "platformio.run.cli": fake_cli,
            "platformio.run.helpers": fake_helpers,
        },
    ):
        # Install the patched function, then exercise it.
        platformio_api.patch_structhash()
        fake_helpers.clean_build_dir(str(build_dir), [])

    # The stale dir was removed exactly once and then recreated by makedirs.
    assert removed_paths == [str(build_dir)]
    assert build_dir.exists()
|
||||
|
||||
|
||||
def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None:
    """The patched clean_build_dir leaves an up-to-date build dir untouched."""
    build_dir = setup_core / "build"
    build_dir.mkdir()
    marker_file = build_dir / "test.txt"
    marker_file.write_text("test content")

    ini_file = setup_core / "platformio.ini"
    ini_file.write_text("config")

    # Push the build dir's mtime past platformio.ini's so it counts as fresh.
    fresh_mtime = ini_file.stat().st_mtime + 1
    os.utime(build_dir, (fresh_mtime, fresh_mtime))

    # Record any rmtree attempt; nothing should actually be deleted here.
    removed: list[str] = []

    def spy_rmtree(path: str) -> None:
        removed.append(path)

    cli_module = SimpleNamespace()
    helpers_module = SimpleNamespace()
    project_helpers = MagicMock()
    project_helpers.get_project_dir.return_value = str(setup_core)
    fs_module = SimpleNamespace(rmtree=spy_rmtree)

    fake_modules = {
        "platformio": SimpleNamespace(fs=fs_module),
        "platformio.fs": fs_module,
        "platformio.project.helpers": project_helpers,
        "platformio.run": SimpleNamespace(cli=cli_module, helpers=helpers_module),
        "platformio.run.cli": cli_module,
        "platformio.run.helpers": helpers_module,
    }

    with patch.dict("sys.modules", fake_modules):
        platformio_api.patch_structhash()
        helpers_module.clean_build_dir(str(build_dir), [])

        # rmtree was never invoked ...
        assert removed == []

        # ... and the directory contents survived intact.
        assert build_dir.exists()
        assert marker_file.exists()
        assert marker_file.read_text() == "test content"
|
||||
|
||||
|
||||
def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None:
    """The patched clean_build_dir creates the build dir when it is absent."""
    build_dir = setup_core / "build"
    ini_file = setup_core / "platformio.ini"
    ini_file.write_text("config")

    # Precondition: no build directory yet.
    assert not build_dir.exists()

    # Record any rmtree attempt; none is expected for a missing directory.
    removed: list[str] = []

    def spy_rmtree(path: str) -> None:
        removed.append(path)

    cli_module = SimpleNamespace()
    helpers_module = SimpleNamespace()
    project_helpers = MagicMock()
    project_helpers.get_project_dir.return_value = str(setup_core)
    fs_module = SimpleNamespace(rmtree=spy_rmtree)

    fake_modules = {
        "platformio": SimpleNamespace(fs=fs_module),
        "platformio.fs": fs_module,
        "platformio.project.helpers": project_helpers,
        "platformio.run": SimpleNamespace(cli=cli_module, helpers=helpers_module),
        "platformio.run.cli": cli_module,
        "platformio.run.helpers": helpers_module,
    }

    with patch.dict("sys.modules", fake_modules):
        platformio_api.patch_structhash()
        helpers_module.clean_build_dir(str(build_dir), [])

        # Nothing was deleted, but the directory now exists.
        assert removed == []
        assert build_dir.exists()
|
||||
|
||||
|
||||
def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None:
    """An ESP8266 'Exception (N)' line is decoded into a readable log entry."""
    config = {"name": "test"}

    # Feed a lone exception line with no backtrace in progress.
    result = platformio_api.process_stacktrace(config, "Exception (28):", False)

    # Code 28 is logged as the wild-pointer LOAD description.
    assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text
    assert result is False


def test_process_stacktrace_esp8266_backtrace(
    setup_core: Path, mock_decode_pc: Mock
) -> None:
    """ESP8266 stack dumps between >>>stack>>> and <<<stack<<< are decoded."""
    config = {"name": "test"}

    # The opening marker switches backtrace mode on.
    in_backtrace = platformio_api.process_stacktrace(config, ">>>stack>>>", False)
    assert in_backtrace is True

    # While in backtrace mode, each address on a dump line is resolved.
    in_backtrace = platformio_api.process_stacktrace(
        config, "40201234 40205678", in_backtrace
    )
    assert in_backtrace is True
    assert mock_decode_pc.call_count == 2

    # The closing marker switches backtrace mode back off.
    in_backtrace = platformio_api.process_stacktrace(
        config, "<<<stack<<<", in_backtrace
    )
    assert in_backtrace is False


def test_process_stacktrace_esp32_backtrace(
    setup_core: Path, mock_decode_pc: Mock
) -> None:
    """An ESP32 one-line 'Backtrace:' entry has every PC decoded."""
    config = {"name": "test"}

    in_backtrace = platformio_api.process_stacktrace(
        config, "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678", False
    )

    # Both program counters from the PC:SP pairs were resolved.
    assert mock_decode_pc.call_count == 2
    mock_decode_pc.assert_any_call(config, "40081234")
    mock_decode_pc.assert_any_call(config, "40085678")
    assert in_backtrace is False


def test_process_stacktrace_bad_alloc(
    setup_core: Path, mock_decode_pc: Mock, caplog
) -> None:
    """A failed-allocation line is logged with its size and caller decoded."""
    config = {"name": "test"}

    in_backtrace = platformio_api.process_stacktrace(
        config, "last failed alloc call: 40201234(512)", False
    )

    assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
    mock_decode_pc.assert_called_once_with(config, "40201234")
    assert in_backtrace is False
|
||||
660
tests/unit_tests/test_storage_json.py
Normal file
660
tests/unit_tests/test_storage_json.py
Normal file
@@ -0,0 +1,660 @@
|
||||
"""Tests for storage_json.py path functions."""
|
||||
|
||||
from datetime import datetime
|
||||
import json
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome import storage_json
|
||||
from esphome.const import CONF_DISABLED, CONF_MDNS
|
||||
from esphome.core import CORE
|
||||
|
||||
|
||||
def test_storage_path(setup_core: Path) -> None:
    """storage_path points at <data_dir>/storage/<config file>.json."""
    CORE.config_path = str(setup_core / "my_device.yaml")

    assert storage_json.storage_path() == str(
        Path(CORE.data_dir) / "storage" / "my_device.yaml.json"
    )


def test_ext_storage_path(setup_core: Path) -> None:
    """ext_storage_path maps an arbitrary config filename into the storage dir."""
    assert storage_json.ext_storage_path("other_device.yaml") == str(
        Path(CORE.data_dir) / "storage" / "other_device.yaml.json"
    )


def test_ext_storage_path_handles_various_extensions(setup_core: Path) -> None:
    """ext_storage_path appends .json regardless of the input extension."""
    # .yml files keep their extension and gain .json.
    assert storage_json.ext_storage_path("device.yml").endswith("device.yml.json")

    # Extensionless names simply gain .json.
    assert storage_json.ext_storage_path("device").endswith("device.json")

    # Path-like names also end in <basename>.json.
    assert storage_json.ext_storage_path("my/device.yaml").endswith("device.yaml.json")


def test_esphome_storage_path(setup_core: Path) -> None:
    """esphome.json lives directly inside the data dir."""
    assert storage_json.esphome_storage_path() == str(
        Path(CORE.data_dir) / "esphome.json"
    )


def test_ignored_devices_storage_path(setup_core: Path) -> None:
    """ignored-devices.json lives directly inside the data dir."""
    assert storage_json.ignored_devices_storage_path() == str(
        Path(CORE.data_dir) / "ignored-devices.json"
    )


def test_trash_storage_path(setup_core: Path) -> None:
    """trash_storage_path is a 'trash' sibling of the active config file."""
    CORE.config_path = str(setup_core / "configs" / "device.yaml")

    assert storage_json.trash_storage_path() == str(setup_core / "configs" / "trash")


def test_archive_storage_path(setup_core: Path) -> None:
    """archive_storage_path is an 'archive' sibling of the active config file."""
    CORE.config_path = str(setup_core / "configs" / "device.yaml")

    assert storage_json.archive_storage_path() == str(
        setup_core / "configs" / "archive"
    )


def test_storage_path_with_subdirectory(setup_core: Path) -> None:
    """Only the config's base name is used, even for nested config files."""
    nested_dir = setup_core / "configs" / "basement"
    nested_dir.mkdir(parents=True, exist_ok=True)
    CORE.config_path = str(nested_dir / "sensor.yaml")

    assert storage_json.storage_path() == str(
        Path(CORE.data_dir) / "storage" / "sensor.yaml.json"
    )
|
||||
|
||||
|
||||
def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None:
    """firmware_bin_path echoes back the constructor argument."""
    storage = storage_json.StorageJSON(
        storage_version=1,
        name="test_device",
        friendly_name="Test Device",
        comment=None,
        esphome_version="2024.1.0",
        src_version=None,
        address="192.168.1.100",
        web_port=80,
        target_platform="ESP32",
        build_path="build/test_device",
        firmware_bin_path="/path/to/firmware.bin",
        loaded_integrations={"wifi", "api"},
        loaded_platforms=set(),
        no_mdns=False,
    )

    assert storage.firmware_bin_path == "/path/to/firmware.bin"


def test_storage_json_save_creates_directory(
    setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock
) -> None:
    """save may be pointed at a storage directory that does not exist yet."""
    target_dir = tmp_path / "new_data" / "storage"
    target_file = target_dir / "test.json"

    # Precondition: nothing has created the directory.
    assert not target_dir.exists()

    storage = storage_json.StorageJSON(
        storage_version=1,
        name="test",
        friendly_name="Test",
        comment=None,
        esphome_version="2024.1.0",
        src_version=None,
        address="test.local",
        web_port=None,
        target_platform="ESP8266",
        build_path=None,
        firmware_bin_path=None,
        loaded_integrations=set(),
        loaded_platforms=set(),
        no_mdns=False,
    )

    storage.save(str(target_file))

    # The (mocked) writer was invoked once, aimed at the requested file.
    mock_write_file_if_changed.assert_called_once()
    written_path = mock_write_file_if_changed.call_args[0][0]
    assert written_path == str(target_file)


def test_storage_json_from_wizard(setup_core: Path) -> None:
    """from_wizard fills identity fields and leaves build artifacts unset."""
    storage = storage_json.StorageJSON.from_wizard(
        name="my_device",
        friendly_name="My Device",
        address="my_device.local",
        platform="ESP32",
    )

    assert storage.name == "my_device"
    assert storage.friendly_name == "My Device"
    assert storage.address == "my_device.local"
    assert storage.target_platform == "ESP32"
    assert storage.build_path is None
    assert storage.firmware_bin_path is None
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="HA addons don't run on Windows")
@patch("esphome.core.is_ha_addon")
def test_storage_paths_with_ha_addon(mock_is_ha_addon: Mock, tmp_path: Path) -> None:
    """Test storage paths when running as Home Assistant addon.

    ``@patch`` injects a Mock for ``is_ha_addon`` — the parameter was
    previously annotated ``bool``, which was incorrect. Forcing the mock to
    return True makes CORE.data_dir resolve to the addon container mount.
    """
    mock_is_ha_addon.return_value = True

    CORE.config_path = str(tmp_path / "test.yaml")

    result = storage_json.storage_path()
    # When is_ha_addon is True, CORE.data_dir returns "/data"
    # This is the standard mount point for HA addon containers
    expected = str(Path("/data") / "storage" / "test.yaml.json")
    assert result == expected

    result = storage_json.esphome_storage_path()
    expected = str(Path("/data") / "esphome.json")
    assert result == expected
|
||||
|
||||
|
||||
def test_storage_json_as_dict() -> None:
    """as_dict serializes every field, emitting the sets as sorted lists."""
    storage = storage_json.StorageJSON(
        storage_version=1,
        name="test_device",
        friendly_name="Test Device",
        comment="Test comment",
        esphome_version="2024.1.0",
        src_version=1,
        address="192.168.1.100",
        web_port=80,
        target_platform="ESP32",
        build_path="/path/to/build",
        firmware_bin_path="/path/to/firmware.bin",
        loaded_integrations={"wifi", "api", "ota"},
        loaded_platforms={"sensor", "binary_sensor"},
        no_mdns=True,
        framework="arduino",
        core_platform="esp32",
    )

    result = storage.as_dict()

    # Scalar fields are copied through verbatim (note the esp_platform key).
    scalar_expectations = {
        "storage_version": 1,
        "name": "test_device",
        "friendly_name": "Test Device",
        "comment": "Test comment",
        "esphome_version": "2024.1.0",
        "src_version": 1,
        "address": "192.168.1.100",
        "web_port": 80,
        "esp_platform": "ESP32",
        "build_path": "/path/to/build",
        "firmware_bin_path": "/path/to/firmware.bin",
        "framework": "arduino",
        "core_platform": "esp32",
    }
    for key, expected in scalar_expectations.items():
        assert result[key] == expected

    # The set-valued fields come back as sorted lists.
    assert result["loaded_integrations"] == ["api", "ota", "wifi"]
    assert result["loaded_platforms"] == ["binary_sensor", "sensor"]

    # no_mdns keeps its boolean identity.
    assert result["no_mdns"] is True


def test_storage_json_to_json() -> None:
    """to_json emits parseable JSON terminated by a newline."""
    storage = storage_json.StorageJSON(
        storage_version=1,
        name="test",
        friendly_name="Test",
        comment=None,
        esphome_version="2024.1.0",
        src_version=None,
        address="test.local",
        web_port=None,
        target_platform="ESP8266",
        build_path=None,
        firmware_bin_path=None,
        loaded_integrations=set(),
        loaded_platforms=set(),
        no_mdns=False,
    )

    serialized = storage.to_json()

    # A round-trip through the json module proves it is well-formed.
    decoded = json.loads(serialized)
    assert decoded["name"] == "test"
    assert decoded["storage_version"] == 1

    # A trailing newline is part of the contract.
    assert serialized.endswith("\n")


def test_storage_json_save(tmp_path: Path) -> None:
    """save delegates to write_file_if_changed with the serialized payload."""
    storage = storage_json.StorageJSON(
        storage_version=1,
        name="test",
        friendly_name="Test",
        comment=None,
        esphome_version="2024.1.0",
        src_version=None,
        address="test.local",
        web_port=None,
        target_platform="ESP32",
        build_path=None,
        firmware_bin_path=None,
        loaded_integrations=set(),
        loaded_platforms=set(),
        no_mdns=False,
    )

    destination = tmp_path / "test.json"

    with patch("esphome.storage_json.write_file_if_changed") as write_mock:
        storage.save(str(destination))
        write_mock.assert_called_once_with(str(destination), storage.to_json())
|
||||
|
||||
|
||||
def test_storage_json_from_esphome_core(setup_core: Path) -> None:
    """from_esphome_core copies core data and resolves the ESP32 variant."""
    # Configure a stand-in for the CORE object attribute by attribute
    # (``name`` must be set after construction — it is special to MagicMock).
    core_attrs = {
        "name": "my_device",
        "friendly_name": "My Device",
        "comment": "A test device",
        "address": "192.168.1.50",
        "web_port": 8080,
        "target_platform": "esp32",
        "is_esp32": True,
        "build_path": "/build/my_device",
        "firmware_bin": "/build/my_device/firmware.bin",
        "loaded_integrations": {"wifi", "api"},
        "loaded_platforms": {"sensor"},
        "config": {CONF_MDNS: {CONF_DISABLED: True}},
        "target_framework": "esp-idf",
    }
    fake_core = MagicMock()
    for attr, value in core_attrs.items():
        setattr(fake_core, attr, value)

    # For ESP32 targets the concrete variant name replaces the platform.
    with patch("esphome.components.esp32.get_esp32_variant") as variant_mock:
        variant_mock.return_value = "ESP32-C3"

        result = storage_json.StorageJSON.from_esphome_core(fake_core, old=None)

        assert result.name == "my_device"
        assert result.friendly_name == "My Device"
        assert result.comment == "A test device"
        assert result.address == "192.168.1.50"
        assert result.web_port == 8080
        assert result.target_platform == "ESP32-C3"
        assert result.build_path == "/build/my_device"
        assert result.firmware_bin_path == "/build/my_device/firmware.bin"
        assert result.loaded_integrations == {"wifi", "api"}
        assert result.loaded_platforms == {"sensor"}
        assert result.no_mdns is True
        assert result.framework == "esp-idf"
        assert result.core_platform == "esp32"


def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None:
    """Without an mDNS section in the config, no_mdns stays False."""
    core_attrs = {
        "name": "test",
        "friendly_name": "Test",
        "comment": None,
        "address": "test.local",
        "web_port": None,
        "target_platform": "esp8266",
        "is_esp32": False,
        "build_path": "/build",
        "firmware_bin": "/build/firmware.bin",
        "loaded_integrations": set(),
        "loaded_platforms": set(),
        "config": {},  # no MDNS entry at all -> mDNS counts as enabled
        "target_framework": "arduino",
    }
    fake_core = MagicMock()
    for attr, value in core_attrs.items():
        setattr(fake_core, attr, value)

    result = storage_json.StorageJSON.from_esphome_core(fake_core, old=None)

    assert result.no_mdns is False
|
||||
|
||||
|
||||
def test_storage_json_load_valid_file(tmp_path: Path) -> None:
    """load round-trips every field of a well-formed storage file."""
    payload = {
        "storage_version": 1,
        "name": "loaded_device",
        "friendly_name": "Loaded Device",
        "comment": "Loaded from file",
        "esphome_version": "2024.1.0",
        "src_version": 2,
        "address": "10.0.0.1",
        "web_port": 8080,
        "esp_platform": "ESP32",
        "build_path": "/loaded/build",
        "firmware_bin_path": "/loaded/firmware.bin",
        "loaded_integrations": ["wifi", "api"],
        "loaded_platforms": ["sensor"],
        "no_mdns": True,
        "framework": "arduino",
        "core_platform": "esp32",
    }
    storage_file = tmp_path / "storage.json"
    storage_file.write_text(json.dumps(payload))

    loaded = storage_json.StorageJSON.load(str(storage_file))

    assert loaded is not None

    # Scalar fields come back verbatim (esp_platform -> target_platform).
    expected_attrs = {
        "name": "loaded_device",
        "friendly_name": "Loaded Device",
        "comment": "Loaded from file",
        "esphome_version": "2024.1.0",
        "src_version": 2,
        "address": "10.0.0.1",
        "web_port": 8080,
        "target_platform": "ESP32",
        "build_path": "/loaded/build",
        "firmware_bin_path": "/loaded/firmware.bin",
        "framework": "arduino",
        "core_platform": "esp32",
    }
    for attr, expected in expected_attrs.items():
        assert getattr(loaded, attr) == expected

    # List fields are materialized as sets; the flag keeps its identity.
    assert loaded.loaded_integrations == {"wifi", "api"}
    assert loaded.loaded_platforms == {"sensor"}
    assert loaded.no_mdns is True


def test_storage_json_load_invalid_file(tmp_path: Path) -> None:
    """load returns None instead of raising on malformed JSON."""
    broken_file = tmp_path / "invalid.json"
    broken_file.write_text("not valid json{")

    assert storage_json.StorageJSON.load(str(broken_file)) is None


def test_storage_json_load_nonexistent_file() -> None:
    """load returns None for a path that does not exist."""
    assert storage_json.StorageJSON.load("/nonexistent/file.json") is None


def test_storage_json_equality() -> None:
    """Equality is field-wise; a changed field or foreign type is unequal."""

    def build(**overrides):
        # Fresh kwargs per call so instances never share mutable sets.
        fields = {
            "storage_version": 1,
            "name": "test",
            "friendly_name": "Test",
            "comment": None,
            "esphome_version": "2024.1.0",
            "src_version": 1,
            "address": "test.local",
            "web_port": 80,
            "target_platform": "ESP32",
            "build_path": "/build",
            "firmware_bin_path": "/firmware.bin",
            "loaded_integrations": {"wifi"},
            "loaded_platforms": set(),
            "no_mdns": False,
        }
        fields.update(overrides)
        return storage_json.StorageJSON(**fields)

    identical_a = build()
    identical_b = build()
    renamed = build(name="different")

    assert identical_a == identical_b
    assert identical_a != renamed
    assert identical_a != "not a storage object"
|
||||
|
||||
|
||||
def test_esphome_storage_json_as_dict() -> None:
    """as_dict exposes all four EsphomeStorageJSON fields."""
    storage = storage_json.EsphomeStorageJSON(
        storage_version=1,
        cookie_secret="secret123",
        last_update_check="2024-01-15T10:30:00",
        remote_version="2024.1.1",
    )

    result = storage.as_dict()

    expected = {
        "storage_version": 1,
        "cookie_secret": "secret123",
        "last_update_check": "2024-01-15T10:30:00",
        "remote_version": "2024.1.1",
    }
    for key, value in expected.items():
        assert result[key] == value


def test_esphome_storage_json_last_update_check_property() -> None:
    """last_update_check parses the stored string; the setter re-formats it."""
    storage = storage_json.EsphomeStorageJSON(
        storage_version=1,
        cookie_secret="secret",
        last_update_check="2024-01-15T10:30:00",
        remote_version=None,
    )

    # Getter: the ISO string becomes a datetime with matching components.
    parsed = storage.last_update_check
    assert isinstance(parsed, datetime)
    assert (parsed.year, parsed.month, parsed.day) == (2024, 1, 15)
    assert (parsed.hour, parsed.minute) == (10, 30)

    # Setter: assigning a datetime stores the ISO-formatted string.
    storage.last_update_check = datetime(2024, 2, 20, 15, 45, 30)
    assert storage.last_update_check_str == "2024-02-20T15:45:30"


def test_esphome_storage_json_last_update_check_invalid() -> None:
    """An unparseable timestamp yields None rather than an exception."""
    storage = storage_json.EsphomeStorageJSON(
        storage_version=1,
        cookie_secret="secret",
        last_update_check="invalid date",
        remote_version=None,
    )

    assert storage.last_update_check is None


def test_esphome_storage_json_to_json() -> None:
    """to_json emits parseable JSON terminated by a newline."""
    storage = storage_json.EsphomeStorageJSON(
        storage_version=1,
        cookie_secret="mysecret",
        last_update_check="2024-01-15T10:30:00",
        remote_version="2024.1.1",
    )

    serialized = storage.to_json()

    # A round-trip through the json module proves it is well-formed.
    decoded = json.loads(serialized)
    assert decoded["cookie_secret"] == "mysecret"
    assert decoded["storage_version"] == 1

    # A trailing newline is part of the contract.
    assert serialized.endswith("\n")


def test_esphome_storage_json_save(tmp_path: Path) -> None:
    """save delegates to write_file_if_changed with the serialized payload."""
    storage = storage_json.EsphomeStorageJSON(
        storage_version=1,
        cookie_secret="secret",
        last_update_check=None,
        remote_version=None,
    )

    destination = tmp_path / "esphome.json"

    with patch("esphome.storage_json.write_file_if_changed") as write_mock:
        storage.save(str(destination))
        write_mock.assert_called_once_with(str(destination), storage.to_json())
|
||||
|
||||
|
||||
def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None:
    """load reconstructs every field from a well-formed esphome.json."""
    payload = {
        "storage_version": 1,
        "cookie_secret": "loaded_secret",
        "last_update_check": "2024-01-20T14:30:00",
        "remote_version": "2024.1.2",
    }
    storage_file = tmp_path / "esphome.json"
    storage_file.write_text(json.dumps(payload))

    loaded = storage_json.EsphomeStorageJSON.load(str(storage_file))

    assert loaded is not None
    assert loaded.storage_version == 1
    assert loaded.cookie_secret == "loaded_secret"
    assert loaded.last_update_check_str == "2024-01-20T14:30:00"
    assert loaded.remote_version == "2024.1.2"


def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None:
    """load returns None instead of raising on malformed JSON."""
    broken_file = tmp_path / "invalid.json"
    broken_file.write_text("not valid json{")

    assert storage_json.EsphomeStorageJSON.load(str(broken_file)) is None


def test_esphome_storage_json_load_nonexistent_file() -> None:
    """load returns None for a path that does not exist."""
    assert storage_json.EsphomeStorageJSON.load("/nonexistent/file.json") is None


def test_esphome_storage_json_get_default() -> None:
    """get_default seeds a fresh record with a 128-hex-char cookie secret."""
    with patch("esphome.storage_json.os.urandom") as urandom_mock:
        # Deterministic 64 bytes of "entropy" -> 128 hex chars once encoded.
        urandom_mock.return_value = b"test" * 16

        fresh = storage_json.EsphomeStorageJSON.get_default()

        assert fresh.storage_version == 1
        assert len(fresh.cookie_secret) == 128
        assert fresh.last_update_check is None
        assert fresh.remote_version is None


def test_esphome_storage_json_equality() -> None:
    """Equality is field-wise; a changed field or foreign type is unequal."""

    def build(**overrides):
        fields = {
            "storage_version": 1,
            "cookie_secret": "secret",
            "last_update_check": "2024-01-15T10:30:00",
            "remote_version": "2024.1.1",
        }
        fields.update(overrides)
        return storage_json.EsphomeStorageJSON(**fields)

    identical_a = build()
    identical_b = build()
    changed_secret = build(cookie_secret="different")

    assert identical_a == identical_b
    assert identical_a != changed_secret
    assert identical_a != "not a storage object"


def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None:
    """The legacy esphomeyaml_version key is mapped onto esphome_version."""
    payload = {
        "storage_version": 1,
        "name": "legacy_device",
        "friendly_name": "Legacy Device",
        "esphomeyaml_version": "1.14.0",  # pre-rename field name
        "address": "legacy.local",
        "esp_platform": "ESP8266",
    }
    legacy_file = tmp_path / "legacy.json"
    legacy_file.write_text(json.dumps(payload))

    loaded = storage_json.StorageJSON.load(str(legacy_file))

    assert loaded is not None
    assert loaded.esphome_version == "1.14.0"
|
||||
@@ -141,3 +141,170 @@ def test_list_yaml_files_mixed_extensions(tmp_path: Path) -> None:
|
||||
str(yaml_file),
|
||||
str(yml_file),
|
||||
}
|
||||
|
||||
|
||||
def test_list_yaml_files_does_not_recurse_into_subdirectories(tmp_path: Path) -> None:
    """list_yaml_files is shallow: YAML files in subdirectories are ignored."""
    root = tmp_path / "configs"
    root.mkdir()

    # Top-level YAML files -- these should all be returned.
    for filename, body in (
        ("config1.yaml", "test: 1"),
        ("config2.yml", "test: 2"),
        ("device.yaml", "test: device"),
    ):
        (root / filename).write_text(body)

    # One and two levels down -- none of these may appear in the result.
    nested = root / "subdir"
    nested.mkdir()
    (nested / "nested1.yaml").write_text("test: nested1")
    (nested / "nested2.yml").write_text("test: nested2")

    deepest = nested / "deeper"
    deepest.mkdir()
    (deepest / "very_nested.yaml").write_text("test: very_nested")

    result = util.list_yaml_files([str(root)])

    # Exactly the three top-level files.
    assert len(result) == 3
    assert str(root / "config1.yaml") in result
    assert str(root / "config2.yml") in result
    assert str(root / "device.yaml") in result

    # No path from any subdirectory leaked through.
    for found in result:
        assert "subdir" not in found
        assert "deeper" not in found
        assert "nested1.yaml" not in found
        assert "nested2.yml" not in found
        assert "very_nested.yaml" not in found


def test_list_yaml_files_excludes_secrets(tmp_path: Path) -> None:
    """secrets.yaml / secrets.yml are filtered out of the listing."""
    root = tmp_path / "configs"
    root.mkdir()

    (root / "config.yaml").write_text("test: config")
    (root / "secrets.yaml").write_text("wifi_password: secret123")
    (root / "secrets.yml").write_text("api_key: secret456")
    (root / "device.yaml").write_text("test: device")

    result = util.list_yaml_files([str(root)])

    # Only the two non-secret files survive.
    assert len(result) == 2
    assert str(root / "config.yaml") in result
    assert str(root / "device.yaml") in result
    assert str(root / "secrets.yaml") not in result
    assert str(root / "secrets.yml") not in result


def test_list_yaml_files_excludes_hidden_files(tmp_path: Path) -> None:
    """Dot-prefixed YAML files are filtered out of the listing."""
    root = tmp_path / "configs"
    root.mkdir()

    (root / "config.yaml").write_text("test: config")
    (root / ".hidden.yaml").write_text("test: hidden")
    (root / ".backup.yml").write_text("test: backup")
    (root / "device.yaml").write_text("test: device")

    result = util.list_yaml_files([str(root)])

    # Only the two visible files survive.
    assert len(result) == 2
    assert str(root / "config.yaml") in result
    assert str(root / "device.yaml") in result
    assert str(root / ".hidden.yaml") not in result
    assert str(root / ".backup.yml") not in result
|
||||
|
||||
|
||||
def test_filter_yaml_files_basic() -> None:
|
||||
"""Test filter_yaml_files function."""
|
||||
files = [
|
||||
"/path/to/config.yaml",
|
||||
"/path/to/device.yml",
|
||||
"/path/to/readme.txt",
|
||||
"/path/to/script.py",
|
||||
"/path/to/data.json",
|
||||
"/path/to/another.yaml",
|
||||
]
|
||||
|
||||
result = util.filter_yaml_files(files)
|
||||
|
||||
assert len(result) == 3
|
||||
assert "/path/to/config.yaml" in result
|
||||
assert "/path/to/device.yml" in result
|
||||
assert "/path/to/another.yaml" in result
|
||||
assert "/path/to/readme.txt" not in result
|
||||
assert "/path/to/script.py" not in result
|
||||
assert "/path/to/data.json" not in result
|
||||
|
||||
|
||||
def test_filter_yaml_files_excludes_secrets() -> None:
|
||||
"""Test that filter_yaml_files excludes secrets files."""
|
||||
files = [
|
||||
"/path/to/config.yaml",
|
||||
"/path/to/secrets.yaml",
|
||||
"/path/to/secrets.yml",
|
||||
"/path/to/device.yaml",
|
||||
"/some/dir/secrets.yaml",
|
||||
]
|
||||
|
||||
result = util.filter_yaml_files(files)
|
||||
|
||||
assert len(result) == 2
|
||||
assert "/path/to/config.yaml" in result
|
||||
assert "/path/to/device.yaml" in result
|
||||
assert "/path/to/secrets.yaml" not in result
|
||||
assert "/path/to/secrets.yml" not in result
|
||||
assert "/some/dir/secrets.yaml" not in result
|
||||
|
||||
|
||||
def test_filter_yaml_files_excludes_hidden() -> None:
|
||||
"""Test that filter_yaml_files excludes hidden files."""
|
||||
files = [
|
||||
"/path/to/config.yaml",
|
||||
"/path/to/.hidden.yaml",
|
||||
"/path/to/.backup.yml",
|
||||
"/path/to/device.yaml",
|
||||
"/some/dir/.config.yaml",
|
||||
]
|
||||
|
||||
result = util.filter_yaml_files(files)
|
||||
|
||||
assert len(result) == 2
|
||||
assert "/path/to/config.yaml" in result
|
||||
assert "/path/to/device.yaml" in result
|
||||
assert "/path/to/.hidden.yaml" not in result
|
||||
assert "/path/to/.backup.yml" not in result
|
||||
assert "/some/dir/.config.yaml" not in result
|
||||
|
||||
|
||||
def test_filter_yaml_files_case_sensitive() -> None:
|
||||
"""Test that filter_yaml_files is case-sensitive for extensions."""
|
||||
files = [
|
||||
"/path/to/config.yaml",
|
||||
"/path/to/config.YAML",
|
||||
"/path/to/config.YML",
|
||||
"/path/to/config.Yaml",
|
||||
"/path/to/config.yml",
|
||||
]
|
||||
|
||||
result = util.filter_yaml_files(files)
|
||||
|
||||
# Should only match lowercase .yaml and .yml
|
||||
assert len(result) == 2
|
||||
assert "/path/to/config.yaml" in result
|
||||
assert "/path/to/config.yml" in result
|
||||
assert "/path/to/config.YAML" not in result
|
||||
assert "/path/to/config.YML" not in result
|
||||
assert "/path/to/config.Yaml" not in result
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
"""Tests for the wizard.py file."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from pytest import MonkeyPatch
|
||||
|
||||
from esphome.components.bk72xx.boards import BK72XX_BOARD_PINS
|
||||
from esphome.components.esp32.boards import ESP32_BOARD_PINS
|
||||
@@ -15,7 +18,7 @@ import esphome.wizard as wz
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def default_config():
|
||||
def default_config() -> dict[str, Any]:
|
||||
return {
|
||||
"type": "basic",
|
||||
"name": "test-name",
|
||||
@@ -28,7 +31,7 @@ def default_config():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def wizard_answers():
|
||||
def wizard_answers() -> list[str]:
|
||||
return [
|
||||
"test-node", # Name of the node
|
||||
"ESP8266", # platform
|
||||
@@ -53,7 +56,9 @@ def test_sanitize_quotes_replaces_with_escaped_char():
|
||||
assert output_str == '\\"key\\": \\"value\\"'
|
||||
|
||||
|
||||
def test_config_file_fallback_ap_includes_descriptive_name(default_config):
|
||||
def test_config_file_fallback_ap_includes_descriptive_name(
|
||||
default_config: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
The fallback AP should include the node and a descriptive name
|
||||
"""
|
||||
@@ -67,7 +72,9 @@ def test_config_file_fallback_ap_includes_descriptive_name(default_config):
|
||||
assert 'ssid: "Test Node Fallback Hotspot"' in config
|
||||
|
||||
|
||||
def test_config_file_fallback_ap_name_less_than_32_chars(default_config):
|
||||
def test_config_file_fallback_ap_name_less_than_32_chars(
|
||||
default_config: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
The fallback AP name must be less than 32 chars.
|
||||
Since it is composed of the node name and "Fallback Hotspot" this can be too long and needs truncating
|
||||
@@ -82,7 +89,7 @@ def test_config_file_fallback_ap_name_less_than_32_chars(default_config):
|
||||
assert 'ssid: "A Very Long Name For This Node"' in config
|
||||
|
||||
|
||||
def test_config_file_should_include_ota(default_config):
|
||||
def test_config_file_should_include_ota(default_config: dict[str, Any]):
|
||||
"""
|
||||
The Over-The-Air update should be enabled by default
|
||||
"""
|
||||
@@ -95,7 +102,9 @@ def test_config_file_should_include_ota(default_config):
|
||||
assert "ota:" in config
|
||||
|
||||
|
||||
def test_config_file_should_include_ota_when_password_set(default_config):
|
||||
def test_config_file_should_include_ota_when_password_set(
|
||||
default_config: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
The Over-The-Air update should be enabled when a password is set
|
||||
"""
|
||||
@@ -109,7 +118,9 @@ def test_config_file_should_include_ota_when_password_set(default_config):
|
||||
assert "ota:" in config
|
||||
|
||||
|
||||
def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
|
||||
def test_wizard_write_sets_platform(
|
||||
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
|
||||
):
|
||||
"""
|
||||
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
|
||||
"""
|
||||
@@ -126,7 +137,7 @@ def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
|
||||
assert "esp8266:" in generated_config
|
||||
|
||||
|
||||
def test_wizard_empty_config(tmp_path, monkeypatch):
|
||||
def test_wizard_empty_config(tmp_path: Path, monkeypatch: MonkeyPatch):
|
||||
"""
|
||||
The wizard should be able to create an empty configuration
|
||||
"""
|
||||
@@ -146,7 +157,7 @@ def test_wizard_empty_config(tmp_path, monkeypatch):
|
||||
assert generated_config == ""
|
||||
|
||||
|
||||
def test_wizard_upload_config(tmp_path, monkeypatch):
|
||||
def test_wizard_upload_config(tmp_path: Path, monkeypatch: MonkeyPatch):
|
||||
"""
|
||||
The wizard should be able to import an base64 encoded configuration
|
||||
"""
|
||||
@@ -168,7 +179,7 @@ def test_wizard_upload_config(tmp_path, monkeypatch):
|
||||
|
||||
|
||||
def test_wizard_write_defaults_platform_from_board_esp8266(
|
||||
default_config, tmp_path, monkeypatch
|
||||
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
|
||||
):
|
||||
"""
|
||||
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
|
||||
@@ -189,7 +200,7 @@ def test_wizard_write_defaults_platform_from_board_esp8266(
|
||||
|
||||
|
||||
def test_wizard_write_defaults_platform_from_board_esp32(
|
||||
default_config, tmp_path, monkeypatch
|
||||
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
|
||||
):
|
||||
"""
|
||||
If the platform is not explicitly set, use "ESP32" if the board is one of the ESP32 boards
|
||||
@@ -210,7 +221,7 @@ def test_wizard_write_defaults_platform_from_board_esp32(
|
||||
|
||||
|
||||
def test_wizard_write_defaults_platform_from_board_bk72xx(
|
||||
default_config, tmp_path, monkeypatch
|
||||
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
|
||||
):
|
||||
"""
|
||||
If the platform is not explicitly set, use "BK72XX" if the board is one of BK72XX boards
|
||||
@@ -231,7 +242,7 @@ def test_wizard_write_defaults_platform_from_board_bk72xx(
|
||||
|
||||
|
||||
def test_wizard_write_defaults_platform_from_board_ln882x(
|
||||
default_config, tmp_path, monkeypatch
|
||||
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
|
||||
):
|
||||
"""
|
||||
If the platform is not explicitly set, use "LN882X" if the board is one of LN882X boards
|
||||
@@ -252,7 +263,7 @@ def test_wizard_write_defaults_platform_from_board_ln882x(
|
||||
|
||||
|
||||
def test_wizard_write_defaults_platform_from_board_rtl87xx(
|
||||
default_config, tmp_path, monkeypatch
|
||||
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
|
||||
):
|
||||
"""
|
||||
If the platform is not explicitly set, use "RTL87XX" if the board is one of RTL87XX boards
|
||||
@@ -272,7 +283,7 @@ def test_wizard_write_defaults_platform_from_board_rtl87xx(
|
||||
assert "rtl87xx:" in generated_config
|
||||
|
||||
|
||||
def test_safe_print_step_prints_step_number_and_description(monkeypatch):
|
||||
def test_safe_print_step_prints_step_number_and_description(monkeypatch: MonkeyPatch):
|
||||
"""
|
||||
The safe_print_step function prints the step number and the passed description
|
||||
"""
|
||||
@@ -296,7 +307,7 @@ def test_safe_print_step_prints_step_number_and_description(monkeypatch):
|
||||
assert any(f"STEP {step_num}" in arg for arg in all_args)
|
||||
|
||||
|
||||
def test_default_input_uses_default_if_no_input_supplied(monkeypatch):
|
||||
def test_default_input_uses_default_if_no_input_supplied(monkeypatch: MonkeyPatch):
|
||||
"""
|
||||
The default_input() function should return the supplied default value if the user doesn't enter anything
|
||||
"""
|
||||
@@ -312,7 +323,7 @@ def test_default_input_uses_default_if_no_input_supplied(monkeypatch):
|
||||
assert retval == default_string
|
||||
|
||||
|
||||
def test_default_input_uses_user_supplied_value(monkeypatch):
|
||||
def test_default_input_uses_user_supplied_value(monkeypatch: MonkeyPatch):
|
||||
"""
|
||||
The default_input() function should return the value that the user enters
|
||||
"""
|
||||
@@ -376,7 +387,9 @@ def test_wizard_rejects_existing_files(tmpdir):
|
||||
assert retval == 2
|
||||
|
||||
|
||||
def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answers):
|
||||
def test_wizard_accepts_default_answers_esp8266(
|
||||
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
|
||||
):
|
||||
"""
|
||||
The wizard should accept the given default answers for esp8266
|
||||
"""
|
||||
@@ -396,7 +409,9 @@ def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answ
|
||||
assert retval == 0
|
||||
|
||||
|
||||
def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answers):
|
||||
def test_wizard_accepts_default_answers_esp32(
|
||||
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
|
||||
):
|
||||
"""
|
||||
The wizard should accept the given default answers for esp32
|
||||
"""
|
||||
@@ -418,7 +433,9 @@ def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answer
|
||||
assert retval == 0
|
||||
|
||||
|
||||
def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers):
|
||||
def test_wizard_offers_better_node_name(
|
||||
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
|
||||
):
|
||||
"""
|
||||
When the node name does not conform, a better alternative is offered
|
||||
* Removes special chars
|
||||
@@ -449,7 +466,9 @@ def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers):
|
||||
assert wz.default_input.call_args.args[1] == expected_name
|
||||
|
||||
|
||||
def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers):
|
||||
def test_wizard_requires_correct_platform(
|
||||
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
|
||||
):
|
||||
"""
|
||||
When the platform is not either esp32 or esp8266, the wizard should reject it
|
||||
"""
|
||||
@@ -471,7 +490,9 @@ def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers):
|
||||
assert retval == 0
|
||||
|
||||
|
||||
def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers):
|
||||
def test_wizard_requires_correct_board(
|
||||
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
|
||||
):
|
||||
"""
|
||||
When the board is not a valid esp8266 board, the wizard should reject it
|
||||
"""
|
||||
@@ -493,7 +514,9 @@ def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers):
|
||||
assert retval == 0
|
||||
|
||||
|
||||
def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
|
||||
def test_wizard_requires_valid_ssid(
|
||||
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
|
||||
):
|
||||
"""
|
||||
When the board is not a valid esp8266 board, the wizard should reject it
|
||||
"""
|
||||
@@ -515,7 +538,9 @@ def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
|
||||
assert retval == 0
|
||||
|
||||
|
||||
def test_wizard_write_protects_existing_config(tmpdir, default_config, monkeypatch):
|
||||
def test_wizard_write_protects_existing_config(
|
||||
tmpdir, default_config: dict[str, Any], monkeypatch: MonkeyPatch
|
||||
):
|
||||
"""
|
||||
The wizard_write function should not overwrite existing config files and return False
|
||||
"""
|
||||
|
||||
@@ -1,13 +1,34 @@
|
||||
"""Test writer module functionality."""
|
||||
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from esphome.core import EsphomeError
|
||||
from esphome.storage_json import StorageJSON
|
||||
from esphome.writer import storage_should_clean, update_storage_json
|
||||
from esphome.writer import (
|
||||
CPP_AUTO_GENERATE_BEGIN,
|
||||
CPP_AUTO_GENERATE_END,
|
||||
CPP_INCLUDE_BEGIN,
|
||||
CPP_INCLUDE_END,
|
||||
GITIGNORE_CONTENT,
|
||||
clean_build,
|
||||
clean_cmake_cache,
|
||||
storage_should_clean,
|
||||
update_storage_json,
|
||||
write_cpp,
|
||||
write_gitignore,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_copy_src_tree():
|
||||
"""Mock copy_src_tree to avoid side effects during tests."""
|
||||
with patch("esphome.writer.copy_src_tree"):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -218,3 +239,493 @@ def test_update_storage_json_logging_components_removed(
|
||||
|
||||
# Verify save was called
|
||||
new_storage.save.assert_called_once_with("/test/path")
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_cmake_cache(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test clean_cmake_cache removes CMakeCache.txt file."""
|
||||
# Create directory structure
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
pioenvs_dir.mkdir()
|
||||
device_dir = pioenvs_dir / "test_device"
|
||||
device_dir.mkdir()
|
||||
cmake_cache_file = device_dir / "CMakeCache.txt"
|
||||
cmake_cache_file.write_text("# CMake cache file")
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.side_effect = [
|
||||
str(pioenvs_dir), # First call for directory check
|
||||
str(cmake_cache_file), # Second call for file path
|
||||
]
|
||||
mock_core.name = "test_device"
|
||||
|
||||
# Verify file exists before
|
||||
assert cmake_cache_file.exists()
|
||||
|
||||
# Call the function
|
||||
with caplog.at_level("INFO"):
|
||||
clean_cmake_cache()
|
||||
|
||||
# Verify file was removed
|
||||
assert not cmake_cache_file.exists()
|
||||
|
||||
# Verify logging
|
||||
assert "Deleting" in caplog.text
|
||||
assert "CMakeCache.txt" in caplog.text
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_cmake_cache_no_pioenvs_dir(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test clean_cmake_cache when pioenvs directory doesn't exist."""
|
||||
# Setup non-existent directory path
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
|
||||
|
||||
# Verify directory doesn't exist
|
||||
assert not pioenvs_dir.exists()
|
||||
|
||||
# Call the function - should not crash
|
||||
clean_cmake_cache()
|
||||
|
||||
# Verify directory still doesn't exist
|
||||
assert not pioenvs_dir.exists()
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_cmake_cache_no_cmake_file(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test clean_cmake_cache when CMakeCache.txt doesn't exist."""
|
||||
# Create directory structure without CMakeCache.txt
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
pioenvs_dir.mkdir()
|
||||
device_dir = pioenvs_dir / "test_device"
|
||||
device_dir.mkdir()
|
||||
cmake_cache_file = device_dir / "CMakeCache.txt"
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.side_effect = [
|
||||
str(pioenvs_dir), # First call for directory check
|
||||
str(cmake_cache_file), # Second call for file path
|
||||
]
|
||||
mock_core.name = "test_device"
|
||||
|
||||
# Verify file doesn't exist
|
||||
assert not cmake_cache_file.exists()
|
||||
|
||||
# Call the function - should not crash
|
||||
clean_cmake_cache()
|
||||
|
||||
# Verify file still doesn't exist
|
||||
assert not cmake_cache_file.exists()
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_build(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test clean_build removes all build artifacts."""
|
||||
# Create directory structure and files
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
pioenvs_dir.mkdir()
|
||||
(pioenvs_dir / "test_file.o").write_text("object file")
|
||||
|
||||
piolibdeps_dir = tmp_path / ".piolibdeps"
|
||||
piolibdeps_dir.mkdir()
|
||||
(piolibdeps_dir / "library").mkdir()
|
||||
|
||||
dependencies_lock = tmp_path / "dependencies.lock"
|
||||
dependencies_lock.write_text("lock file")
|
||||
|
||||
# Create PlatformIO cache directory
|
||||
platformio_cache_dir = tmp_path / ".platformio" / ".cache"
|
||||
platformio_cache_dir.mkdir(parents=True)
|
||||
(platformio_cache_dir / "downloads").mkdir()
|
||||
(platformio_cache_dir / "http").mkdir()
|
||||
(platformio_cache_dir / "tmp").mkdir()
|
||||
(platformio_cache_dir / "downloads" / "package.tar.gz").write_text("package")
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
|
||||
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
|
||||
mock_core.relative_build_path.return_value = str(dependencies_lock)
|
||||
|
||||
# Verify all exist before
|
||||
assert pioenvs_dir.exists()
|
||||
assert piolibdeps_dir.exists()
|
||||
assert dependencies_lock.exists()
|
||||
assert platformio_cache_dir.exists()
|
||||
|
||||
# Mock PlatformIO's get_project_cache_dir
|
||||
with patch(
|
||||
"platformio.project.helpers.get_project_cache_dir"
|
||||
) as mock_get_cache_dir:
|
||||
mock_get_cache_dir.return_value = str(platformio_cache_dir)
|
||||
|
||||
# Call the function
|
||||
with caplog.at_level("INFO"):
|
||||
clean_build()
|
||||
|
||||
# Verify all were removed
|
||||
assert not pioenvs_dir.exists()
|
||||
assert not piolibdeps_dir.exists()
|
||||
assert not dependencies_lock.exists()
|
||||
assert not platformio_cache_dir.exists()
|
||||
|
||||
# Verify logging
|
||||
assert "Deleting" in caplog.text
|
||||
assert ".pioenvs" in caplog.text
|
||||
assert ".piolibdeps" in caplog.text
|
||||
assert "dependencies.lock" in caplog.text
|
||||
assert "PlatformIO cache" in caplog.text
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_build_partial_exists(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test clean_build when only some paths exist."""
|
||||
# Create only pioenvs directory
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
pioenvs_dir.mkdir()
|
||||
(pioenvs_dir / "test_file.o").write_text("object file")
|
||||
|
||||
piolibdeps_dir = tmp_path / ".piolibdeps"
|
||||
dependencies_lock = tmp_path / "dependencies.lock"
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
|
||||
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
|
||||
mock_core.relative_build_path.return_value = str(dependencies_lock)
|
||||
|
||||
# Verify only pioenvs exists
|
||||
assert pioenvs_dir.exists()
|
||||
assert not piolibdeps_dir.exists()
|
||||
assert not dependencies_lock.exists()
|
||||
|
||||
# Call the function
|
||||
with caplog.at_level("INFO"):
|
||||
clean_build()
|
||||
|
||||
# Verify only existing path was removed
|
||||
assert not pioenvs_dir.exists()
|
||||
assert not piolibdeps_dir.exists()
|
||||
assert not dependencies_lock.exists()
|
||||
|
||||
# Verify logging - only pioenvs should be logged
|
||||
assert "Deleting" in caplog.text
|
||||
assert ".pioenvs" in caplog.text
|
||||
assert ".piolibdeps" not in caplog.text
|
||||
assert "dependencies.lock" not in caplog.text
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_build_nothing_exists(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test clean_build when no build artifacts exist."""
|
||||
# Setup paths that don't exist
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
piolibdeps_dir = tmp_path / ".piolibdeps"
|
||||
dependencies_lock = tmp_path / "dependencies.lock"
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
|
||||
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
|
||||
mock_core.relative_build_path.return_value = str(dependencies_lock)
|
||||
|
||||
# Verify nothing exists
|
||||
assert not pioenvs_dir.exists()
|
||||
assert not piolibdeps_dir.exists()
|
||||
assert not dependencies_lock.exists()
|
||||
|
||||
# Call the function - should not crash
|
||||
clean_build()
|
||||
|
||||
# Verify nothing was created
|
||||
assert not pioenvs_dir.exists()
|
||||
assert not piolibdeps_dir.exists()
|
||||
assert not dependencies_lock.exists()
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_build_platformio_not_available(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test clean_build when PlatformIO is not available."""
|
||||
# Create directory structure and files
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
pioenvs_dir.mkdir()
|
||||
|
||||
piolibdeps_dir = tmp_path / ".piolibdeps"
|
||||
piolibdeps_dir.mkdir()
|
||||
|
||||
dependencies_lock = tmp_path / "dependencies.lock"
|
||||
dependencies_lock.write_text("lock file")
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
|
||||
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
|
||||
mock_core.relative_build_path.return_value = str(dependencies_lock)
|
||||
|
||||
# Verify all exist before
|
||||
assert pioenvs_dir.exists()
|
||||
assert piolibdeps_dir.exists()
|
||||
assert dependencies_lock.exists()
|
||||
|
||||
# Mock import error for platformio
|
||||
with (
|
||||
patch.dict("sys.modules", {"platformio.project.helpers": None}),
|
||||
caplog.at_level("INFO"),
|
||||
):
|
||||
# Call the function
|
||||
clean_build()
|
||||
|
||||
# Verify standard paths were removed but no cache cleaning attempted
|
||||
assert not pioenvs_dir.exists()
|
||||
assert not piolibdeps_dir.exists()
|
||||
assert not dependencies_lock.exists()
|
||||
|
||||
# Verify no cache logging
|
||||
assert "PlatformIO cache" not in caplog.text
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_clean_build_empty_cache_dir(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test clean_build when get_project_cache_dir returns empty/whitespace."""
|
||||
# Create directory structure and files
|
||||
pioenvs_dir = tmp_path / ".pioenvs"
|
||||
pioenvs_dir.mkdir()
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
|
||||
mock_core.relative_piolibdeps_path.return_value = str(tmp_path / ".piolibdeps")
|
||||
mock_core.relative_build_path.return_value = str(tmp_path / "dependencies.lock")
|
||||
|
||||
# Verify pioenvs exists before
|
||||
assert pioenvs_dir.exists()
|
||||
|
||||
# Mock PlatformIO's get_project_cache_dir to return whitespace
|
||||
with patch(
|
||||
"platformio.project.helpers.get_project_cache_dir"
|
||||
) as mock_get_cache_dir:
|
||||
mock_get_cache_dir.return_value = " " # Whitespace only
|
||||
|
||||
# Call the function
|
||||
with caplog.at_level("INFO"):
|
||||
clean_build()
|
||||
|
||||
# Verify pioenvs was removed
|
||||
assert not pioenvs_dir.exists()
|
||||
|
||||
# Verify no cache cleaning was attempted due to empty string
|
||||
assert "PlatformIO cache" not in caplog.text
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_write_gitignore_creates_new_file(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test write_gitignore creates a new .gitignore file when it doesn't exist."""
|
||||
gitignore_path = tmp_path / ".gitignore"
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_config_path.return_value = str(gitignore_path)
|
||||
|
||||
# Verify file doesn't exist
|
||||
assert not gitignore_path.exists()
|
||||
|
||||
# Call the function
|
||||
write_gitignore()
|
||||
|
||||
# Verify file was created with correct content
|
||||
assert gitignore_path.exists()
|
||||
assert gitignore_path.read_text() == GITIGNORE_CONTENT
|
||||
|
||||
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_write_gitignore_skips_existing_file(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test write_gitignore doesn't overwrite existing .gitignore file."""
|
||||
gitignore_path = tmp_path / ".gitignore"
|
||||
existing_content = "# Custom gitignore\n/custom_dir/\n"
|
||||
gitignore_path.write_text(existing_content)
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_config_path.return_value = str(gitignore_path)
|
||||
|
||||
# Verify file exists with custom content
|
||||
assert gitignore_path.exists()
|
||||
assert gitignore_path.read_text() == existing_content
|
||||
|
||||
# Call the function
|
||||
write_gitignore()
|
||||
|
||||
# Verify file was not modified
|
||||
assert gitignore_path.exists()
|
||||
assert gitignore_path.read_text() == existing_content
|
||||
|
||||
|
||||
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
|
||||
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_write_cpp_with_existing_file(
|
||||
mock_core: MagicMock,
|
||||
mock_copy_src_tree: MagicMock,
|
||||
mock_write_file: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test write_cpp when main.cpp already exists."""
|
||||
# Create a real file with markers
|
||||
main_cpp = tmp_path / "main.cpp"
|
||||
existing_content = f"""#include "esphome.h"
|
||||
{CPP_INCLUDE_BEGIN}
|
||||
// Old includes
|
||||
{CPP_INCLUDE_END}
|
||||
void setup() {{
|
||||
{CPP_AUTO_GENERATE_BEGIN}
|
||||
// Old code
|
||||
{CPP_AUTO_GENERATE_END}
|
||||
}}
|
||||
void loop() {{}}"""
|
||||
main_cpp.write_text(existing_content)
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_src_path.return_value = str(main_cpp)
|
||||
mock_core.cpp_global_section = "// Global section"
|
||||
|
||||
# Call the function
|
||||
test_code = " // New generated code"
|
||||
write_cpp(test_code)
|
||||
|
||||
# Verify copy_src_tree was called
|
||||
mock_copy_src_tree.assert_called_once()
|
||||
|
||||
# Get the content that would be written
|
||||
mock_write_file.assert_called_once()
|
||||
written_path, written_content = mock_write_file.call_args[0]
|
||||
|
||||
# Check that markers are preserved and content is updated
|
||||
assert CPP_INCLUDE_BEGIN in written_content
|
||||
assert CPP_INCLUDE_END in written_content
|
||||
assert CPP_AUTO_GENERATE_BEGIN in written_content
|
||||
assert CPP_AUTO_GENERATE_END in written_content
|
||||
assert test_code in written_content
|
||||
assert "// Global section" in written_content
|
||||
|
||||
|
||||
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
|
||||
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_write_cpp_creates_new_file(
|
||||
mock_core: MagicMock,
|
||||
mock_copy_src_tree: MagicMock,
|
||||
mock_write_file: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test write_cpp when main.cpp doesn't exist."""
|
||||
# Setup path for new file
|
||||
main_cpp = tmp_path / "main.cpp"
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_src_path.return_value = str(main_cpp)
|
||||
mock_core.cpp_global_section = "// Global section"
|
||||
|
||||
# Verify file doesn't exist
|
||||
assert not main_cpp.exists()
|
||||
|
||||
# Call the function
|
||||
test_code = " // Generated code"
|
||||
write_cpp(test_code)
|
||||
|
||||
# Verify copy_src_tree was called
|
||||
mock_copy_src_tree.assert_called_once()
|
||||
|
||||
# Get the content that would be written
|
||||
mock_write_file.assert_called_once()
|
||||
written_path, written_content = mock_write_file.call_args[0]
|
||||
assert written_path == str(main_cpp)
|
||||
|
||||
# Check that all necessary parts are in the new file
|
||||
assert '#include "esphome.h"' in written_content
|
||||
assert CPP_INCLUDE_BEGIN in written_content
|
||||
assert CPP_INCLUDE_END in written_content
|
||||
assert CPP_AUTO_GENERATE_BEGIN in written_content
|
||||
assert CPP_AUTO_GENERATE_END in written_content
|
||||
assert test_code in written_content
|
||||
assert "void setup()" in written_content
|
||||
assert "void loop()" in written_content
|
||||
assert "App.setup();" in written_content
|
||||
assert "App.loop();" in written_content
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_copy_src_tree")
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_write_cpp_with_missing_end_marker(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test write_cpp raises error when end marker is missing."""
|
||||
# Create a file with begin marker but no end marker
|
||||
main_cpp = tmp_path / "main.cpp"
|
||||
existing_content = f"""#include "esphome.h"
|
||||
{CPP_AUTO_GENERATE_BEGIN}
|
||||
// Code without end marker"""
|
||||
main_cpp.write_text(existing_content)
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_src_path.return_value = str(main_cpp)
|
||||
|
||||
# Call should raise an error
|
||||
with pytest.raises(EsphomeError, match="Could not find auto generated code end"):
|
||||
write_cpp("// New code")
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_copy_src_tree")
|
||||
@patch("esphome.writer.CORE")
|
||||
def test_write_cpp_with_duplicate_markers(
|
||||
mock_core: MagicMock,
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""Test write_cpp raises error when duplicate markers exist."""
|
||||
# Create a file with duplicate begin markers
|
||||
main_cpp = tmp_path / "main.cpp"
|
||||
existing_content = f"""#include "esphome.h"
|
||||
{CPP_AUTO_GENERATE_BEGIN}
|
||||
// First section
|
||||
{CPP_AUTO_GENERATE_END}
|
||||
{CPP_AUTO_GENERATE_BEGIN}
|
||||
// Duplicate section
|
||||
{CPP_AUTO_GENERATE_END}"""
|
||||
main_cpp.write_text(existing_content)
|
||||
|
||||
# Setup mocks
|
||||
mock_core.relative_src_path.return_value = str(main_cpp)
|
||||
|
||||
# Call should raise an error
|
||||
with pytest.raises(EsphomeError, match="Found multiple auto generate code begins"):
|
||||
write_cpp("// New code")
|
||||
|
||||
Reference in New Issue
Block a user