1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-01 15:41:52 +00:00

Compare commits

...

22 Commits

Author SHA1 Message Date
Jesse Hills
e47f4ef602 Merge pull request #10796 from esphome/bump-2025.9.1
2025.9.1
2025-09-19 20:46:53 +12:00
Jesse Hills
961be7fd12 Bump version to 2025.9.1 2025-09-19 11:52:10 +12:00
J. Nick Koston
a5a21f47d1 [gpio] Fix unused function warnings when compiling with log level below DEBUG (#10779) 2025-09-19 11:52:09 +12:00
J. Nick Koston
a06cd84974 [core] Fix ESP8266 mDNS compilation failure caused by incorrect coroutine priorities (#10773) 2025-09-19 11:52:09 +12:00
Subhash Chandra
e3703b43c1 [packet_transport] Refactor sensor/provider list handling to be idempotent (#10765) 2025-09-19 11:52:09 +12:00
J. Nick Koston
f6dc25c0ce [mqtt] Fix KeyError when MQTT logging configured without explicit level (#10774) 2025-09-19 11:52:09 +12:00
Jesse Hills
d2df232706 Merge pull request #10763 from esphome/bump-2025.9.0
2025.9.0
2025-09-17 18:51:21 +12:00
Jesse Hills
404e679e66 Bump version to 2025.9.0 2025-09-17 11:02:12 +12:00
Jesse Hills
8d401ad05a Merge pull request #10761 from esphome/bump-2025.9.0b4
2025.9.0b4
2025-09-17 10:50:15 +12:00
Jesse Hills
e542816f7d Bump version to 2025.9.0b4 2025-09-17 09:22:54 +12:00
J. Nick Koston
12cadf0a04 [core] Fix clean build files to properly clear PlatformIO cache (#10754) 2025-09-17 09:22:54 +12:00
J. Nick Koston
adc3d3127d [wizard] Fix KeyError when running wizard with empty OTA password (#10753) 2025-09-17 09:22:54 +12:00
J. Nick Koston
61ab682099 Add additional coverage for util and writer (#10683) 2025-09-17 09:22:54 +12:00
Jesse Hills
c05b7cca5e Merge pull request #10752 from esphome/bump-2025.9.0b3
2025.9.0b3
2025-09-17 00:15:06 +12:00
Jesse Hills
6ac395da6d Bump version to 2025.9.0b3 2025-09-16 20:35:23 +12:00
jokujossai
54616ae1b4 [ade7880] fix channel a voltage registry (#10750) 2025-09-16 20:35:23 +12:00
jokujossai
e33dcda907 [mqtt] fix publish payload length when payload contains null characters (#10744) 2025-09-16 20:35:23 +12:00
J. Nick Koston
04c1b90e57 [ethernet] Conditionally compile PHY-specific code to reduce flash usage (#10747) 2025-09-16 20:35:23 +12:00
J. Nick Koston
ddb8fedef7 [dashboard] Fix archive handler to properly delete build folders using correct path (#10724) 2025-09-16 20:35:23 +12:00
J. Nick Koston
04f4f79cb4 [select] Use const references to avoid unnecessary vector copies (#10741) 2025-09-16 20:35:23 +12:00
J. Nick Koston
8890071360 Add additional test coverage ahead of Path conversion (#10700) 2025-09-16 20:35:23 +12:00
J. Nick Koston
4b3a997a8e Improve coverage for various core modules (#10663) 2025-09-16 20:35:23 +12:00
37 changed files with 3087 additions and 75 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2025.9.0b2
PROJECT_NUMBER = 2025.9.1
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -212,7 +212,7 @@ def has_mqtt_logging() -> bool:
if CONF_TOPIC not in log_topic:
return False
return log_topic[CONF_LEVEL] != "NONE"
return log_topic.get(CONF_LEVEL, None) != "NONE"
def has_mqtt() -> bool:

View File

@@ -113,7 +113,7 @@ void ADE7880::update() {
if (this->channel_a_ != nullptr) {
auto *chan = this->channel_a_;
this->update_sensor_from_s24zp_register16_(chan->current, AIRMS, [](float val) { return val / 100000.0f; });
this->update_sensor_from_s24zp_register16_(chan->voltage, BVRMS, [](float val) { return val / 10000.0f; });
this->update_sensor_from_s24zp_register16_(chan->voltage, AVRMS, [](float val) { return val / 10000.0f; });
this->update_sensor_from_s24zp_register16_(chan->active_power, AWATT, [](float val) { return val / 100.0f; });
this->update_sensor_from_s24zp_register16_(chan->apparent_power, AVA, [](float val) { return val / 100.0f; });
this->update_sensor_from_s16_register16_(chan->power_factor, APF,

View File

@@ -10,7 +10,8 @@ from esphome.const import (
PLATFORM_LN882X,
PLATFORM_RTL87XX,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
AUTO_LOAD = ["web_server_base", "ota.web_server"]
DEPENDENCIES = ["wifi"]
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.CAPTIVE_PORTAL)
async def to_code(config):
paren = await cg.get_variable(config[CONF_WEB_SERVER_BASE_ID])

View File

@@ -16,7 +16,8 @@ from esphome.const import (
CONF_SAFE_MODE,
CONF_VERSION,
)
from esphome.core import CoroPriority, coroutine_with_priority
from esphome.core import coroutine_with_priority
from esphome.coroutine import CoroPriority
import esphome.final_validate as fv
_LOGGER = logging.getLogger(__name__)
@@ -121,7 +122,7 @@ CONFIG_SCHEMA = (
FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
cg.add(var.set_port(config[CONF_PORT]))

View File

@@ -77,6 +77,13 @@ ETHERNET_TYPES = {
"DM9051": EthernetType.ETHERNET_TYPE_DM9051,
}
# PHY types that need compile-time defines for conditional compilation
_PHY_TYPE_TO_DEFINE = {
"KSZ8081": "USE_ETHERNET_KSZ8081",
"KSZ8081RNA": "USE_ETHERNET_KSZ8081",
# Add other PHY types here only if they need conditional compilation
}
SPI_ETHERNET_TYPES = ["W5500", "DM9051"]
SPI_ETHERNET_DEFAULT_POLLING_INTERVAL = TimePeriodMilliseconds(milliseconds=10)
@@ -345,6 +352,10 @@ async def to_code(config):
if CONF_MANUAL_IP in config:
cg.add(var.set_manual_ip(manual_ip(config[CONF_MANUAL_IP])))
# Add compile-time define for PHY types with specific code
if phy_define := _PHY_TYPE_TO_DEFINE.get(config[CONF_TYPE]):
cg.add_define(phy_define)
cg.add_define("USE_ETHERNET")
# Disable WiFi when using Ethernet to save memory

View File

@@ -229,10 +229,12 @@ void EthernetComponent::setup() {
ESPHL_ERROR_CHECK(err, "ETH driver install error");
#ifndef USE_ETHERNET_SPI
#ifdef USE_ETHERNET_KSZ8081
if (this->type_ == ETHERNET_TYPE_KSZ8081RNA && this->clk_mode_ == EMAC_CLK_OUT) {
// KSZ8081RNA default is incorrect. It expects a 25MHz clock instead of the 50MHz we provide.
this->ksz8081_set_clock_reference_(mac);
}
#endif // USE_ETHERNET_KSZ8081
for (const auto &phy_register : this->phy_registers_) {
this->write_phy_register_(mac, phy_register);
@@ -721,6 +723,7 @@ bool EthernetComponent::powerdown() {
#ifndef USE_ETHERNET_SPI
#ifdef USE_ETHERNET_KSZ8081
constexpr uint8_t KSZ80XX_PC2R_REG_ADDR = 0x1F;
void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) {
@@ -749,6 +752,7 @@ void EthernetComponent::ksz8081_set_clock_reference_(esp_eth_mac_t *mac) {
ESP_LOGVV(TAG, "KSZ8081 PHY Control 2: %s", format_hex_pretty((u_int8_t *) &phy_control_2, 2).c_str());
}
}
#endif // USE_ETHERNET_KSZ8081
void EthernetComponent::write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data) {
esp_err_t err;

View File

@@ -104,8 +104,10 @@ class EthernetComponent : public Component {
void start_connect_();
void finish_connect_();
void dump_connect_params_();
#ifdef USE_ETHERNET_KSZ8081
/// @brief Set `RMII Reference Clock Select` bit for KSZ8081.
void ksz8081_set_clock_reference_(esp_eth_mac_t *mac);
#endif
/// @brief Set arbitratry PHY registers from config.
void write_phy_register_(esp_eth_mac_t *mac, PHYRegister register_data);

View File

@@ -6,6 +6,7 @@ namespace gpio {
static const char *const TAG = "gpio.binary_sensor";
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_DEBUG
static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
switch (type) {
case gpio::INTERRUPT_RISING_EDGE:
@@ -22,6 +23,7 @@ static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
static const LogString *gpio_mode_to_string(bool use_interrupt) {
return use_interrupt ? LOG_STR("interrupt") : LOG_STR("polling");
}
#endif
void IRAM_ATTR GPIOBinarySensorStore::gpio_intr(GPIOBinarySensorStore *arg) {
bool new_state = arg->isr_pin_.digital_read();

View File

@@ -3,7 +3,8 @@ import esphome.codegen as cg
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_PASSWORD, CONF_URL, CONF_USERNAME
from esphome.core import CoroPriority, coroutine_with_priority
from esphome.core import coroutine_with_priority
from esphome.coroutine import CoroPriority
from .. import CONF_HTTP_REQUEST_ID, HttpRequestComponent, http_request_ns
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config)

View File

@@ -11,7 +11,8 @@ from esphome.const import (
CONF_SERVICES,
PlatformFramework,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"]
@@ -72,7 +73,7 @@ def mdns_service(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.NETWORK_SERVICES)
async def to_code(config):
if config[CONF_DISABLED] is True:
return

View File

@@ -491,7 +491,7 @@ bool MQTTClientComponent::publish(const std::string &topic, const std::string &p
bool MQTTClientComponent::publish(const std::string &topic, const char *payload, size_t payload_length, uint8_t qos,
bool retain) {
return publish({.topic = topic, .payload = payload, .qos = qos, .retain = retain});
return publish({.topic = topic, .payload = std::string(payload, payload_length), .qos = qos, .retain = retain});
}
bool MQTTClientComponent::publish(const MQTTMessage &message) {

View File

@@ -10,7 +10,8 @@ from esphome.const import (
CONF_TRIGGER_ID,
PlatformFramework,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
AUTO_LOAD = ["md5", "safe_mode"]
@@ -82,7 +83,7 @@ BASE_OTA_SCHEMA = cv.Schema(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
cg.add_define("USE_OTA")

View File

@@ -121,15 +121,11 @@ def transport_schema(cls):
return TRANSPORT_SCHEMA.extend({cv.GenerateID(): cv.declare_id(cls)})
# Build a list of sensors for this platform
CORE.data[DOMAIN] = {CONF_SENSORS: []}
def get_sensors(transport_id):
"""Return the list of sensors for this platform."""
return (
sensor
for sensor in CORE.data[DOMAIN][CONF_SENSORS]
for sensor in CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
if sensor[CONF_TRANSPORT_ID] == transport_id
)
@@ -137,7 +133,8 @@ def get_sensors(transport_id):
def validate_packet_transport_sensor(config):
if CONF_NAME in config and CONF_INTERNAL not in config:
raise cv.Invalid("Must provide internal: config when using name:")
CORE.data[DOMAIN][CONF_SENSORS].append(config)
conf_sensors = CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
conf_sensors.append(config)
return config

View File

@@ -28,12 +28,12 @@ bool Select::has_option(const std::string &option) const { return this->index_of
bool Select::has_index(size_t index) const { return index < this->size(); }
size_t Select::size() const {
auto options = traits.get_options();
const auto &options = traits.get_options();
return options.size();
}
optional<size_t> Select::index_of(const std::string &option) const {
auto options = traits.get_options();
const auto &options = traits.get_options();
auto it = std::find(options.begin(), options.end(), option);
if (it == options.end()) {
return {};
@@ -51,7 +51,7 @@ optional<size_t> Select::active_index() const {
optional<std::string> Select::at(size_t index) const {
if (this->has_index(index)) {
auto options = traits.get_options();
const auto &options = traits.get_options();
return options.at(index);
} else {
return {};

View File

@@ -45,7 +45,7 @@ void SelectCall::perform() {
auto *parent = this->parent_;
const auto *name = parent->get_name().c_str();
const auto &traits = parent->traits;
auto options = traits.get_options();
const auto &options = traits.get_options();
if (this->operation_ == SELECT_OP_NONE) {
ESP_LOGW(TAG, "'%s' - SelectCall performed without selecting an operation", name);

View File

@@ -3,7 +3,8 @@ from esphome.components.esp32 import add_idf_component
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv
from esphome.const import CONF_ID
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network", "web_server_base"]
@@ -22,7 +23,7 @@ CONFIG_SCHEMA = (
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.WEB_SERVER_OTA)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config)

View File

@@ -1,7 +1,8 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ID
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"]
@@ -26,7 +27,7 @@ CONFIG_SCHEMA = cv.Schema(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.WEB_SERVER_BASE)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2025.9.0b2"
__version__ = "2025.9.1"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -175,6 +175,7 @@
#ifdef USE_ARDUINO
#define USE_ARDUINO_VERSION_CODE VERSION_CODE(3, 2, 1)
#define USE_ETHERNET
#define USE_ETHERNET_KSZ8081
#endif
#ifdef USE_ESP_IDF

View File

@@ -90,11 +90,30 @@ class CoroPriority(enum.IntEnum):
# Examples: status_led (80)
STATUS = 80
# Web server infrastructure
# Examples: web_server_base (65)
WEB_SERVER_BASE = 65
# Network portal services
# Examples: captive_portal (64)
CAPTIVE_PORTAL = 64
# Communication protocols and services
# Examples: web_server_base (65), captive_portal (64), wifi (60), ethernet (60),
# mdns (55), ota_updates (54), web_server_ota (52)
# Examples: wifi (60), ethernet (60)
COMMUNICATION = 60
# Network discovery and management services
# Examples: mdns (55)
NETWORK_SERVICES = 55
# OTA update services
# Examples: ota_updates (54)
OTA_UPDATES = 54
# Web-based OTA services
# Examples: web_server_ota (52)
WEB_SERVER_OTA = 52
# Application-level services
# Examples: safe_mode (50)
APPLICATION = 50

View File

@@ -1038,12 +1038,9 @@ class ArchiveRequestHandler(BaseHandler):
shutil.move(config_file, os.path.join(archive_path, configuration))
storage_json = StorageJSON.load(storage_path)
if storage_json is not None:
if storage_json is not None and storage_json.build_path:
# Delete build folder (if exists)
name = storage_json.name
build_folder = os.path.join(settings.config_dir, name)
if build_folder is not None:
shutil.rmtree(build_folder, os.path.join(archive_path, name))
shutil.rmtree(storage_json.build_path, ignore_errors=True)
class UnArchiveRequestHandler(BaseHandler):

View File

@@ -1,6 +1,7 @@
import os
import random
import string
from typing import Literal, NotRequired, TypedDict, Unpack
import unicodedata
import voluptuous as vol
@@ -103,11 +104,25 @@ HARDWARE_BASE_CONFIGS = {
}
def sanitize_double_quotes(value):
def sanitize_double_quotes(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
def wizard_file(**kwargs):
class WizardFileKwargs(TypedDict):
"""Keyword arguments for wizard_file function."""
name: str
platform: Literal["ESP8266", "ESP32", "RP2040", "BK72XX", "LN882X", "RTL87XX"]
board: str
ssid: NotRequired[str]
psk: NotRequired[str]
password: NotRequired[str]
ota_password: NotRequired[str]
api_encryption_key: NotRequired[str]
friendly_name: NotRequired[str]
def wizard_file(**kwargs: Unpack[WizardFileKwargs]) -> str:
letters = string.ascii_letters + string.digits
ap_name_base = kwargs["name"].replace("_", " ").title()
ap_name = f"{ap_name_base} Fallback Hotspot"
@@ -180,7 +195,25 @@ captive_portal:
return config
def wizard_write(path, **kwargs):
class WizardWriteKwargs(TypedDict):
"""Keyword arguments for wizard_write function."""
name: str
type: Literal["basic", "empty", "upload"]
# Required for "basic" type
board: NotRequired[str]
platform: NotRequired[str]
ssid: NotRequired[str]
psk: NotRequired[str]
password: NotRequired[str]
ota_password: NotRequired[str]
api_encryption_key: NotRequired[str]
friendly_name: NotRequired[str]
# Required for "upload" type
file_text: NotRequired[str]
def wizard_write(path: str, **kwargs: Unpack[WizardWriteKwargs]) -> bool:
from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards
@@ -237,14 +270,14 @@ def wizard_write(path, **kwargs):
if get_bool_env(ENV_QUICKWIZARD):
def sleep(time):
def sleep(time: float) -> None:
pass
else:
from time import sleep
def safe_print_step(step, big):
def safe_print_step(step: int, big: str) -> None:
safe_print()
safe_print()
safe_print(f"============= STEP {step} =============")
@@ -253,14 +286,14 @@ def safe_print_step(step, big):
sleep(0.25)
def default_input(text, default):
def default_input(text: str, default: str) -> str:
safe_print()
safe_print(f"Press ENTER for default ({default})")
return safe_input(text.format(default)) or default
# From https://stackoverflow.com/a/518232/8924614
def strip_accents(value):
def strip_accents(value: str) -> str:
return "".join(
c
for c in unicodedata.normalize("NFD", str(value))
@@ -268,7 +301,7 @@ def strip_accents(value):
)
def wizard(path):
def wizard(path: str) -> int:
from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards
@@ -509,6 +542,7 @@ def wizard(path):
ssid=ssid,
psk=psk,
password=password,
type="basic",
):
return 1

View File

@@ -315,6 +315,19 @@ def clean_build():
_LOGGER.info("Deleting %s", dependencies_lock)
os.remove(dependencies_lock)
# Clean PlatformIO cache to resolve CMake compiler detection issues
# This helps when toolchain paths change or get corrupted
try:
from platformio.project.helpers import get_project_cache_dir
except ImportError:
# PlatformIO is not available, skip cache cleaning
pass
else:
cache_dir = get_project_cache_dir()
if cache_dir and cache_dir.strip() and os.path.isdir(cache_dir):
_LOGGER.info("Deleting PlatformIO cache %s", cache_dir)
shutil.rmtree(cache_dir)
GITIGNORE_CONTENT = """# Gitignore settings for ESPHome
# This is an example and may include too much for your use-case.

View File

@@ -0,0 +1,42 @@
# Comprehensive ESP8266 test for mdns with multiple network components
# Tests the complete priority chain:
# wifi (60) -> mdns (55) -> ota (54) -> web_server_ota (52)
esphome:
name: mdns-comprehensive-test
esp8266:
board: esp01_1m
logger:
level: DEBUG
wifi:
ssid: MySSID
password: password1
# web_server_base should run at priority 65 (before wifi)
web_server:
port: 80
# mdns should run at priority 55 (after wifi at 60)
mdns:
services:
- service: _http
protocol: _tcp
port: 80
# OTA should run at priority 54 (after mdns)
ota:
- platform: esphome
password: "otapassword"
# Test status LED at priority 80
status_led:
pin:
number: GPIO2
inverted: true
# Include API at priority 40
api:
password: "apipassword"

View File

@@ -589,7 +589,7 @@ async def test_archive_request_handler_post(
mock_ext_storage_path: MagicMock,
tmp_path: Path,
) -> None:
"""Test ArchiveRequestHandler.post method."""
"""Test ArchiveRequestHandler.post method without storage_json."""
# Set up temp directories
config_dir = Path(get_fixture_path("conf"))
@@ -616,6 +616,97 @@ async def test_archive_request_handler_post(
).read_text() == "esphome:\n name: test_archive\n"
@pytest.mark.asyncio
async def test_archive_handler_with_build_folder(
dashboard: DashboardTestHelper,
mock_archive_storage_path: MagicMock,
mock_ext_storage_path: MagicMock,
mock_dashboard_settings: MagicMock,
mock_storage_json: MagicMock,
tmp_path: Path,
) -> None:
"""Test ArchiveRequestHandler.post with storage_json and build folder."""
config_dir = tmp_path / "config"
config_dir.mkdir()
archive_dir = tmp_path / "archive"
archive_dir.mkdir()
build_dir = tmp_path / "build"
build_dir.mkdir()
configuration = "test_device.yaml"
test_config = config_dir / configuration
test_config.write_text("esphome:\n name: test_device\n")
build_folder = build_dir / "test_device"
build_folder.mkdir()
(build_folder / "firmware.bin").write_text("binary content")
(build_folder / ".pioenvs").mkdir()
mock_dashboard_settings.config_dir = str(config_dir)
mock_dashboard_settings.rel_path.return_value = str(test_config)
mock_archive_storage_path.return_value = str(archive_dir)
mock_storage = MagicMock()
mock_storage.name = "test_device"
mock_storage.build_path = str(build_folder)
mock_storage_json.load.return_value = mock_storage
response = await dashboard.fetch(
"/archive",
method="POST",
body=f"configuration={configuration}",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert response.code == 200
assert not test_config.exists()
assert (archive_dir / configuration).exists()
assert not build_folder.exists()
assert not (archive_dir / "test_device").exists()
@pytest.mark.asyncio
async def test_archive_handler_no_build_folder(
dashboard: DashboardTestHelper,
mock_archive_storage_path: MagicMock,
mock_ext_storage_path: MagicMock,
mock_dashboard_settings: MagicMock,
mock_storage_json: MagicMock,
tmp_path: Path,
) -> None:
"""Test ArchiveRequestHandler.post with storage_json but no build folder."""
config_dir = tmp_path / "config"
config_dir.mkdir()
archive_dir = tmp_path / "archive"
archive_dir.mkdir()
configuration = "test_device.yaml"
test_config = config_dir / configuration
test_config.write_text("esphome:\n name: test_device\n")
mock_dashboard_settings.config_dir = str(config_dir)
mock_dashboard_settings.rel_path.return_value = str(test_config)
mock_archive_storage_path.return_value = str(archive_dir)
mock_storage = MagicMock()
mock_storage.name = "test_device"
mock_storage.build_path = None
mock_storage_json.load.return_value = mock_storage
response = await dashboard.fetch(
"/archive",
method="POST",
body=f"configuration={configuration}",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert response.code == 200
assert not test_config.exists()
assert (archive_dir / configuration).exists()
assert not (archive_dir / "test_device").exists()
@pytest.mark.skipif(os.name == "nt", reason="Unix sockets are not supported on Windows")
@pytest.mark.usefixtures("mock_trash_storage_path", "mock_archive_storage_path")
def test_start_web_server_with_unix_socket(tmp_path: Path) -> None:

View File

@@ -9,8 +9,10 @@ not be part of a unit test suite.
"""
from collections.abc import Generator
from pathlib import Path
import sys
from unittest.mock import Mock, patch
import pytest
@@ -36,3 +38,52 @@ def fixture_path() -> Path:
Location of all fixture files.
"""
return here / "fixtures"
@pytest.fixture
def setup_core(tmp_path: Path) -> Path:
"""Set up CORE with test paths."""
CORE.config_path = str(tmp_path / "test.yaml")
return tmp_path
@pytest.fixture
def mock_write_file_if_changed() -> Generator[Mock, None, None]:
"""Mock write_file_if_changed for storage_json."""
with patch("esphome.storage_json.write_file_if_changed") as mock:
yield mock
@pytest.fixture
def mock_copy_file_if_changed() -> Generator[Mock, None, None]:
"""Mock copy_file_if_changed for core.config."""
with patch("esphome.core.config.copy_file_if_changed") as mock:
yield mock
@pytest.fixture
def mock_run_platformio_cli() -> Generator[Mock, None, None]:
"""Mock run_platformio_cli for platformio_api."""
with patch("esphome.platformio_api.run_platformio_cli") as mock:
yield mock
@pytest.fixture
def mock_run_platformio_cli_run() -> Generator[Mock, None, None]:
"""Mock run_platformio_cli_run for platformio_api."""
with patch("esphome.platformio_api.run_platformio_cli_run") as mock:
yield mock
@pytest.fixture
def mock_decode_pc() -> Generator[Mock, None, None]:
"""Mock _decode_pc for platformio_api."""
with patch("esphome.platformio_api._decode_pc") as mock:
yield mock
@pytest.fixture
def mock_run_external_command() -> Generator[Mock, None, None]:
"""Mock run_external_command for platformio_api."""
with patch("esphome.platformio_api.run_external_command") as mock:
yield mock

View File

@@ -1,15 +1,34 @@
"""Unit tests for core config functionality including areas and devices."""
from collections.abc import Callable
import os
from pathlib import Path
import types
from typing import Any
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import config_validation as cv, core
from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES
from esphome.core import config
from esphome.core.config import Area, validate_area_config
from esphome.const import (
CONF_AREA,
CONF_AREAS,
CONF_BUILD_PATH,
CONF_DEVICES,
CONF_ESPHOME,
CONF_NAME,
CONF_NAME_ADD_MAC_SUFFIX,
KEY_CORE,
)
from esphome.core import CORE, config
from esphome.core.config import (
Area,
preload_core_config,
valid_include,
valid_project_name,
validate_area_config,
validate_hostname,
)
from .common import load_config_from_fixture
@@ -245,3 +264,316 @@ def test_add_platform_defines_priority() -> None:
f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than "
f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)"
)
def test_valid_include_with_angle_brackets() -> None:
"""Test valid_include accepts angle bracket includes."""
assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>"
def test_valid_include_with_valid_file(tmp_path: Path) -> None:
"""Test valid_include accepts valid include files."""
CORE.config_path = str(tmp_path / "test.yaml")
include_file = tmp_path / "include.h"
include_file.touch()
assert valid_include(str(include_file)) == str(include_file)
def test_valid_include_with_valid_directory(tmp_path: Path) -> None:
"""Test valid_include accepts valid directories."""
CORE.config_path = str(tmp_path / "test.yaml")
include_dir = tmp_path / "includes"
include_dir.mkdir()
assert valid_include(str(include_dir)) == str(include_dir)
def test_valid_include_invalid_extension(tmp_path: Path) -> None:
"""Test valid_include rejects files with invalid extensions."""
CORE.config_path = str(tmp_path / "test.yaml")
invalid_file = tmp_path / "file.txt"
invalid_file.touch()
with pytest.raises(cv.Invalid, match="Include has invalid file extension"):
valid_include(str(invalid_file))
def test_valid_project_name_valid() -> None:
"""Test valid_project_name accepts valid project names."""
assert valid_project_name("esphome.my_project") == "esphome.my_project"
def test_valid_project_name_no_namespace() -> None:
"""Test valid_project_name rejects names without namespace."""
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
valid_project_name("my_project")
def test_valid_project_name_multiple_dots() -> None:
"""Test valid_project_name rejects names with multiple dots."""
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
valid_project_name("esphome.my.project")
def test_validate_hostname_valid() -> None:
"""Test validate_hostname accepts valid hostnames."""
config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False}
assert validate_hostname(config) == config
def test_validate_hostname_too_long() -> None:
"""Test validate_hostname rejects hostnames that are too long."""
config = {
CONF_NAME: "a" * 32, # 32 chars, max is 31
CONF_NAME_ADD_MAC_SUFFIX: False,
}
with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"):
validate_hostname(config)
def test_validate_hostname_too_long_with_mac_suffix() -> None:
"""Test validate_hostname accounts for MAC suffix length."""
config = {
CONF_NAME: "a" * 25, # 25 chars, max is 24 with MAC suffix
CONF_NAME_ADD_MAC_SUFFIX: True,
}
with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"):
validate_hostname(config)
def test_validate_hostname_with_underscore(caplog) -> None:
"""Test validate_hostname warns about underscores."""
config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False}
assert validate_hostname(config) == config
assert (
"Using the '_' (underscore) character in the hostname is discouraged"
in caplog.text
)
def test_preload_core_config_basic(setup_core: Path) -> None:
"""Test preload_core_config sets basic CORE attributes."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"esp32": {},
}
result = {}
platform = preload_core_config(config, result)
assert CORE.name == "test_device"
assert platform == "esp32"
assert KEY_CORE in CORE.data
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
# Verify default build path is "build/<device_name>"
build_path = config[CONF_ESPHOME][CONF_BUILD_PATH]
assert build_path.endswith(os.path.join("build", "test_device"))
def test_preload_core_config_with_build_path(setup_core: Path) -> None:
"""Test preload_core_config uses provided build path."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
CONF_BUILD_PATH: "/custom/build/path",
},
"esp8266": {},
}
result = {}
platform = preload_core_config(config, result)
assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path"
assert platform == "esp8266"
def test_preload_core_config_env_build_path(setup_core: Path) -> None:
"""Test preload_core_config uses ESPHOME_BUILD_PATH env var."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"rp2040": {},
}
result = {}
with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}):
platform = preload_core_config(config, result)
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH]
# Verify it uses the env var path with device name appended
build_path = config[CONF_ESPHOME][CONF_BUILD_PATH]
expected_path = os.path.join("/env/build", "test_device")
assert build_path == expected_path or build_path == expected_path.replace(
"/", os.sep
)
assert platform == "rp2040"
def test_preload_core_config_no_platform(setup_core: Path) -> None:
"""Test preload_core_config raises when no platform is specified."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
}
result = {}
# Mock _is_target_platform to avoid expensive component loading
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
# Return True for known platforms
mock_is_platform.side_effect = lambda name: name in [
"esp32",
"esp8266",
"rp2040",
]
with pytest.raises(cv.Invalid, match="Platform missing"):
preload_core_config(config, result)
def test_preload_core_config_multiple_platforms(setup_core: Path) -> None:
"""Test preload_core_config raises when multiple platforms are specified."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"esp32": {},
"esp8266": {},
}
result = {}
# Mock _is_target_platform to avoid expensive component loading
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
# Return True for known platforms
mock_is_platform.side_effect = lambda name: name in [
"esp32",
"esp8266",
"rp2040",
]
with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"):
preload_core_config(config, result)
def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
"""Test include_file adds include statement for header files."""
src_file = tmp_path / "source.h"
src_file.write_text("// Header content")
CORE.build_path = str(tmp_path / "build")
with patch("esphome.core.config.cg") as mock_cg:
# Mock RawStatement to capture the text
mock_raw_statement = MagicMock()
mock_raw_statement.text = ""
def raw_statement_side_effect(text):
mock_raw_statement.text = text
return mock_raw_statement
mock_cg.RawStatement.side_effect = raw_statement_side_effect
config.include_file(str(src_file), "test.h")
mock_copy_file_if_changed.assert_called_once()
mock_cg.add_global.assert_called_once()
# Check that include statement was added
assert '#include "test.h"' in mock_raw_statement.text
def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
"""Test include_file does not add include for cpp files."""
src_file = tmp_path / "source.cpp"
src_file.write_text("// CPP content")
CORE.build_path = str(tmp_path / "build")
with patch("esphome.core.config.cg") as mock_cg:
config.include_file(str(src_file), "test.cpp")
mock_copy_file_if_changed.assert_called_once()
# Should not add include statement for .cpp files
mock_cg.add_global.assert_not_called()
def test_get_usable_cpu_count() -> None:
"""Test get_usable_cpu_count returns CPU count."""
count = config.get_usable_cpu_count()
assert isinstance(count, int)
assert count > 0
def test_get_usable_cpu_count_with_process_cpu_count() -> None:
"""Test get_usable_cpu_count uses process_cpu_count when available."""
# Test with process_cpu_count (Python 3.13+)
# Create a mock os module with process_cpu_count
mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4)
with patch("esphome.core.config.os", mock_os):
# When process_cpu_count exists, it should be used
count = config.get_usable_cpu_count()
assert count == 8
# Test fallback to cpu_count when process_cpu_count not available
mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4)
with patch("esphome.core.config.os", mock_os_no_process):
count = config.get_usable_cpu_count()
assert count == 4
def test_list_target_platforms(tmp_path: Path) -> None:
"""Test _list_target_platforms returns available platforms."""
# Create mock components directory structure
components_dir = tmp_path / "components"
components_dir.mkdir()
# Create platform and non-platform directories with __init__.py
platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"]
non_platforms = ["sensor"]
for component in platforms + non_platforms:
component_dir = components_dir / component
component_dir.mkdir()
(component_dir / "__init__.py").touch()
# Create a file (not a directory)
(components_dir / "README.md").touch()
# Create a directory without __init__.py
(components_dir / "no_init").mkdir()
# Mock Path(__file__).parents[1] to return our tmp_path
with patch("esphome.core.config.Path") as mock_path:
mock_file_path = MagicMock()
mock_file_path.parents = [MagicMock(), tmp_path]
mock_path.return_value = mock_file_path
platforms = config._list_target_platforms()
assert isinstance(platforms, list)
# Should include platform components
assert "esp32" in platforms
assert "esp8266" in platforms
assert "rp2040" in platforms
assert "libretiny" in platforms
assert "host" in platforms
# Should not include non-platform components
assert "sensor" not in platforms
assert "README.md" not in platforms
assert "no_init" not in platforms
def test_is_target_platform() -> None:
"""Test _is_target_platform identifies valid platforms."""
assert config._is_target_platform("esp32") is True
assert config._is_target_platform("esp8266") is True
assert config._is_target_platform("rp2040") is True
assert config._is_target_platform("invalid_platform") is False
assert config._is_target_platform("api") is False # Component but not platform

View File

@@ -0,0 +1,187 @@
"""Tests for config_validation.py path-related functions."""
from pathlib import Path
import pytest
import voluptuous as vol
from esphome import config_validation as cv
def test_directory_valid_path(setup_core: Path) -> None:
"""Test directory validator with valid directory."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
result = cv.directory("test_directory")
assert result == "test_directory"
def test_directory_absolute_path(setup_core: Path) -> None:
"""Test directory validator with absolute path."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
result = cv.directory(str(test_dir))
assert result == str(test_dir)
def test_directory_nonexistent_path(setup_core: Path) -> None:
"""Test directory validator raises error for non-existent directory."""
with pytest.raises(
vol.Invalid, match="Could not find directory.*nonexistent_directory"
):
cv.directory("nonexistent_directory")
def test_directory_file_instead_of_directory(setup_core: Path) -> None:
"""Test directory validator raises error when path is a file."""
test_file = setup_core / "test_file.txt"
test_file.write_text("content")
with pytest.raises(vol.Invalid, match="is not a directory"):
cv.directory("test_file.txt")
def test_directory_with_parent_directory(setup_core: Path) -> None:
"""Test directory validator with nested directory structure."""
nested_dir = setup_core / "parent" / "child" / "grandchild"
nested_dir.mkdir(parents=True)
result = cv.directory("parent/child/grandchild")
assert result == "parent/child/grandchild"
def test_file_valid_path(setup_core: Path) -> None:
"""Test file_ validator with valid file."""
test_file = setup_core / "test_file.yaml"
test_file.write_text("test content")
result = cv.file_("test_file.yaml")
assert result == "test_file.yaml"
def test_file_absolute_path(setup_core: Path) -> None:
"""Test file_ validator with absolute path."""
test_file = setup_core / "test_file.yaml"
test_file.write_text("test content")
result = cv.file_(str(test_file))
assert result == str(test_file)
def test_file_nonexistent_path(setup_core: Path) -> None:
"""Test file_ validator raises error for non-existent file."""
with pytest.raises(vol.Invalid, match="Could not find file.*nonexistent_file.yaml"):
cv.file_("nonexistent_file.yaml")
def test_file_directory_instead_of_file(setup_core: Path) -> None:
"""Test file_ validator raises error when path is a directory."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
with pytest.raises(vol.Invalid, match="is not a file"):
cv.file_("test_directory")
def test_file_with_parent_directory(setup_core: Path) -> None:
"""Test file_ validator with file in nested directory."""
nested_dir = setup_core / "configs" / "sensors"
nested_dir.mkdir(parents=True)
test_file = nested_dir / "temperature.yaml"
test_file.write_text("sensor config")
result = cv.file_("configs/sensors/temperature.yaml")
assert result == "configs/sensors/temperature.yaml"
def test_directory_handles_trailing_slash(setup_core: Path) -> None:
"""Test directory validator handles trailing slashes correctly."""
test_dir = setup_core / "test_dir"
test_dir.mkdir()
result = cv.directory("test_dir/")
assert result == "test_dir/"
result = cv.directory("test_dir")
assert result == "test_dir"
def test_file_handles_various_extensions(setup_core: Path) -> None:
"""Test file_ validator works with different file extensions."""
yaml_file = setup_core / "config.yaml"
yaml_file.write_text("yaml content")
assert cv.file_("config.yaml") == "config.yaml"
yml_file = setup_core / "config.yml"
yml_file.write_text("yml content")
assert cv.file_("config.yml") == "config.yml"
txt_file = setup_core / "readme.txt"
txt_file.write_text("text content")
assert cv.file_("readme.txt") == "readme.txt"
no_ext_file = setup_core / "LICENSE"
no_ext_file.write_text("license content")
assert cv.file_("LICENSE") == "LICENSE"
def test_directory_with_symlink(setup_core: Path) -> None:
"""Test directory validator follows symlinks."""
actual_dir = setup_core / "actual_directory"
actual_dir.mkdir()
symlink_dir = setup_core / "symlink_directory"
symlink_dir.symlink_to(actual_dir)
result = cv.directory("symlink_directory")
assert result == "symlink_directory"
def test_file_with_symlink(setup_core: Path) -> None:
"""Test file_ validator follows symlinks."""
actual_file = setup_core / "actual_file.txt"
actual_file.write_text("content")
symlink_file = setup_core / "symlink_file.txt"
symlink_file.symlink_to(actual_file)
result = cv.file_("symlink_file.txt")
assert result == "symlink_file.txt"
def test_directory_error_shows_full_path(setup_core: Path) -> None:
"""Test directory validator error message includes full path."""
with pytest.raises(vol.Invalid, match=".*missing_dir.*full path:.*"):
cv.directory("missing_dir")
def test_file_error_shows_full_path(setup_core: Path) -> None:
"""Test file_ validator error message includes full path."""
with pytest.raises(vol.Invalid, match=".*missing_file.yaml.*full path:.*"):
cv.file_("missing_file.yaml")
def test_directory_with_spaces_in_name(setup_core: Path) -> None:
"""Test directory validator handles spaces in directory names."""
dir_with_spaces = setup_core / "my test directory"
dir_with_spaces.mkdir()
result = cv.directory("my test directory")
assert result == "my test directory"
def test_file_with_spaces_in_name(setup_core: Path) -> None:
"""Test file_ validator handles spaces in file names."""
file_with_spaces = setup_core / "my test file.yaml"
file_with_spaces.write_text("content")
result = cv.file_("my test file.yaml")
assert result == "my test file.yaml"

View File

@@ -13,7 +13,12 @@ def test_coro_priority_enum_values() -> None:
assert CoroPriority.CORE == 100
assert CoroPriority.DIAGNOSTICS == 90
assert CoroPriority.STATUS == 80
assert CoroPriority.WEB_SERVER_BASE == 65
assert CoroPriority.CAPTIVE_PORTAL == 64
assert CoroPriority.COMMUNICATION == 60
assert CoroPriority.NETWORK_SERVICES == 55
assert CoroPriority.OTA_UPDATES == 54
assert CoroPriority.WEB_SERVER_OTA == 52
assert CoroPriority.APPLICATION == 50
assert CoroPriority.WEB == 40
assert CoroPriority.AUTOMATION == 30
@@ -70,7 +75,12 @@ def test_float_and_enum_are_interchangeable() -> None:
(CoroPriority.CORE, 100.0),
(CoroPriority.DIAGNOSTICS, 90.0),
(CoroPriority.STATUS, 80.0),
(CoroPriority.WEB_SERVER_BASE, 65.0),
(CoroPriority.CAPTIVE_PORTAL, 64.0),
(CoroPriority.COMMUNICATION, 60.0),
(CoroPriority.NETWORK_SERVICES, 55.0),
(CoroPriority.OTA_UPDATES, 54.0),
(CoroPriority.WEB_SERVER_OTA, 52.0),
(CoroPriority.APPLICATION, 50.0),
(CoroPriority.WEB, 40.0),
(CoroPriority.AUTOMATION, 30.0),
@@ -164,8 +174,13 @@ def test_enum_priority_comparison() -> None:
assert CoroPriority.NETWORK_TRANSPORT > CoroPriority.CORE
assert CoroPriority.CORE > CoroPriority.DIAGNOSTICS
assert CoroPriority.DIAGNOSTICS > CoroPriority.STATUS
assert CoroPriority.STATUS > CoroPriority.COMMUNICATION
assert CoroPriority.COMMUNICATION > CoroPriority.APPLICATION
assert CoroPriority.STATUS > CoroPriority.WEB_SERVER_BASE
assert CoroPriority.WEB_SERVER_BASE > CoroPriority.CAPTIVE_PORTAL
assert CoroPriority.CAPTIVE_PORTAL > CoroPriority.COMMUNICATION
assert CoroPriority.COMMUNICATION > CoroPriority.NETWORK_SERVICES
assert CoroPriority.NETWORK_SERVICES > CoroPriority.OTA_UPDATES
assert CoroPriority.OTA_UPDATES > CoroPriority.WEB_SERVER_OTA
assert CoroPriority.WEB_SERVER_OTA > CoroPriority.APPLICATION
assert CoroPriority.APPLICATION > CoroPriority.WEB
assert CoroPriority.WEB > CoroPriority.AUTOMATION
assert CoroPriority.AUTOMATION > CoroPriority.BUS

View File

@@ -0,0 +1,196 @@
"""Tests for external_files.py functions."""
from pathlib import Path
import time
from unittest.mock import MagicMock, patch
import pytest
import requests
from esphome import external_files
from esphome.config_validation import Invalid
from esphome.core import CORE, TimePeriod
def test_compute_local_file_dir(setup_core: Path) -> None:
"""Test compute_local_file_dir creates and returns correct path."""
domain = "font"
result = external_files.compute_local_file_dir(domain)
assert isinstance(result, Path)
assert result == Path(CORE.data_dir) / domain
assert result.exists()
assert result.is_dir()
def test_compute_local_file_dir_nested(setup_core: Path) -> None:
"""Test compute_local_file_dir works with nested domains."""
domain = "images/icons"
result = external_files.compute_local_file_dir(domain)
assert result == Path(CORE.data_dir) / "images" / "icons"
assert result.exists()
assert result.is_dir()
def test_is_file_recent_with_recent_file(setup_core: Path) -> None:
"""Test is_file_recent returns True for recently created file."""
test_file = setup_core / "recent.txt"
test_file.write_text("content")
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is True
def test_is_file_recent_with_old_file(setup_core: Path) -> None:
"""Test is_file_recent returns False for old file."""
test_file = setup_core / "old.txt"
test_file.write_text("content")
old_time = time.time() - 7200
with patch("os.path.getctime", return_value=old_time):
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
def test_is_file_recent_nonexistent_file(setup_core: Path) -> None:
"""Test is_file_recent returns False for non-existent file."""
test_file = setup_core / "nonexistent.txt"
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
def test_is_file_recent_with_zero_refresh(setup_core: Path) -> None:
"""Test is_file_recent with zero refresh period returns False."""
test_file = setup_core / "test.txt"
test_file.write_text("content")
# Mock getctime to return a time 10 seconds ago
with patch("os.path.getctime", return_value=time.time() - 10):
refresh = TimePeriod(seconds=0)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_not_modified(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed returns False when file not modified."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 304
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is False
mock_head.assert_called_once()
call_args = mock_head.call_args
headers = call_args[1]["headers"]
assert external_files.IF_MODIFIED_SINCE in headers
assert external_files.CACHE_CONTROL in headers
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_modified(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed returns True when file modified."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 200
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is True
def test_has_remote_file_changed_no_local_file(setup_core: Path) -> None:
"""Test has_remote_file_changed returns True when local file doesn't exist."""
test_file = setup_core / "nonexistent.txt"
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is True
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_network_error(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed handles network errors gracefully."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_head.side_effect = requests.exceptions.RequestException("Network error")
url = "https://example.com/file.txt"
with pytest.raises(Invalid, match="Could not check if.*Network error"):
external_files.has_remote_file_changed(url, str(test_file))
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_timeout(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed respects timeout."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 304
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
external_files.has_remote_file_changed(url, str(test_file))
call_args = mock_head.call_args
assert call_args[1]["timeout"] == external_files.NETWORK_TIMEOUT
def test_compute_local_file_dir_creates_parent_dirs(setup_core: Path) -> None:
"""Test compute_local_file_dir creates parent directories."""
domain = "level1/level2/level3/level4"
result = external_files.compute_local_file_dir(domain)
assert result.exists()
assert result.is_dir()
assert result.parent.name == "level3"
assert result.parent.parent.name == "level2"
assert result.parent.parent.parent.name == "level1"
def test_is_file_recent_handles_float_seconds(setup_core: Path) -> None:
"""Test is_file_recent works with float seconds in TimePeriod."""
test_file = setup_core / "test.txt"
test_file.write_text("content")
refresh = TimePeriod(seconds=3600.5)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is True

View File

@@ -1226,6 +1226,18 @@ def test_has_mqtt_logging_no_log_topic() -> None:
setup_core(config={})
assert has_mqtt_logging() is False
# Setup MQTT config with CONF_LOG_TOPIC but no CONF_LEVEL (regression test for #10771)
# This simulates the default configuration created by validate_config in the MQTT component
setup_core(
config={
CONF_MQTT: {
CONF_BROKER: "mqtt.local",
CONF_LOG_TOPIC: {CONF_TOPIC: "esphome/debug"},
}
}
)
assert has_mqtt_logging() is True
def test_has_mqtt() -> None:
"""Test has_mqtt function."""

View File

@@ -0,0 +1,636 @@
"""Tests for platformio_api.py path functions."""
import json
import os
from pathlib import Path
import shutil
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import platformio_api
from esphome.core import CORE, EsphomeError
def test_idedata_firmware_elf_path(setup_core: Path) -> None:
"""Test IDEData.firmware_elf_path returns correct path."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {"prog_path": "/path/to/firmware.elf"}
idedata = platformio_api.IDEData(raw_data)
assert idedata.firmware_elf_path == "/path/to/firmware.elf"
def test_idedata_firmware_bin_path(setup_core: Path) -> None:
"""Test IDEData.firmware_bin_path returns Path with .bin extension."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
prog_path = str(Path("/path/to/firmware.elf"))
raw_data = {"prog_path": prog_path}
idedata = platformio_api.IDEData(raw_data)
result = idedata.firmware_bin_path
assert isinstance(result, str)
expected = str(Path("/path/to/firmware.bin"))
assert result == expected
assert result.endswith(".bin")
def test_idedata_firmware_bin_path_preserves_directory(setup_core: Path) -> None:
"""Test firmware_bin_path preserves the directory structure."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
prog_path = str(Path("/complex/path/to/build/firmware.elf"))
raw_data = {"prog_path": prog_path}
idedata = platformio_api.IDEData(raw_data)
result = idedata.firmware_bin_path
expected = str(Path("/complex/path/to/build/firmware.bin"))
assert result == expected
def test_idedata_extra_flash_images(setup_core: Path) -> None:
"""Test IDEData.extra_flash_images returns list of FlashImage objects."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {
"prog_path": "/path/to/firmware.elf",
"extra": {
"flash_images": [
{"path": "/path/to/bootloader.bin", "offset": "0x1000"},
{"path": "/path/to/partition.bin", "offset": "0x8000"},
]
},
}
idedata = platformio_api.IDEData(raw_data)
images = idedata.extra_flash_images
assert len(images) == 2
assert all(isinstance(img, platformio_api.FlashImage) for img in images)
assert images[0].path == "/path/to/bootloader.bin"
assert images[0].offset == "0x1000"
assert images[1].path == "/path/to/partition.bin"
assert images[1].offset == "0x8000"
def test_idedata_extra_flash_images_empty(setup_core: Path) -> None:
"""Test extra_flash_images returns empty list when no extra images."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {"prog_path": "/path/to/firmware.elf", "extra": {"flash_images": []}}
idedata = platformio_api.IDEData(raw_data)
images = idedata.extra_flash_images
assert images == []
def test_idedata_cc_path(setup_core: Path) -> None:
"""Test IDEData.cc_path returns compiler path."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {
"prog_path": "/path/to/firmware.elf",
"cc_path": "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc",
}
idedata = platformio_api.IDEData(raw_data)
assert (
idedata.cc_path
== "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc"
)
def test_flash_image_dataclass() -> None:
"""Test FlashImage dataclass stores path and offset correctly."""
image = platformio_api.FlashImage(path="/path/to/image.bin", offset="0x10000")
assert image.path == "/path/to/image.bin"
assert image.offset == "0x10000"
def test_load_idedata_returns_dict(
setup_core: Path, mock_run_platformio_cli_run
) -> None:
"""Test _load_idedata returns parsed idedata dict when successful."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create required files
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.touch()
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/test/firmware.elf"}')
mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}'
config = {"name": "test"}
result = platformio_api._load_idedata(config)
assert result is not None
assert isinstance(result, dict)
assert result["prog_path"] == "/test/firmware.elf"
def test_load_idedata_uses_cache_when_valid(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata uses cached data when unchanged."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create platformio.ini
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Create idedata cache file that's newer
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/cached/firmware.elf"}')
# Make idedata newer than platformio.ini
platformio_ini_mtime = platformio_ini.stat().st_mtime
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should not call _run_idedata since cache is valid
mock_run_platformio_cli_run.assert_not_called()
assert result["prog_path"] == "/cached/firmware.elf"
def test_load_idedata_regenerates_when_platformio_ini_newer(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata regenerates when platformio.ini is newer."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create idedata cache file first
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/old/firmware.elf"}')
# Create platformio.ini that's newer
idedata_mtime = idedata_path.stat().st_mtime
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Make platformio.ini newer than idedata
os.utime(platformio_ini, (idedata_mtime + 1, idedata_mtime + 1))
# Mock platformio to return new data
new_data = {"prog_path": "/new/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should call _run_idedata since platformio.ini is newer
mock_run_platformio_cli_run.assert_called_once()
assert result["prog_path"] == "/new/firmware.elf"
def test_load_idedata_regenerates_on_corrupted_cache(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata regenerates when cache file is corrupted."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create platformio.ini
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Create corrupted idedata cache file
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": invalid json')
# Make idedata newer so it would be used if valid
platformio_ini_mtime = platformio_ini.stat().st_mtime
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
# Mock platformio to return new data
new_data = {"prog_path": "/new/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should call _run_idedata since cache is corrupted
mock_run_platformio_cli_run.assert_called_once()
assert result["prog_path"] == "/new/firmware.elf"
def test_run_idedata_parses_json_from_output(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata extracts JSON from platformio output."""
config = {"name": "test"}
expected_data = {
"prog_path": "/path/to/firmware.elf",
"cc_path": "/path/to/gcc",
"extra": {"flash_images": []},
}
# Simulate platformio output with JSON embedded
mock_run_platformio_cli_run.return_value = (
f"Some preamble\n{json.dumps(expected_data)}\nSome postamble"
)
result = platformio_api._run_idedata(config)
assert result == expected_data
def test_run_idedata_raises_on_no_json(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata raises EsphomeError when no JSON found."""
config = {"name": "test"}
mock_run_platformio_cli_run.return_value = "No JSON in this output"
with pytest.raises(EsphomeError):
platformio_api._run_idedata(config)
def test_run_idedata_raises_on_invalid_json(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata raises on malformed JSON."""
config = {"name": "test"}
mock_run_platformio_cli_run.return_value = '{"invalid": json"}'
# The ValueError from json.loads is re-raised
with pytest.raises(ValueError):
platformio_api._run_idedata(config)
def test_run_platformio_cli_sets_environment_variables(
setup_core: Path, mock_run_external_command: Mock
) -> None:
"""Test run_platformio_cli sets correct environment variables."""
CORE.build_path = str(setup_core / "build" / "test")
with patch.dict(os.environ, {}, clear=False):
mock_run_external_command.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
# Check environment variables were set
assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true"
assert (
setup_core / "build" / "test"
in Path(os.environ["PLATFORMIO_BUILD_DIR"]).parents
or Path(os.environ["PLATFORMIO_BUILD_DIR"]) == setup_core / "build" / "test"
)
assert "PLATFORMIO_LIBDEPS_DIR" in os.environ
assert "PYTHONWARNINGS" in os.environ
# Check command was called correctly
mock_run_external_command.assert_called_once()
args = mock_run_external_command.call_args[0]
assert "platformio" in args
assert "test" in args
assert "arg" in args
def test_run_platformio_cli_run_builds_command(
setup_core: Path, mock_run_platformio_cli: Mock
) -> None:
"""Test run_platformio_cli_run builds correct command."""
CORE.build_path = str(setup_core / "build" / "test")
mock_run_platformio_cli.return_value = 0
config = {"name": "test"}
platformio_api.run_platformio_cli_run(config, True, "extra", "args")
mock_run_platformio_cli.assert_called_once_with(
"run", "-d", CORE.build_path, "-v", "extra", "args"
)
def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None:
"""Test run_compile with process limit."""
from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME
CORE.build_path = str(setup_core / "build" / "test")
config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}}
mock_run_platformio_cli_run.return_value = 0
platformio_api.run_compile(config, verbose=True)
mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4")
def test_get_idedata_caches_result(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test get_idedata caches result in CORE.data."""
from esphome.const import KEY_CORE
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
CORE.data[KEY_CORE] = {}
# Create platformio.ini to avoid regeneration
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Mock platformio to return data
idedata = {"prog_path": "/test/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(idedata)
config = {"name": "test"}
# First call should load and cache
result1 = platformio_api.get_idedata(config)
mock_run_platformio_cli_run.assert_called_once()
# Second call should use cache from CORE.data
result2 = platformio_api.get_idedata(config)
mock_run_platformio_cli_run.assert_called_once() # Still only called once
assert result1 is result2
assert isinstance(result1, platformio_api.IDEData)
assert result1.firmware_elf_path == "/test/firmware.elf"
def test_idedata_addr2line_path_windows(setup_core: Path) -> None:
"""Test IDEData.addr2line_path on Windows."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.addr2line_path
assert result == "C:\\tools\\addr2line.exe"
def test_idedata_addr2line_path_unix(setup_core: Path) -> None:
"""Test IDEData.addr2line_path on Unix."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.addr2line_path
assert result == "/usr/bin/addr2line"
def test_patch_structhash(setup_core: Path) -> None:
"""Test patch_structhash monkey patches platformio functions."""
# Create simple namespace objects to act as modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_run = SimpleNamespace(cli=mock_cli, helpers=mock_helpers)
# Mock platformio modules
with patch.dict(
"sys.modules",
{
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
"platformio.run": mock_run,
"platformio.project.helpers": MagicMock(),
"platformio.fs": MagicMock(),
"platformio": MagicMock(),
},
):
# Call patch_structhash
platformio_api.patch_structhash()
# Verify both modules had clean_build_dir patched
# Check that clean_build_dir was set on both modules
assert hasattr(mock_cli, "clean_build_dir")
assert hasattr(mock_helpers, "clean_build_dir")
# Verify they got the same function assigned
assert mock_cli.clean_build_dir is mock_helpers.clean_build_dir
# Verify it's a real function (not a Mock)
assert callable(mock_cli.clean_build_dir)
assert mock_cli.clean_build_dir.__name__ == "patched_clean_build_dir"
def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None:
"""Test patched_clean_build_dir removes build dir when platformio.ini is newer."""
build_dir = setup_core / "build"
build_dir.mkdir()
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Make platformio.ini newer than build_dir
build_mtime = build_dir.stat().st_mtime
os.utime(platformio_ini, (build_mtime + 1, build_mtime + 1))
# Track if directory was removed
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
shutil.rmtree(path)
# Create mock modules that patch_structhash expects
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify directory was removed and recreated
assert len(removed_paths) == 1
assert removed_paths[0] == str(build_dir)
assert build_dir.exists() # makedirs recreated it
def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None:
"""Test patched_clean_build_dir keeps build dir when it's up to date."""
build_dir = setup_core / "build"
build_dir.mkdir()
test_file = build_dir / "test.txt"
test_file.write_text("test content")
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Make build_dir newer than platformio.ini
ini_mtime = platformio_ini.stat().st_mtime
os.utime(build_dir, (ini_mtime + 1, ini_mtime + 1))
# Track if rmtree is called
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
# Create mock modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify rmtree was NOT called
assert len(removed_paths) == 0
# Verify directory and file still exist
assert build_dir.exists()
assert test_file.exists()
assert test_file.read_text() == "test content"
def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None:
"""Test patched_clean_build_dir creates build dir when it doesn't exist."""
build_dir = setup_core / "build"
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Ensure build_dir doesn't exist
assert not build_dir.exists()
# Track if rmtree is called
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
# Create mock modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify rmtree was NOT called
assert len(removed_paths) == 0
# Verify directory was created
assert build_dir.exists()
def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None:
"""Test process_stacktrace handles ESP8266 exceptions."""
config = {"name": "test"}
# Test exception type parsing
line = "Exception (28):"
backtrace_state = False
result = platformio_api.process_stacktrace(config, line, backtrace_state)
assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text
assert result is False
def test_process_stacktrace_esp8266_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP8266 multi-line backtrace."""
config = {"name": "test"}
# Start of backtrace
line1 = ">>>stack>>>"
state = platformio_api.process_stacktrace(config, line1, False)
assert state is True
# Backtrace content with addresses
line2 = "40201234 40205678"
state = platformio_api.process_stacktrace(config, line2, state)
assert state is True
assert mock_decode_pc.call_count == 2
# End of backtrace
line3 = "<<<stack<<<"
state = platformio_api.process_stacktrace(config, line3, state)
assert state is False
def test_process_stacktrace_esp32_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP32 single-line backtrace."""
config = {"name": "test"}
line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678"
state = platformio_api.process_stacktrace(config, line, False)
# Should decode both addresses
assert mock_decode_pc.call_count == 2
mock_decode_pc.assert_any_call(config, "40081234")
mock_decode_pc.assert_any_call(config, "40085678")
assert state is False
def test_process_stacktrace_bad_alloc(
setup_core: Path, mock_decode_pc: Mock, caplog
) -> None:
"""Test process_stacktrace handles bad alloc messages."""
config = {"name": "test"}
line = "last failed alloc call: 40201234(512)"
state = platformio_api.process_stacktrace(config, line, False)
assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
mock_decode_pc.assert_called_once_with(config, "40201234")
assert state is False

View File

@@ -0,0 +1,660 @@
"""Tests for storage_json.py path functions."""
from datetime import datetime
import json
from pathlib import Path
import sys
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import storage_json
from esphome.const import CONF_DISABLED, CONF_MDNS
from esphome.core import CORE
def test_storage_path(setup_core: Path) -> None:
"""Test storage_path returns correct path for current config."""
CORE.config_path = str(setup_core / "my_device.yaml")
result = storage_json.storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "my_device.yaml.json")
assert result == expected
def test_ext_storage_path(setup_core: Path) -> None:
"""Test ext_storage_path returns correct path for given filename."""
result = storage_json.ext_storage_path("other_device.yaml")
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "other_device.yaml.json")
assert result == expected
def test_ext_storage_path_handles_various_extensions(setup_core: Path) -> None:
"""Test ext_storage_path works with different file extensions."""
result_yml = storage_json.ext_storage_path("device.yml")
assert result_yml.endswith("device.yml.json")
result_no_ext = storage_json.ext_storage_path("device")
assert result_no_ext.endswith("device.json")
result_path = storage_json.ext_storage_path("my/device.yaml")
assert result_path.endswith("device.yaml.json")
def test_esphome_storage_path(setup_core: Path) -> None:
"""Test esphome_storage_path returns correct path."""
result = storage_json.esphome_storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "esphome.json")
assert result == expected
def test_ignored_devices_storage_path(setup_core: Path) -> None:
"""Test ignored_devices_storage_path returns correct path."""
result = storage_json.ignored_devices_storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "ignored-devices.json")
assert result == expected
def test_trash_storage_path(setup_core: Path) -> None:
"""Test trash_storage_path returns correct path."""
CORE.config_path = str(setup_core / "configs" / "device.yaml")
result = storage_json.trash_storage_path()
expected = str(setup_core / "configs" / "trash")
assert result == expected
def test_archive_storage_path(setup_core: Path) -> None:
"""Test archive_storage_path returns correct path."""
CORE.config_path = str(setup_core / "configs" / "device.yaml")
result = storage_json.archive_storage_path()
expected = str(setup_core / "configs" / "archive")
assert result == expected
def test_storage_path_with_subdirectory(setup_core: Path) -> None:
"""Test storage paths work correctly when config is in subdirectory."""
subdir = setup_core / "configs" / "basement"
subdir.mkdir(parents=True, exist_ok=True)
CORE.config_path = str(subdir / "sensor.yaml")
result = storage_json.storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "sensor.yaml.json")
assert result == expected
def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None:
"""Test StorageJSON firmware_bin_path property."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test_device",
friendly_name="Test Device",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="192.168.1.100",
web_port=80,
target_platform="ESP32",
build_path="build/test_device",
firmware_bin_path="/path/to/firmware.bin",
loaded_integrations={"wifi", "api"},
loaded_platforms=set(),
no_mdns=False,
)
assert storage.firmware_bin_path == "/path/to/firmware.bin"
def test_storage_json_save_creates_directory(
setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock
) -> None:
"""Test StorageJSON.save creates storage directory if it doesn't exist."""
storage_dir = tmp_path / "new_data" / "storage"
storage_file = storage_dir / "test.json"
assert not storage_dir.exists()
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP8266",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
storage.save(str(storage_file))
mock_write_file_if_changed.assert_called_once()
call_args = mock_write_file_if_changed.call_args[0]
assert call_args[0] == str(storage_file)
def test_storage_json_from_wizard(setup_core: Path) -> None:
"""Test StorageJSON.from_wizard creates correct storage object."""
storage = storage_json.StorageJSON.from_wizard(
name="my_device",
friendly_name="My Device",
address="my_device.local",
platform="ESP32",
)
assert storage.name == "my_device"
assert storage.friendly_name == "My Device"
assert storage.address == "my_device.local"
assert storage.target_platform == "ESP32"
assert storage.build_path is None
assert storage.firmware_bin_path is None
@pytest.mark.skipif(sys.platform == "win32", reason="HA addons don't run on Windows")
@patch("esphome.core.is_ha_addon")
def test_storage_paths_with_ha_addon(mock_is_ha_addon: bool, tmp_path: Path) -> None:
"""Test storage paths when running as Home Assistant addon."""
mock_is_ha_addon.return_value = True
CORE.config_path = str(tmp_path / "test.yaml")
result = storage_json.storage_path()
# When is_ha_addon is True, CORE.data_dir returns "/data"
# This is the standard mount point for HA addon containers
expected = str(Path("/data") / "storage" / "test.yaml.json")
assert result == expected
result = storage_json.esphome_storage_path()
expected = str(Path("/data") / "esphome.json")
assert result == expected
def test_storage_json_as_dict() -> None:
"""Test StorageJSON.as_dict returns correct dictionary."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test_device",
friendly_name="Test Device",
comment="Test comment",
esphome_version="2024.1.0",
src_version=1,
address="192.168.1.100",
web_port=80,
target_platform="ESP32",
build_path="/path/to/build",
firmware_bin_path="/path/to/firmware.bin",
loaded_integrations={"wifi", "api", "ota"},
loaded_platforms={"sensor", "binary_sensor"},
no_mdns=True,
framework="arduino",
core_platform="esp32",
)
result = storage.as_dict()
assert result["storage_version"] == 1
assert result["name"] == "test_device"
assert result["friendly_name"] == "Test Device"
assert result["comment"] == "Test comment"
assert result["esphome_version"] == "2024.1.0"
assert result["src_version"] == 1
assert result["address"] == "192.168.1.100"
assert result["web_port"] == 80
assert result["esp_platform"] == "ESP32"
assert result["build_path"] == "/path/to/build"
assert result["firmware_bin_path"] == "/path/to/firmware.bin"
assert "api" in result["loaded_integrations"]
assert "wifi" in result["loaded_integrations"]
assert "ota" in result["loaded_integrations"]
assert result["loaded_integrations"] == sorted(
["wifi", "api", "ota"]
) # Should be sorted
assert "sensor" in result["loaded_platforms"]
assert result["loaded_platforms"] == sorted(
["sensor", "binary_sensor"]
) # Should be sorted
assert result["no_mdns"] is True
assert result["framework"] == "arduino"
assert result["core_platform"] == "esp32"
def test_storage_json_to_json() -> None:
"""Test StorageJSON.to_json returns valid JSON string."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP8266",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
json_str = storage.to_json()
# Should be valid JSON
parsed = json.loads(json_str)
assert parsed["name"] == "test"
assert parsed["storage_version"] == 1
# Should end with newline
assert json_str.endswith("\n")
def test_storage_json_save(tmp_path: Path) -> None:
"""Test StorageJSON.save writes file correctly."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP32",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
save_path = tmp_path / "test.json"
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(save_path))
mock_write.assert_called_once_with(str(save_path), storage.to_json())
def test_storage_json_from_esphome_core(setup_core: Path) -> None:
"""Test StorageJSON.from_esphome_core creates correct storage object."""
# Mock CORE object
mock_core = MagicMock()
mock_core.name = "my_device"
mock_core.friendly_name = "My Device"
mock_core.comment = "A test device"
mock_core.address = "192.168.1.50"
mock_core.web_port = 8080
mock_core.target_platform = "esp32"
mock_core.is_esp32 = True
mock_core.build_path = "/build/my_device"
mock_core.firmware_bin = "/build/my_device/firmware.bin"
mock_core.loaded_integrations = {"wifi", "api"}
mock_core.loaded_platforms = {"sensor"}
mock_core.config = {CONF_MDNS: {CONF_DISABLED: True}}
mock_core.target_framework = "esp-idf"
with patch("esphome.components.esp32.get_esp32_variant") as mock_variant:
mock_variant.return_value = "ESP32-C3"
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
assert result.name == "my_device"
assert result.friendly_name == "My Device"
assert result.comment == "A test device"
assert result.address == "192.168.1.50"
assert result.web_port == 8080
assert result.target_platform == "ESP32-C3"
assert result.build_path == "/build/my_device"
assert result.firmware_bin_path == "/build/my_device/firmware.bin"
assert result.loaded_integrations == {"wifi", "api"}
assert result.loaded_platforms == {"sensor"}
assert result.no_mdns is True
assert result.framework == "esp-idf"
assert result.core_platform == "esp32"
def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None:
"""Test from_esphome_core with mDNS enabled."""
mock_core = MagicMock()
mock_core.name = "test"
mock_core.friendly_name = "Test"
mock_core.comment = None
mock_core.address = "test.local"
mock_core.web_port = None
mock_core.target_platform = "esp8266"
mock_core.is_esp32 = False
mock_core.build_path = "/build"
mock_core.firmware_bin = "/build/firmware.bin"
mock_core.loaded_integrations = set()
mock_core.loaded_platforms = set()
mock_core.config = {} # No MDNS config means enabled
mock_core.target_framework = "arduino"
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
assert result.no_mdns is False
def test_storage_json_load_valid_file(tmp_path: Path) -> None:
"""Test StorageJSON.load with valid JSON file."""
storage_data = {
"storage_version": 1,
"name": "loaded_device",
"friendly_name": "Loaded Device",
"comment": "Loaded from file",
"esphome_version": "2024.1.0",
"src_version": 2,
"address": "10.0.0.1",
"web_port": 8080,
"esp_platform": "ESP32",
"build_path": "/loaded/build",
"firmware_bin_path": "/loaded/firmware.bin",
"loaded_integrations": ["wifi", "api"],
"loaded_platforms": ["sensor"],
"no_mdns": True,
"framework": "arduino",
"core_platform": "esp32",
}
file_path = tmp_path / "storage.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.StorageJSON.load(str(file_path))
assert result is not None
assert result.name == "loaded_device"
assert result.friendly_name == "Loaded Device"
assert result.comment == "Loaded from file"
assert result.esphome_version == "2024.1.0"
assert result.src_version == 2
assert result.address == "10.0.0.1"
assert result.web_port == 8080
assert result.target_platform == "ESP32"
assert result.build_path == "/loaded/build"
assert result.firmware_bin_path == "/loaded/firmware.bin"
assert result.loaded_integrations == {"wifi", "api"}
assert result.loaded_platforms == {"sensor"}
assert result.no_mdns is True
assert result.framework == "arduino"
assert result.core_platform == "esp32"
def test_storage_json_load_invalid_file(tmp_path: Path) -> None:
"""Test StorageJSON.load with invalid JSON file."""
file_path = tmp_path / "invalid.json"
file_path.write_text("not valid json{")
result = storage_json.StorageJSON.load(str(file_path))
assert result is None
def test_storage_json_load_nonexistent_file() -> None:
"""Test StorageJSON.load with non-existent file."""
result = storage_json.StorageJSON.load("/nonexistent/file.json")
assert result is None
def test_storage_json_equality() -> None:
"""Test StorageJSON equality comparison."""
storage1 = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
storage2 = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
storage3 = storage_json.StorageJSON(
storage_version=1,
name="different", # Different name
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
assert storage1 == storage2
assert storage1 != storage3
assert storage1 != "not a storage object"
def test_esphome_storage_json_as_dict() -> None:
"""Test EsphomeStorageJSON.as_dict returns correct dictionary."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret123",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
result = storage.as_dict()
assert result["storage_version"] == 1
assert result["cookie_secret"] == "secret123"
assert result["last_update_check"] == "2024-01-15T10:30:00"
assert result["remote_version"] == "2024.1.1"
def test_esphome_storage_json_last_update_check_property() -> None:
"""Test EsphomeStorageJSON.last_update_check property."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version=None,
)
# Test getter
result = storage.last_update_check
assert isinstance(result, datetime)
assert result.year == 2024
assert result.month == 1
assert result.day == 15
assert result.hour == 10
assert result.minute == 30
# Test setter
new_date = datetime(2024, 2, 20, 15, 45, 30)
storage.last_update_check = new_date
assert storage.last_update_check_str == "2024-02-20T15:45:30"
def test_esphome_storage_json_last_update_check_invalid() -> None:
"""Test EsphomeStorageJSON.last_update_check with invalid date."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="invalid date",
remote_version=None,
)
result = storage.last_update_check
assert result is None
def test_esphome_storage_json_to_json() -> None:
"""Test EsphomeStorageJSON.to_json returns valid JSON string."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="mysecret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
json_str = storage.to_json()
# Should be valid JSON
parsed = json.loads(json_str)
assert parsed["cookie_secret"] == "mysecret"
assert parsed["storage_version"] == 1
# Should end with newline
assert json_str.endswith("\n")
def test_esphome_storage_json_save(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.save writes file correctly."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check=None,
remote_version=None,
)
save_path = tmp_path / "esphome.json"
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(save_path))
mock_write.assert_called_once_with(str(save_path), storage.to_json())
def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.load with valid JSON file."""
storage_data = {
"storage_version": 1,
"cookie_secret": "loaded_secret",
"last_update_check": "2024-01-20T14:30:00",
"remote_version": "2024.1.2",
}
file_path = tmp_path / "esphome.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.EsphomeStorageJSON.load(str(file_path))
assert result is not None
assert result.storage_version == 1
assert result.cookie_secret == "loaded_secret"
assert result.last_update_check_str == "2024-01-20T14:30:00"
assert result.remote_version == "2024.1.2"
def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.load with invalid JSON file."""
file_path = tmp_path / "invalid.json"
file_path.write_text("not valid json{")
result = storage_json.EsphomeStorageJSON.load(str(file_path))
assert result is None
def test_esphome_storage_json_load_nonexistent_file() -> None:
"""Test EsphomeStorageJSON.load with non-existent file."""
result = storage_json.EsphomeStorageJSON.load("/nonexistent/file.json")
assert result is None
def test_esphome_storage_json_get_default() -> None:
"""Test EsphomeStorageJSON.get_default creates default storage."""
with patch("esphome.storage_json.os.urandom") as mock_urandom:
# Mock urandom to return predictable bytes
mock_urandom.return_value = b"test" * 16 # 64 bytes
result = storage_json.EsphomeStorageJSON.get_default()
assert result.storage_version == 1
assert len(result.cookie_secret) == 128 # 64 bytes hex = 128 chars
assert result.last_update_check is None
assert result.remote_version is None
def test_esphome_storage_json_equality() -> None:
"""Test EsphomeStorageJSON equality comparison."""
storage1 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
storage2 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
storage3 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="different", # Different secret
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
assert storage1 == storage2
assert storage1 != storage3
assert storage1 != "not a storage object"
def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None:
"""Test loading storage with legacy esphomeyaml_version field."""
storage_data = {
"storage_version": 1,
"name": "legacy_device",
"friendly_name": "Legacy Device",
"esphomeyaml_version": "1.14.0", # Legacy field name
"address": "legacy.local",
"esp_platform": "ESP8266",
}
file_path = tmp_path / "legacy.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.StorageJSON.load(str(file_path))
assert result is not None
assert result.esphome_version == "1.14.0" # Should map to esphome_version

View File

@@ -141,3 +141,170 @@ def test_list_yaml_files_mixed_extensions(tmp_path: Path) -> None:
str(yaml_file),
str(yml_file),
}
def test_list_yaml_files_does_not_recurse_into_subdirectories(tmp_path: Path) -> None:
"""Test that list_yaml_files only finds files in specified directory, not subdirectories."""
# Create directory structure with YAML files at different depths
root = tmp_path / "configs"
root.mkdir()
# Create YAML files in the root directory
(root / "config1.yaml").write_text("test: 1")
(root / "config2.yml").write_text("test: 2")
(root / "device.yaml").write_text("test: device")
# Create subdirectory with YAML files (should NOT be found)
subdir = root / "subdir"
subdir.mkdir()
(subdir / "nested1.yaml").write_text("test: nested1")
(subdir / "nested2.yml").write_text("test: nested2")
# Create deeper subdirectory (should NOT be found)
deep_subdir = subdir / "deeper"
deep_subdir.mkdir()
(deep_subdir / "very_nested.yaml").write_text("test: very_nested")
# Test listing files from the root directory
result = util.list_yaml_files([str(root)])
# Should only find the 3 files in root, not the 3 in subdirectories
assert len(result) == 3
# Check that only root-level files are found
assert str(root / "config1.yaml") in result
assert str(root / "config2.yml") in result
assert str(root / "device.yaml") in result
# Ensure nested files are NOT found
for r in result:
assert "subdir" not in r
assert "deeper" not in r
assert "nested1.yaml" not in r
assert "nested2.yml" not in r
assert "very_nested.yaml" not in r
def test_list_yaml_files_excludes_secrets(tmp_path: Path) -> None:
"""Test that secrets.yaml and secrets.yml are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create various YAML files including secrets
(root / "config.yaml").write_text("test: config")
(root / "secrets.yaml").write_text("wifi_password: secret123")
(root / "secrets.yml").write_text("api_key: secret456")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find 2 files (config.yaml and device.yaml), not secrets
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / "secrets.yaml") not in result
assert str(root / "secrets.yml") not in result
def test_list_yaml_files_excludes_hidden_files(tmp_path: Path) -> None:
"""Test that hidden files (starting with .) are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create regular and hidden YAML files
(root / "config.yaml").write_text("test: config")
(root / ".hidden.yaml").write_text("test: hidden")
(root / ".backup.yml").write_text("test: backup")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find only non-hidden files
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / ".hidden.yaml") not in result
assert str(root / ".backup.yml") not in result
def test_filter_yaml_files_basic() -> None:
"""Test filter_yaml_files function."""
files = [
"/path/to/config.yaml",
"/path/to/device.yml",
"/path/to/readme.txt",
"/path/to/script.py",
"/path/to/data.json",
"/path/to/another.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 3
assert "/path/to/config.yaml" in result
assert "/path/to/device.yml" in result
assert "/path/to/another.yaml" in result
assert "/path/to/readme.txt" not in result
assert "/path/to/script.py" not in result
assert "/path/to/data.json" not in result
def test_filter_yaml_files_excludes_secrets() -> None:
"""Test that filter_yaml_files excludes secrets files."""
files = [
"/path/to/config.yaml",
"/path/to/secrets.yaml",
"/path/to/secrets.yml",
"/path/to/device.yaml",
"/some/dir/secrets.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/secrets.yaml" not in result
assert "/path/to/secrets.yml" not in result
assert "/some/dir/secrets.yaml" not in result
def test_filter_yaml_files_excludes_hidden() -> None:
"""Test that filter_yaml_files excludes hidden files."""
files = [
"/path/to/config.yaml",
"/path/to/.hidden.yaml",
"/path/to/.backup.yml",
"/path/to/device.yaml",
"/some/dir/.config.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/.hidden.yaml" not in result
assert "/path/to/.backup.yml" not in result
assert "/some/dir/.config.yaml" not in result
def test_filter_yaml_files_case_sensitive() -> None:
"""Test that filter_yaml_files is case-sensitive for extensions."""
files = [
"/path/to/config.yaml",
"/path/to/config.YAML",
"/path/to/config.YML",
"/path/to/config.Yaml",
"/path/to/config.yml",
]
result = util.filter_yaml_files(files)
# Should only match lowercase .yaml and .yml
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/config.yml" in result
assert "/path/to/config.YAML" not in result
assert "/path/to/config.YML" not in result
assert "/path/to/config.Yaml" not in result

View File

@@ -1,9 +1,12 @@
"""Tests for the wizard.py file."""
import os
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
from pytest import MonkeyPatch
from esphome.components.bk72xx.boards import BK72XX_BOARD_PINS
from esphome.components.esp32.boards import ESP32_BOARD_PINS
@@ -15,7 +18,7 @@ import esphome.wizard as wz
@pytest.fixture
def default_config():
def default_config() -> dict[str, Any]:
return {
"type": "basic",
"name": "test-name",
@@ -28,7 +31,7 @@ def default_config():
@pytest.fixture
def wizard_answers():
def wizard_answers() -> list[str]:
return [
"test-node", # Name of the node
"ESP8266", # platform
@@ -53,7 +56,9 @@ def test_sanitize_quotes_replaces_with_escaped_char():
assert output_str == '\\"key\\": \\"value\\"'
def test_config_file_fallback_ap_includes_descriptive_name(default_config):
def test_config_file_fallback_ap_includes_descriptive_name(
default_config: dict[str, Any],
):
"""
The fallback AP should include the node and a descriptive name
"""
@@ -67,7 +72,9 @@ def test_config_file_fallback_ap_includes_descriptive_name(default_config):
assert 'ssid: "Test Node Fallback Hotspot"' in config
def test_config_file_fallback_ap_name_less_than_32_chars(default_config):
def test_config_file_fallback_ap_name_less_than_32_chars(
default_config: dict[str, Any],
):
"""
The fallback AP name must be less than 32 chars.
Since it is composed of the node name and "Fallback Hotspot" this can be too long and needs truncating
@@ -82,7 +89,7 @@ def test_config_file_fallback_ap_name_less_than_32_chars(default_config):
assert 'ssid: "A Very Long Name For This Node"' in config
def test_config_file_should_include_ota(default_config):
def test_config_file_should_include_ota(default_config: dict[str, Any]):
"""
The Over-The-Air update should be enabled by default
"""
@@ -95,7 +102,9 @@ def test_config_file_should_include_ota(default_config):
assert "ota:" in config
def test_config_file_should_include_ota_when_password_set(default_config):
def test_config_file_should_include_ota_when_password_set(
default_config: dict[str, Any],
):
"""
The Over-The-Air update should be enabled when a password is set
"""
@@ -109,7 +118,9 @@ def test_config_file_should_include_ota_when_password_set(default_config):
assert "ota:" in config
def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
def test_wizard_write_sets_platform(
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
"""
@@ -126,7 +137,7 @@ def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
assert "esp8266:" in generated_config
def test_wizard_empty_config(tmp_path, monkeypatch):
def test_wizard_empty_config(tmp_path: Path, monkeypatch: MonkeyPatch):
"""
The wizard should be able to create an empty configuration
"""
@@ -146,7 +157,7 @@ def test_wizard_empty_config(tmp_path, monkeypatch):
assert generated_config == ""
def test_wizard_upload_config(tmp_path, monkeypatch):
def test_wizard_upload_config(tmp_path: Path, monkeypatch: MonkeyPatch):
"""
The wizard should be able to import an base64 encoded configuration
"""
@@ -168,7 +179,7 @@ def test_wizard_upload_config(tmp_path, monkeypatch):
def test_wizard_write_defaults_platform_from_board_esp8266(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
@@ -189,7 +200,7 @@ def test_wizard_write_defaults_platform_from_board_esp8266(
def test_wizard_write_defaults_platform_from_board_esp32(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "ESP32" if the board is one of the ESP32 boards
@@ -210,7 +221,7 @@ def test_wizard_write_defaults_platform_from_board_esp32(
def test_wizard_write_defaults_platform_from_board_bk72xx(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "BK72XX" if the board is one of BK72XX boards
@@ -231,7 +242,7 @@ def test_wizard_write_defaults_platform_from_board_bk72xx(
def test_wizard_write_defaults_platform_from_board_ln882x(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "LN882X" if the board is one of LN882X boards
@@ -252,7 +263,7 @@ def test_wizard_write_defaults_platform_from_board_ln882x(
def test_wizard_write_defaults_platform_from_board_rtl87xx(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "RTL87XX" if the board is one of RTL87XX boards
@@ -272,7 +283,7 @@ def test_wizard_write_defaults_platform_from_board_rtl87xx(
assert "rtl87xx:" in generated_config
def test_safe_print_step_prints_step_number_and_description(monkeypatch):
def test_safe_print_step_prints_step_number_and_description(monkeypatch: MonkeyPatch):
"""
The safe_print_step function prints the step number and the passed description
"""
@@ -296,7 +307,7 @@ def test_safe_print_step_prints_step_number_and_description(monkeypatch):
assert any(f"STEP {step_num}" in arg for arg in all_args)
def test_default_input_uses_default_if_no_input_supplied(monkeypatch):
def test_default_input_uses_default_if_no_input_supplied(monkeypatch: MonkeyPatch):
"""
The default_input() function should return the supplied default value if the user doesn't enter anything
"""
@@ -312,7 +323,7 @@ def test_default_input_uses_default_if_no_input_supplied(monkeypatch):
assert retval == default_string
def test_default_input_uses_user_supplied_value(monkeypatch):
def test_default_input_uses_user_supplied_value(monkeypatch: MonkeyPatch):
"""
The default_input() function should return the value that the user enters
"""
@@ -376,7 +387,9 @@ def test_wizard_rejects_existing_files(tmpdir):
assert retval == 2
def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answers):
def test_wizard_accepts_default_answers_esp8266(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
The wizard should accept the given default answers for esp8266
"""
@@ -396,7 +409,9 @@ def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answ
assert retval == 0
def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answers):
def test_wizard_accepts_default_answers_esp32(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
The wizard should accept the given default answers for esp32
"""
@@ -418,7 +433,9 @@ def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answer
assert retval == 0
def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers):
def test_wizard_offers_better_node_name(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the node name does not conform, a better alternative is offered
* Removes special chars
@@ -449,7 +466,9 @@ def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers):
assert wz.default_input.call_args.args[1] == expected_name
def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers):
def test_wizard_requires_correct_platform(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the platform is not either esp32 or esp8266, the wizard should reject it
"""
@@ -471,7 +490,9 @@ def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers):
assert retval == 0
def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers):
def test_wizard_requires_correct_board(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the board is not a valid esp8266 board, the wizard should reject it
"""
@@ -493,7 +514,9 @@ def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers):
assert retval == 0
def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
def test_wizard_requires_valid_ssid(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the board is not a valid esp8266 board, the wizard should reject it
"""
@@ -515,7 +538,9 @@ def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
assert retval == 0
def test_wizard_write_protects_existing_config(tmpdir, default_config, monkeypatch):
def test_wizard_write_protects_existing_config(
tmpdir, default_config: dict[str, Any], monkeypatch: MonkeyPatch
):
"""
The wizard_write function should not overwrite existing config files and return False
"""

View File

@@ -1,13 +1,34 @@
"""Test writer module functionality."""
from collections.abc import Callable
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from esphome.core import EsphomeError
from esphome.storage_json import StorageJSON
from esphome.writer import storage_should_clean, update_storage_json
from esphome.writer import (
CPP_AUTO_GENERATE_BEGIN,
CPP_AUTO_GENERATE_END,
CPP_INCLUDE_BEGIN,
CPP_INCLUDE_END,
GITIGNORE_CONTENT,
clean_build,
clean_cmake_cache,
storage_should_clean,
update_storage_json,
write_cpp,
write_gitignore,
)
@pytest.fixture
def mock_copy_src_tree():
"""Mock copy_src_tree to avoid side effects during tests."""
with patch("esphome.writer.copy_src_tree"):
yield
@pytest.fixture
@@ -218,3 +239,493 @@ def test_update_storage_json_logging_components_removed(
# Verify save was called
new_storage.save.assert_called_once_with("/test/path")
@patch("esphome.writer.CORE")
def test_clean_cmake_cache(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_cmake_cache removes CMakeCache.txt file."""
# Create directory structure
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
cmake_cache_file.write_text("# CMake cache file")
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file exists before
assert cmake_cache_file.exists()
# Call the function
with caplog.at_level("INFO"):
clean_cmake_cache()
# Verify file was removed
assert not cmake_cache_file.exists()
# Verify logging
assert "Deleting" in caplog.text
assert "CMakeCache.txt" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_pioenvs_dir(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when pioenvs directory doesn't exist."""
# Setup non-existent directory path
pioenvs_dir = tmp_path / ".pioenvs"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
# Verify directory doesn't exist
assert not pioenvs_dir.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify directory still doesn't exist
assert not pioenvs_dir.exists()
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_cmake_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when CMakeCache.txt doesn't exist."""
# Create directory structure without CMakeCache.txt
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file doesn't exist
assert not cmake_cache_file.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify file still doesn't exist
assert not cmake_cache_file.exists()
@patch("esphome.writer.CORE")
def test_clean_build(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build removes all build artifacts."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
piolibdeps_dir.mkdir()
(piolibdeps_dir / "library").mkdir()
dependencies_lock = tmp_path / "dependencies.lock"
dependencies_lock.write_text("lock file")
# Create PlatformIO cache directory
platformio_cache_dir = tmp_path / ".platformio" / ".cache"
platformio_cache_dir.mkdir(parents=True)
(platformio_cache_dir / "downloads").mkdir()
(platformio_cache_dir / "http").mkdir()
(platformio_cache_dir / "tmp").mkdir()
(platformio_cache_dir / "downloads" / "package.tar.gz").write_text("package")
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify all exist before
assert pioenvs_dir.exists()
assert piolibdeps_dir.exists()
assert dependencies_lock.exists()
assert platformio_cache_dir.exists()
# Mock PlatformIO's get_project_cache_dir
with patch(
"platformio.project.helpers.get_project_cache_dir"
) as mock_get_cache_dir:
mock_get_cache_dir.return_value = str(platformio_cache_dir)
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify all were removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
assert not platformio_cache_dir.exists()
# Verify logging
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" in caplog.text
assert "dependencies.lock" in caplog.text
assert "PlatformIO cache" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_partial_exists(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when only some paths exist."""
# Create only pioenvs directory
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify only pioenvs exists
assert pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify only existing path was removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify logging - only pioenvs should be logged
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" not in caplog.text
assert "dependencies.lock" not in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_nothing_exists(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_build when no build artifacts exist."""
# Setup paths that don't exist
pioenvs_dir = tmp_path / ".pioenvs"
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify nothing exists
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function - should not crash
clean_build()
# Verify nothing was created
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
@patch("esphome.writer.CORE")
def test_clean_build_platformio_not_available(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when PlatformIO is not available."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
piolibdeps_dir = tmp_path / ".piolibdeps"
piolibdeps_dir.mkdir()
dependencies_lock = tmp_path / "dependencies.lock"
dependencies_lock.write_text("lock file")
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify all exist before
assert pioenvs_dir.exists()
assert piolibdeps_dir.exists()
assert dependencies_lock.exists()
# Mock import error for platformio
with (
patch.dict("sys.modules", {"platformio.project.helpers": None}),
caplog.at_level("INFO"),
):
# Call the function
clean_build()
# Verify standard paths were removed but no cache cleaning attempted
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify no cache logging
assert "PlatformIO cache" not in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_empty_cache_dir(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when get_project_cache_dir returns empty/whitespace."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(tmp_path / ".piolibdeps")
mock_core.relative_build_path.return_value = str(tmp_path / "dependencies.lock")
# Verify pioenvs exists before
assert pioenvs_dir.exists()
# Mock PlatformIO's get_project_cache_dir to return whitespace
with patch(
"platformio.project.helpers.get_project_cache_dir"
) as mock_get_cache_dir:
mock_get_cache_dir.return_value = " " # Whitespace only
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify pioenvs was removed
assert not pioenvs_dir.exists()
# Verify no cache cleaning was attempted due to empty string
assert "PlatformIO cache" not in caplog.text
@patch("esphome.writer.CORE")
def test_write_gitignore_creates_new_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore creates a new .gitignore file when it doesn't exist."""
gitignore_path = tmp_path / ".gitignore"
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file doesn't exist
assert not gitignore_path.exists()
# Call the function
write_gitignore()
# Verify file was created with correct content
assert gitignore_path.exists()
assert gitignore_path.read_text() == GITIGNORE_CONTENT
@patch("esphome.writer.CORE")
def test_write_gitignore_skips_existing_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore doesn't overwrite existing .gitignore file."""
gitignore_path = tmp_path / ".gitignore"
existing_content = "# Custom gitignore\n/custom_dir/\n"
gitignore_path.write_text(existing_content)
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file exists with custom content
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
# Call the function
write_gitignore()
# Verify file was not modified
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_with_existing_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp already exists."""
# Create a real file with markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_INCLUDE_BEGIN}
// Old includes
{CPP_INCLUDE_END}
void setup() {{
{CPP_AUTO_GENERATE_BEGIN}
// Old code
{CPP_AUTO_GENERATE_END}
}}
void loop() {{}}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Call the function
test_code = " // New generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
# Check that markers are preserved and content is updated
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "// Global section" in written_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_creates_new_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp doesn't exist."""
# Setup path for new file
main_cpp = tmp_path / "main.cpp"
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Verify file doesn't exist
assert not main_cpp.exists()
# Call the function
test_code = " // Generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
assert written_path == str(main_cpp)
# Check that all necessary parts are in the new file
assert '#include "esphome.h"' in written_content
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "void setup()" in written_content
assert "void loop()" in written_content
assert "App.setup();" in written_content
assert "App.loop();" in written_content
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_missing_end_marker(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when end marker is missing."""
# Create a file with begin marker but no end marker
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// Code without end marker"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Could not find auto generated code end"):
write_cpp("// New code")
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_duplicate_markers(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when duplicate markers exist."""
# Create a file with duplicate begin markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// First section
{CPP_AUTO_GENERATE_END}
{CPP_AUTO_GENERATE_BEGIN}
// Duplicate section
{CPP_AUTO_GENERATE_END}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Found multiple auto generate code begins"):
write_cpp("// New code")