mirror of https://github.com/esphome/esphome.git synced 2025-11-01 15:41:52 +00:00

Compare commits

...

13 Commits

Author SHA1 Message Date
Jesse Hills
e47f4ef602 Merge pull request #10796 from esphome/bump-2025.9.1
2025.9.1
2025-09-19 20:46:53 +12:00
Jesse Hills
961be7fd12 Bump version to 2025.9.1 2025-09-19 11:52:10 +12:00
J. Nick Koston
a5a21f47d1 [gpio] Fix unused function warnings when compiling with log level below DEBUG (#10779) 2025-09-19 11:52:09 +12:00
J. Nick Koston
a06cd84974 [core] Fix ESP8266 mDNS compilation failure caused by incorrect coroutine priorities (#10773) 2025-09-19 11:52:09 +12:00
Subhash Chandra
e3703b43c1 [packet_transport] Refactor sensor/provider list handling to be idempotent (#10765) 2025-09-19 11:52:09 +12:00
J. Nick Koston
f6dc25c0ce [mqtt] Fix KeyError when MQTT logging configured without explicit level (#10774) 2025-09-19 11:52:09 +12:00
Jesse Hills
d2df232706 Merge pull request #10763 from esphome/bump-2025.9.0
2025.9.0
2025-09-17 18:51:21 +12:00
Jesse Hills
404e679e66 Bump version to 2025.9.0 2025-09-17 11:02:12 +12:00
Jesse Hills
8d401ad05a Merge pull request #10761 from esphome/bump-2025.9.0b4
2025.9.0b4
2025-09-17 10:50:15 +12:00
Jesse Hills
e542816f7d Bump version to 2025.9.0b4 2025-09-17 09:22:54 +12:00
J. Nick Koston
12cadf0a04 [core] Fix clean build files to properly clear PlatformIO cache (#10754) 2025-09-17 09:22:54 +12:00
J. Nick Koston
adc3d3127d [wizard] Fix KeyError when running wizard with empty OTA password (#10753) 2025-09-17 09:22:54 +12:00
J. Nick Koston
61ab682099 Add additional coverage for util and writer (#10683) 2025-09-17 09:22:54 +12:00
21 changed files with 904 additions and 60 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 2025.9.0b3
+PROJECT_NUMBER = 2025.9.1
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -212,7 +212,7 @@ def has_mqtt_logging() -> bool:
if CONF_TOPIC not in log_topic:
return False
-return log_topic[CONF_LEVEL] != "NONE"
+return log_topic.get(CONF_LEVEL, None) != "NONE"
def has_mqtt() -> bool:
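
A minimal standalone sketch (using plain string keys in place of the real CONF_* constants) of why the one-line change above fixes the KeyError from #10774: the default MQTT config can create a log_topic dict without a level key, so subscripting crashed while .get() falls back to None and keeps logging enabled.

# Hypothetical illustration, not the actual esphome code:
log_topic = {"topic": "esphome/debug"}   # "level" omitted, as in the default config
# log_topic["level"]                      # old behaviour: raises KeyError
level = log_topic.get("level", None)      # new behaviour: returns None
print(level != "NONE")                    # True -> MQTT logging stays enabled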

View File

@@ -10,7 +10,8 @@ from esphome.const import (
PLATFORM_LN882X,
PLATFORM_RTL87XX,
)
-from esphome.core import CORE, CoroPriority, coroutine_with_priority
+from esphome.core import CORE, coroutine_with_priority
+from esphome.coroutine import CoroPriority
AUTO_LOAD = ["web_server_base", "ota.web_server"]
DEPENDENCIES = ["wifi"]
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
)
-@coroutine_with_priority(CoroPriority.COMMUNICATION)
+@coroutine_with_priority(CoroPriority.CAPTIVE_PORTAL)
async def to_code(config):
paren = await cg.get_variable(config[CONF_WEB_SERVER_BASE_ID])

View File

@@ -16,7 +16,8 @@ from esphome.const import (
CONF_SAFE_MODE,
CONF_VERSION,
)
-from esphome.core import CoroPriority, coroutine_with_priority
+from esphome.core import coroutine_with_priority
+from esphome.coroutine import CoroPriority
import esphome.final_validate as fv
_LOGGER = logging.getLogger(__name__)
@@ -121,7 +122,7 @@ CONFIG_SCHEMA = (
FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate
-@coroutine_with_priority(CoroPriority.COMMUNICATION)
+@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
cg.add(var.set_port(config[CONF_PORT]))

View File

@@ -6,6 +6,7 @@ namespace gpio {
static const char *const TAG = "gpio.binary_sensor";
+#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_DEBUG
static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
switch (type) {
case gpio::INTERRUPT_RISING_EDGE:
@@ -22,6 +23,7 @@ static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
static const LogString *gpio_mode_to_string(bool use_interrupt) {
return use_interrupt ? LOG_STR("interrupt") : LOG_STR("polling");
}
+#endif
void IRAM_ATTR GPIOBinarySensorStore::gpio_intr(GPIOBinarySensorStore *arg) {
bool new_state = arg->isr_pin_.digital_read();

View File

@@ -3,7 +3,8 @@ import esphome.codegen as cg
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_PASSWORD, CONF_URL, CONF_USERNAME
-from esphome.core import CoroPriority, coroutine_with_priority
+from esphome.core import coroutine_with_priority
+from esphome.coroutine import CoroPriority
from .. import CONF_HTTP_REQUEST_ID, HttpRequestComponent, http_request_ns
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
)
-@coroutine_with_priority(CoroPriority.COMMUNICATION)
+@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config)

View File

@@ -11,7 +11,8 @@ from esphome.const import (
CONF_SERVICES,
PlatformFramework,
)
-from esphome.core import CORE, CoroPriority, coroutine_with_priority
+from esphome.core import CORE, coroutine_with_priority
+from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"]
@@ -72,7 +73,7 @@ def mdns_service(
)
-@coroutine_with_priority(CoroPriority.COMMUNICATION)
+@coroutine_with_priority(CoroPriority.NETWORK_SERVICES)
async def to_code(config):
if config[CONF_DISABLED] is True:
return

View File

@@ -10,7 +10,8 @@ from esphome.const import (
CONF_TRIGGER_ID,
PlatformFramework,
)
-from esphome.core import CORE, CoroPriority, coroutine_with_priority
+from esphome.core import CORE, coroutine_with_priority
+from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
AUTO_LOAD = ["md5", "safe_mode"]
@@ -82,7 +83,7 @@ BASE_OTA_SCHEMA = cv.Schema(
)
-@coroutine_with_priority(CoroPriority.COMMUNICATION)
+@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
cg.add_define("USE_OTA")

View File

@@ -121,15 +121,11 @@ def transport_schema(cls):
return TRANSPORT_SCHEMA.extend({cv.GenerateID(): cv.declare_id(cls)})
# Build a list of sensors for this platform
-CORE.data[DOMAIN] = {CONF_SENSORS: []}
def get_sensors(transport_id):
"""Return the list of sensors for this platform."""
return (
sensor
-for sensor in CORE.data[DOMAIN][CONF_SENSORS]
+for sensor in CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
if sensor[CONF_TRANSPORT_ID] == transport_id
)
@@ -137,7 +133,8 @@ def get_sensors(transport_id):
def validate_packet_transport_sensor(config):
if CONF_NAME in config and CONF_INTERNAL not in config:
raise cv.Invalid("Must provide internal: config when using name:")
-CORE.data[DOMAIN][CONF_SENSORS].append(config)
+conf_sensors = CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
+conf_sensors.append(config)
return config
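
A short hedged sketch of the setdefault() pattern introduced above: the nested structure is created lazily on first access and reused afterwards, so repeated validation passes append to one shared list instead of clobbering a module-level value. Keys and names below are illustrative, not the real CONF_* constants.

data = {}

def register(sensor_config: dict) -> None:
    # Creates data["packet_transport"]["sensors"] on the first call, reuses it later
    data.setdefault("packet_transport", {}).setdefault("sensors", []).append(sensor_config)

register({"transport_id": "t1"})
register({"transport_id": "t2"})
print(len(data["packet_transport"]["sensors"]))  # 2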

View File

@@ -3,7 +3,8 @@ from esphome.components.esp32 import add_idf_component
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv
from esphome.const import CONF_ID
-from esphome.core import CORE, CoroPriority, coroutine_with_priority
+from esphome.core import CORE, coroutine_with_priority
+from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network", "web_server_base"]
@@ -22,7 +23,7 @@ CONFIG_SCHEMA = (
)
-@coroutine_with_priority(CoroPriority.COMMUNICATION)
+@coroutine_with_priority(CoroPriority.WEB_SERVER_OTA)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config)

View File

@@ -1,7 +1,8 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ID
-from esphome.core import CORE, CoroPriority, coroutine_with_priority
+from esphome.core import CORE, coroutine_with_priority
+from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"]
@@ -26,7 +27,7 @@ CONFIG_SCHEMA = cv.Schema(
)
-@coroutine_with_priority(CoroPriority.COMMUNICATION)
+@coroutine_with_priority(CoroPriority.WEB_SERVER_BASE)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2025.9.0b3"
__version__ = "2025.9.1"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -90,11 +90,30 @@ class CoroPriority(enum.IntEnum):
# Examples: status_led (80)
STATUS = 80
+# Web server infrastructure
+# Examples: web_server_base (65)
+WEB_SERVER_BASE = 65
+# Network portal services
+# Examples: captive_portal (64)
+CAPTIVE_PORTAL = 64
# Communication protocols and services
-# Examples: web_server_base (65), captive_portal (64), wifi (60), ethernet (60),
-# mdns (55), ota_updates (54), web_server_ota (52)
+# Examples: wifi (60), ethernet (60)
COMMUNICATION = 60
+# Network discovery and management services
+# Examples: mdns (55)
+NETWORK_SERVICES = 55
+# OTA update services
+# Examples: ota_updates (54)
+OTA_UPDATES = 54
+# Web-based OTA services
+# Examples: web_server_ota (52)
+WEB_SERVER_OTA = 52
# Application-level services
# Examples: safe_mode (50)
APPLICATION = 50
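
A hedged sketch of why an IntEnum works for this split: members compare and mix with plain numbers, so the finer-grained priorities added above (65, 64, 55, 54, 52) slot between the existing values and higher numbers still run first. Only a subset of the real enum is reproduced here.

import enum

class CoroPriority(enum.IntEnum):
    WEB_SERVER_BASE = 65
    CAPTIVE_PORTAL = 64
    COMMUNICATION = 60
    NETWORK_SERVICES = 55
    OTA_UPDATES = 54
    WEB_SERVER_OTA = 52

# IntEnum members behave like ints, so they stay interchangeable with the
# float priorities used elsewhere in the codebase:
assert CoroPriority.COMMUNICATION == 60.0
assert CoroPriority.WEB_SERVER_BASE > CoroPriority.CAPTIVE_PORTAL > CoroPriority.COMMUNICATION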

View File

@@ -1,6 +1,7 @@
import os
import random
import string
+from typing import Literal, NotRequired, TypedDict, Unpack
import unicodedata
import voluptuous as vol
@@ -103,11 +104,25 @@ HARDWARE_BASE_CONFIGS = {
}
-def sanitize_double_quotes(value):
+def sanitize_double_quotes(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
-def wizard_file(**kwargs):
+class WizardFileKwargs(TypedDict):
+    """Keyword arguments for wizard_file function."""
+    name: str
+    platform: Literal["ESP8266", "ESP32", "RP2040", "BK72XX", "LN882X", "RTL87XX"]
+    board: str
+    ssid: NotRequired[str]
+    psk: NotRequired[str]
+    password: NotRequired[str]
+    ota_password: NotRequired[str]
+    api_encryption_key: NotRequired[str]
+    friendly_name: NotRequired[str]
+def wizard_file(**kwargs: Unpack[WizardFileKwargs]) -> str:
letters = string.ascii_letters + string.digits
ap_name_base = kwargs["name"].replace("_", " ").title()
ap_name = f"{ap_name_base} Fallback Hotspot"
@@ -180,7 +195,25 @@ captive_portal:
return config
-def wizard_write(path, **kwargs):
+class WizardWriteKwargs(TypedDict):
+    """Keyword arguments for wizard_write function."""
+    name: str
+    type: Literal["basic", "empty", "upload"]
+    # Required for "basic" type
+    board: NotRequired[str]
+    platform: NotRequired[str]
+    ssid: NotRequired[str]
+    psk: NotRequired[str]
+    password: NotRequired[str]
+    ota_password: NotRequired[str]
+    api_encryption_key: NotRequired[str]
+    friendly_name: NotRequired[str]
+    # Required for "upload" type
+    file_text: NotRequired[str]
+def wizard_write(path: str, **kwargs: Unpack[WizardWriteKwargs]) -> bool:
from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards
@@ -237,14 +270,14 @@ def wizard_write(path, **kwargs):
if get_bool_env(ENV_QUICKWIZARD):
-def sleep(time):
+def sleep(time: float) -> None:
pass
else:
from time import sleep
-def safe_print_step(step, big):
+def safe_print_step(step: int, big: str) -> None:
safe_print()
safe_print()
safe_print(f"============= STEP {step} =============")
@@ -253,14 +286,14 @@ def safe_print_step(step, big):
sleep(0.25)
-def default_input(text, default):
+def default_input(text: str, default: str) -> str:
safe_print()
safe_print(f"Press ENTER for default ({default})")
return safe_input(text.format(default)) or default
# From https://stackoverflow.com/a/518232/8924614
-def strip_accents(value):
+def strip_accents(value: str) -> str:
return "".join(
c
for c in unicodedata.normalize("NFD", str(value))
@@ -268,7 +301,7 @@ def strip_accents(value):
)
-def wizard(path):
+def wizard(path: str) -> int:
from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards
@@ -509,6 +542,7 @@ def wizard(path):
ssid=ssid,
psk=psk,
password=password,
type="basic",
):
return 1
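
A hedged, self-contained sketch of the typing pattern added above: a TypedDict combined with Unpack lets type checkers validate **kwargs at call sites without changing runtime behaviour (Python 3.11+, or typing_extensions on older interpreters). The class below is a trimmed illustration, not the full WizardFileKwargs.

from typing import Literal, NotRequired, TypedDict, Unpack

class DemoKwargs(TypedDict):
    name: str
    platform: Literal["ESP8266", "ESP32"]
    board: str
    ssid: NotRequired[str]          # optional keys are marked NotRequired

def demo_file(**kwargs: Unpack[DemoKwargs]) -> str:
    # A checker flags demo_file(name="x") as missing "platform"/"board",
    # and demo_file(bord="esp32dev") as an unknown key.
    return f"name: {kwargs['name']}\nplatform: {kwargs['platform']}\n"

print(demo_file(name="demo", platform="ESP32", board="esp32dev"))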

View File

@@ -315,6 +315,19 @@ def clean_build():
_LOGGER.info("Deleting %s", dependencies_lock)
os.remove(dependencies_lock)
+    # Clean PlatformIO cache to resolve CMake compiler detection issues
+    # This helps when toolchain paths change or get corrupted
+    try:
+        from platformio.project.helpers import get_project_cache_dir
+    except ImportError:
+        # PlatformIO is not available, skip cache cleaning
+        pass
+    else:
+        cache_dir = get_project_cache_dir()
+        if cache_dir and cache_dir.strip() and os.path.isdir(cache_dir):
+            _LOGGER.info("Deleting PlatformIO cache %s", cache_dir)
+            shutil.rmtree(cache_dir)
GITIGNORE_CONTENT = """# Gitignore settings for ESPHome
# This is an example and may include too much for your use-case.
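
A minimal hedged sketch of the optional-dependency pattern the new block relies on: attempt the import, bail out quietly if PlatformIO is missing, and only delete the cache when a real directory is reported. get_project_cache_dir comes from the diff above; the wrapper function name is illustrative.

import os
import shutil

def remove_platformio_cache() -> None:
    try:
        from platformio.project.helpers import get_project_cache_dir
    except ImportError:
        return  # PlatformIO is not installed; nothing to clean
    cache_dir = get_project_cache_dir()
    # Guard against empty or whitespace-only paths before deleting anything
    if cache_dir and cache_dir.strip() and os.path.isdir(cache_dir):
        shutil.rmtree(cache_dir)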

View File

@@ -0,0 +1,42 @@
# Comprehensive ESP8266 test for mdns with multiple network components
# Tests the complete priority chain:
# wifi (60) -> mdns (55) -> ota (54) -> web_server_ota (52)
esphome:
name: mdns-comprehensive-test
esp8266:
board: esp01_1m
logger:
level: DEBUG
wifi:
ssid: MySSID
password: password1
# web_server_base should run at priority 65 (before wifi)
web_server:
port: 80
# mdns should run at priority 55 (after wifi at 60)
mdns:
services:
- service: _http
protocol: _tcp
port: 80
# OTA should run at priority 54 (after mdns)
ota:
- platform: esphome
password: "otapassword"
# Test status LED at priority 80
status_led:
pin:
number: GPIO2
inverted: true
# Include API at priority 40
api:
password: "apipassword"

View File

@@ -13,7 +13,12 @@ def test_coro_priority_enum_values() -> None:
assert CoroPriority.CORE == 100
assert CoroPriority.DIAGNOSTICS == 90
assert CoroPriority.STATUS == 80
assert CoroPriority.WEB_SERVER_BASE == 65
assert CoroPriority.CAPTIVE_PORTAL == 64
assert CoroPriority.COMMUNICATION == 60
assert CoroPriority.NETWORK_SERVICES == 55
assert CoroPriority.OTA_UPDATES == 54
assert CoroPriority.WEB_SERVER_OTA == 52
assert CoroPriority.APPLICATION == 50
assert CoroPriority.WEB == 40
assert CoroPriority.AUTOMATION == 30
@@ -70,7 +75,12 @@ def test_float_and_enum_are_interchangeable() -> None:
(CoroPriority.CORE, 100.0),
(CoroPriority.DIAGNOSTICS, 90.0),
(CoroPriority.STATUS, 80.0),
(CoroPriority.WEB_SERVER_BASE, 65.0),
(CoroPriority.CAPTIVE_PORTAL, 64.0),
(CoroPriority.COMMUNICATION, 60.0),
(CoroPriority.NETWORK_SERVICES, 55.0),
(CoroPriority.OTA_UPDATES, 54.0),
(CoroPriority.WEB_SERVER_OTA, 52.0),
(CoroPriority.APPLICATION, 50.0),
(CoroPriority.WEB, 40.0),
(CoroPriority.AUTOMATION, 30.0),
@@ -164,8 +174,13 @@ def test_enum_priority_comparison() -> None:
assert CoroPriority.NETWORK_TRANSPORT > CoroPriority.CORE
assert CoroPriority.CORE > CoroPriority.DIAGNOSTICS
assert CoroPriority.DIAGNOSTICS > CoroPriority.STATUS
assert CoroPriority.STATUS > CoroPriority.COMMUNICATION
assert CoroPriority.COMMUNICATION > CoroPriority.APPLICATION
assert CoroPriority.STATUS > CoroPriority.WEB_SERVER_BASE
assert CoroPriority.WEB_SERVER_BASE > CoroPriority.CAPTIVE_PORTAL
assert CoroPriority.CAPTIVE_PORTAL > CoroPriority.COMMUNICATION
assert CoroPriority.COMMUNICATION > CoroPriority.NETWORK_SERVICES
assert CoroPriority.NETWORK_SERVICES > CoroPriority.OTA_UPDATES
assert CoroPriority.OTA_UPDATES > CoroPriority.WEB_SERVER_OTA
assert CoroPriority.WEB_SERVER_OTA > CoroPriority.APPLICATION
assert CoroPriority.APPLICATION > CoroPriority.WEB
assert CoroPriority.WEB > CoroPriority.AUTOMATION
assert CoroPriority.AUTOMATION > CoroPriority.BUS

View File

@@ -1226,6 +1226,18 @@ def test_has_mqtt_logging_no_log_topic() -> None:
setup_core(config={})
assert has_mqtt_logging() is False
# Setup MQTT config with CONF_LOG_TOPIC but no CONF_LEVEL (regression test for #10771)
# This simulates the default configuration created by validate_config in the MQTT component
setup_core(
config={
CONF_MQTT: {
CONF_BROKER: "mqtt.local",
CONF_LOG_TOPIC: {CONF_TOPIC: "esphome/debug"},
}
}
)
assert has_mqtt_logging() is True
def test_has_mqtt() -> None:
"""Test has_mqtt function."""

View File

@@ -141,3 +141,170 @@ def test_list_yaml_files_mixed_extensions(tmp_path: Path) -> None:
str(yaml_file),
str(yml_file),
}
def test_list_yaml_files_does_not_recurse_into_subdirectories(tmp_path: Path) -> None:
"""Test that list_yaml_files only finds files in specified directory, not subdirectories."""
# Create directory structure with YAML files at different depths
root = tmp_path / "configs"
root.mkdir()
# Create YAML files in the root directory
(root / "config1.yaml").write_text("test: 1")
(root / "config2.yml").write_text("test: 2")
(root / "device.yaml").write_text("test: device")
# Create subdirectory with YAML files (should NOT be found)
subdir = root / "subdir"
subdir.mkdir()
(subdir / "nested1.yaml").write_text("test: nested1")
(subdir / "nested2.yml").write_text("test: nested2")
# Create deeper subdirectory (should NOT be found)
deep_subdir = subdir / "deeper"
deep_subdir.mkdir()
(deep_subdir / "very_nested.yaml").write_text("test: very_nested")
# Test listing files from the root directory
result = util.list_yaml_files([str(root)])
# Should only find the 3 files in root, not the 3 in subdirectories
assert len(result) == 3
# Check that only root-level files are found
assert str(root / "config1.yaml") in result
assert str(root / "config2.yml") in result
assert str(root / "device.yaml") in result
# Ensure nested files are NOT found
for r in result:
assert "subdir" not in r
assert "deeper" not in r
assert "nested1.yaml" not in r
assert "nested2.yml" not in r
assert "very_nested.yaml" not in r
def test_list_yaml_files_excludes_secrets(tmp_path: Path) -> None:
"""Test that secrets.yaml and secrets.yml are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create various YAML files including secrets
(root / "config.yaml").write_text("test: config")
(root / "secrets.yaml").write_text("wifi_password: secret123")
(root / "secrets.yml").write_text("api_key: secret456")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find 2 files (config.yaml and device.yaml), not secrets
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / "secrets.yaml") not in result
assert str(root / "secrets.yml") not in result
def test_list_yaml_files_excludes_hidden_files(tmp_path: Path) -> None:
"""Test that hidden files (starting with .) are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create regular and hidden YAML files
(root / "config.yaml").write_text("test: config")
(root / ".hidden.yaml").write_text("test: hidden")
(root / ".backup.yml").write_text("test: backup")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find only non-hidden files
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / ".hidden.yaml") not in result
assert str(root / ".backup.yml") not in result
def test_filter_yaml_files_basic() -> None:
"""Test filter_yaml_files function."""
files = [
"/path/to/config.yaml",
"/path/to/device.yml",
"/path/to/readme.txt",
"/path/to/script.py",
"/path/to/data.json",
"/path/to/another.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 3
assert "/path/to/config.yaml" in result
assert "/path/to/device.yml" in result
assert "/path/to/another.yaml" in result
assert "/path/to/readme.txt" not in result
assert "/path/to/script.py" not in result
assert "/path/to/data.json" not in result
def test_filter_yaml_files_excludes_secrets() -> None:
"""Test that filter_yaml_files excludes secrets files."""
files = [
"/path/to/config.yaml",
"/path/to/secrets.yaml",
"/path/to/secrets.yml",
"/path/to/device.yaml",
"/some/dir/secrets.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/secrets.yaml" not in result
assert "/path/to/secrets.yml" not in result
assert "/some/dir/secrets.yaml" not in result
def test_filter_yaml_files_excludes_hidden() -> None:
"""Test that filter_yaml_files excludes hidden files."""
files = [
"/path/to/config.yaml",
"/path/to/.hidden.yaml",
"/path/to/.backup.yml",
"/path/to/device.yaml",
"/some/dir/.config.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/.hidden.yaml" not in result
assert "/path/to/.backup.yml" not in result
assert "/some/dir/.config.yaml" not in result
def test_filter_yaml_files_case_sensitive() -> None:
"""Test that filter_yaml_files is case-sensitive for extensions."""
files = [
"/path/to/config.yaml",
"/path/to/config.YAML",
"/path/to/config.YML",
"/path/to/config.Yaml",
"/path/to/config.yml",
]
result = util.filter_yaml_files(files)
# Should only match lowercase .yaml and .yml
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/config.yml" in result
assert "/path/to/config.YAML" not in result
assert "/path/to/config.YML" not in result
assert "/path/to/config.Yaml" not in result

View File

@@ -1,9 +1,12 @@
"""Tests for the wizard.py file."""
import os
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
from pytest import MonkeyPatch
from esphome.components.bk72xx.boards import BK72XX_BOARD_PINS
from esphome.components.esp32.boards import ESP32_BOARD_PINS
@@ -15,7 +18,7 @@ import esphome.wizard as wz
@pytest.fixture
def default_config():
def default_config() -> dict[str, Any]:
return {
"type": "basic",
"name": "test-name",
@@ -28,7 +31,7 @@ def default_config():
@pytest.fixture
def wizard_answers():
def wizard_answers() -> list[str]:
return [
"test-node", # Name of the node
"ESP8266", # platform
@@ -53,7 +56,9 @@ def test_sanitize_quotes_replaces_with_escaped_char():
assert output_str == '\\"key\\": \\"value\\"'
def test_config_file_fallback_ap_includes_descriptive_name(default_config):
def test_config_file_fallback_ap_includes_descriptive_name(
default_config: dict[str, Any],
):
"""
The fallback AP should include the node and a descriptive name
"""
@@ -67,7 +72,9 @@ def test_config_file_fallback_ap_includes_descriptive_name(default_config):
assert 'ssid: "Test Node Fallback Hotspot"' in config
def test_config_file_fallback_ap_name_less_than_32_chars(default_config):
def test_config_file_fallback_ap_name_less_than_32_chars(
default_config: dict[str, Any],
):
"""
The fallback AP name must be less than 32 chars.
Since it is composed of the node name and "Fallback Hotspot" this can be too long and needs truncating
@@ -82,7 +89,7 @@ def test_config_file_fallback_ap_name_less_than_32_chars(default_config):
assert 'ssid: "A Very Long Name For This Node"' in config
def test_config_file_should_include_ota(default_config):
def test_config_file_should_include_ota(default_config: dict[str, Any]):
"""
The Over-The-Air update should be enabled by default
"""
@@ -95,7 +102,9 @@ def test_config_file_should_include_ota(default_config):
assert "ota:" in config
def test_config_file_should_include_ota_when_password_set(default_config):
def test_config_file_should_include_ota_when_password_set(
default_config: dict[str, Any],
):
"""
The Over-The-Air update should be enabled when a password is set
"""
@@ -109,7 +118,9 @@ def test_config_file_should_include_ota_when_password_set(default_config):
assert "ota:" in config
def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
def test_wizard_write_sets_platform(
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
"""
@@ -126,7 +137,7 @@ def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
assert "esp8266:" in generated_config
def test_wizard_empty_config(tmp_path, monkeypatch):
def test_wizard_empty_config(tmp_path: Path, monkeypatch: MonkeyPatch):
"""
The wizard should be able to create an empty configuration
"""
@@ -146,7 +157,7 @@ def test_wizard_empty_config(tmp_path, monkeypatch):
assert generated_config == ""
def test_wizard_upload_config(tmp_path, monkeypatch):
def test_wizard_upload_config(tmp_path: Path, monkeypatch: MonkeyPatch):
"""
The wizard should be able to import a base64 encoded configuration
"""
@@ -168,7 +179,7 @@ def test_wizard_upload_config(tmp_path, monkeypatch):
def test_wizard_write_defaults_platform_from_board_esp8266(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
@@ -189,7 +200,7 @@ def test_wizard_write_defaults_platform_from_board_esp8266(
def test_wizard_write_defaults_platform_from_board_esp32(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "ESP32" if the board is one of the ESP32 boards
@@ -210,7 +221,7 @@ def test_wizard_write_defaults_platform_from_board_esp32(
def test_wizard_write_defaults_platform_from_board_bk72xx(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "BK72XX" if the board is one of BK72XX boards
@@ -231,7 +242,7 @@ def test_wizard_write_defaults_platform_from_board_bk72xx(
def test_wizard_write_defaults_platform_from_board_ln882x(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "LN882X" if the board is one of LN882X boards
@@ -252,7 +263,7 @@ def test_wizard_write_defaults_platform_from_board_ln882x(
def test_wizard_write_defaults_platform_from_board_rtl87xx(
default_config, tmp_path, monkeypatch
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
"""
If the platform is not explicitly set, use "RTL87XX" if the board is one of RTL87XX boards
@@ -272,7 +283,7 @@ def test_wizard_write_defaults_platform_from_board_rtl87xx(
assert "rtl87xx:" in generated_config
def test_safe_print_step_prints_step_number_and_description(monkeypatch):
def test_safe_print_step_prints_step_number_and_description(monkeypatch: MonkeyPatch):
"""
The safe_print_step function prints the step number and the passed description
"""
@@ -296,7 +307,7 @@ def test_safe_print_step_prints_step_number_and_description(monkeypatch):
assert any(f"STEP {step_num}" in arg for arg in all_args)
def test_default_input_uses_default_if_no_input_supplied(monkeypatch):
def test_default_input_uses_default_if_no_input_supplied(monkeypatch: MonkeyPatch):
"""
The default_input() function should return the supplied default value if the user doesn't enter anything
"""
@@ -312,7 +323,7 @@ def test_default_input_uses_default_if_no_input_supplied(monkeypatch):
assert retval == default_string
def test_default_input_uses_user_supplied_value(monkeypatch):
def test_default_input_uses_user_supplied_value(monkeypatch: MonkeyPatch):
"""
The default_input() function should return the value that the user enters
"""
@@ -376,7 +387,9 @@ def test_wizard_rejects_existing_files(tmpdir):
assert retval == 2
def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answers):
def test_wizard_accepts_default_answers_esp8266(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
The wizard should accept the given default answers for esp8266
"""
@@ -396,7 +409,9 @@ def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answ
assert retval == 0
def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answers):
def test_wizard_accepts_default_answers_esp32(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
The wizard should accept the given default answers for esp32
"""
@@ -418,7 +433,9 @@ def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answer
assert retval == 0
def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers):
def test_wizard_offers_better_node_name(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the node name does not conform, a better alternative is offered
* Removes special chars
@@ -449,7 +466,9 @@ def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers):
assert wz.default_input.call_args.args[1] == expected_name
def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers):
def test_wizard_requires_correct_platform(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the platform is not either esp32 or esp8266, the wizard should reject it
"""
@@ -471,7 +490,9 @@ def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers):
assert retval == 0
def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers):
def test_wizard_requires_correct_board(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the board is not a valid esp8266 board, the wizard should reject it
"""
@@ -493,7 +514,9 @@ def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers):
assert retval == 0
def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
def test_wizard_requires_valid_ssid(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
"""
When the SSID is not valid, the wizard should reject it
"""
@@ -515,7 +538,9 @@ def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
assert retval == 0
def test_wizard_write_protects_existing_config(tmpdir, default_config, monkeypatch):
def test_wizard_write_protects_existing_config(
tmpdir, default_config: dict[str, Any], monkeypatch: MonkeyPatch
):
"""
The wizard_write function should not overwrite existing config files and return False
"""

View File

@@ -1,13 +1,34 @@
"""Test writer module functionality."""
from collections.abc import Callable
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from esphome.core import EsphomeError
from esphome.storage_json import StorageJSON
from esphome.writer import storage_should_clean, update_storage_json
from esphome.writer import (
CPP_AUTO_GENERATE_BEGIN,
CPP_AUTO_GENERATE_END,
CPP_INCLUDE_BEGIN,
CPP_INCLUDE_END,
GITIGNORE_CONTENT,
clean_build,
clean_cmake_cache,
storage_should_clean,
update_storage_json,
write_cpp,
write_gitignore,
)
@pytest.fixture
def mock_copy_src_tree():
"""Mock copy_src_tree to avoid side effects during tests."""
with patch("esphome.writer.copy_src_tree"):
yield
@pytest.fixture
@@ -218,3 +239,493 @@ def test_update_storage_json_logging_components_removed(
# Verify save was called
new_storage.save.assert_called_once_with("/test/path")
@patch("esphome.writer.CORE")
def test_clean_cmake_cache(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_cmake_cache removes CMakeCache.txt file."""
# Create directory structure
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
cmake_cache_file.write_text("# CMake cache file")
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file exists before
assert cmake_cache_file.exists()
# Call the function
with caplog.at_level("INFO"):
clean_cmake_cache()
# Verify file was removed
assert not cmake_cache_file.exists()
# Verify logging
assert "Deleting" in caplog.text
assert "CMakeCache.txt" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_pioenvs_dir(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when pioenvs directory doesn't exist."""
# Setup non-existent directory path
pioenvs_dir = tmp_path / ".pioenvs"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
# Verify directory doesn't exist
assert not pioenvs_dir.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify directory still doesn't exist
assert not pioenvs_dir.exists()
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_cmake_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when CMakeCache.txt doesn't exist."""
# Create directory structure without CMakeCache.txt
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file doesn't exist
assert not cmake_cache_file.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify file still doesn't exist
assert not cmake_cache_file.exists()
@patch("esphome.writer.CORE")
def test_clean_build(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build removes all build artifacts."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
piolibdeps_dir.mkdir()
(piolibdeps_dir / "library").mkdir()
dependencies_lock = tmp_path / "dependencies.lock"
dependencies_lock.write_text("lock file")
# Create PlatformIO cache directory
platformio_cache_dir = tmp_path / ".platformio" / ".cache"
platformio_cache_dir.mkdir(parents=True)
(platformio_cache_dir / "downloads").mkdir()
(platformio_cache_dir / "http").mkdir()
(platformio_cache_dir / "tmp").mkdir()
(platformio_cache_dir / "downloads" / "package.tar.gz").write_text("package")
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify all exist before
assert pioenvs_dir.exists()
assert piolibdeps_dir.exists()
assert dependencies_lock.exists()
assert platformio_cache_dir.exists()
# Mock PlatformIO's get_project_cache_dir
with patch(
"platformio.project.helpers.get_project_cache_dir"
) as mock_get_cache_dir:
mock_get_cache_dir.return_value = str(platformio_cache_dir)
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify all were removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
assert not platformio_cache_dir.exists()
# Verify logging
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" in caplog.text
assert "dependencies.lock" in caplog.text
assert "PlatformIO cache" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_partial_exists(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when only some paths exist."""
# Create only pioenvs directory
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify only pioenvs exists
assert pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify only existing path was removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify logging - only pioenvs should be logged
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" not in caplog.text
assert "dependencies.lock" not in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_nothing_exists(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_build when no build artifacts exist."""
# Setup paths that don't exist
pioenvs_dir = tmp_path / ".pioenvs"
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify nothing exists
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function - should not crash
clean_build()
# Verify nothing was created
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
@patch("esphome.writer.CORE")
def test_clean_build_platformio_not_available(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when PlatformIO is not available."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
piolibdeps_dir = tmp_path / ".piolibdeps"
piolibdeps_dir.mkdir()
dependencies_lock = tmp_path / "dependencies.lock"
dependencies_lock.write_text("lock file")
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify all exist before
assert pioenvs_dir.exists()
assert piolibdeps_dir.exists()
assert dependencies_lock.exists()
# Mock import error for platformio
with (
patch.dict("sys.modules", {"platformio.project.helpers": None}),
caplog.at_level("INFO"),
):
# Call the function
clean_build()
# Verify standard paths were removed but no cache cleaning attempted
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify no cache logging
assert "PlatformIO cache" not in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_empty_cache_dir(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when get_project_cache_dir returns empty/whitespace."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(tmp_path / ".piolibdeps")
mock_core.relative_build_path.return_value = str(tmp_path / "dependencies.lock")
# Verify pioenvs exists before
assert pioenvs_dir.exists()
# Mock PlatformIO's get_project_cache_dir to return whitespace
with patch(
"platformio.project.helpers.get_project_cache_dir"
) as mock_get_cache_dir:
mock_get_cache_dir.return_value = " " # Whitespace only
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify pioenvs was removed
assert not pioenvs_dir.exists()
# Verify no cache cleaning was attempted due to empty string
assert "PlatformIO cache" not in caplog.text
@patch("esphome.writer.CORE")
def test_write_gitignore_creates_new_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore creates a new .gitignore file when it doesn't exist."""
gitignore_path = tmp_path / ".gitignore"
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file doesn't exist
assert not gitignore_path.exists()
# Call the function
write_gitignore()
# Verify file was created with correct content
assert gitignore_path.exists()
assert gitignore_path.read_text() == GITIGNORE_CONTENT
@patch("esphome.writer.CORE")
def test_write_gitignore_skips_existing_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore doesn't overwrite existing .gitignore file."""
gitignore_path = tmp_path / ".gitignore"
existing_content = "# Custom gitignore\n/custom_dir/\n"
gitignore_path.write_text(existing_content)
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file exists with custom content
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
# Call the function
write_gitignore()
# Verify file was not modified
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_with_existing_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp already exists."""
# Create a real file with markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_INCLUDE_BEGIN}
// Old includes
{CPP_INCLUDE_END}
void setup() {{
{CPP_AUTO_GENERATE_BEGIN}
// Old code
{CPP_AUTO_GENERATE_END}
}}
void loop() {{}}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Call the function
test_code = " // New generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
# Check that markers are preserved and content is updated
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "// Global section" in written_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_creates_new_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp doesn't exist."""
# Setup path for new file
main_cpp = tmp_path / "main.cpp"
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Verify file doesn't exist
assert not main_cpp.exists()
# Call the function
test_code = " // Generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
assert written_path == str(main_cpp)
# Check that all necessary parts are in the new file
assert '#include "esphome.h"' in written_content
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "void setup()" in written_content
assert "void loop()" in written_content
assert "App.setup();" in written_content
assert "App.loop();" in written_content
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_missing_end_marker(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when end marker is missing."""
# Create a file with begin marker but no end marker
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// Code without end marker"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Could not find auto generated code end"):
write_cpp("// New code")
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_duplicate_markers(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when duplicate markers exist."""
# Create a file with duplicate begin markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// First section
{CPP_AUTO_GENERATE_END}
{CPP_AUTO_GENERATE_BEGIN}
// Duplicate section
{CPP_AUTO_GENERATE_END}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Found multiple auto generate code begins"):
write_cpp("// New code")