1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-02 08:01:50 +00:00

Compare commits

...

22 Commits

Author SHA1 Message Date
Jesse Hills
a96c013eb1 Merge pull request #10932 from esphome/bump-2025.9.2
2025.9.2
2025-09-30 07:50:17 +13:00
Jesse Hills
58166b3e71 Bump version to 2025.9.2 2025-09-29 21:39:17 +13:00
Clyde Stubbs
345fc0b6ca [mipi_spi] Fix t-display-amoled (#10922) 2025-09-29 21:39:17 +13:00
Oliver Kleinecke
127058e700 [usb_uart] Disable flow control on ch34x 2025-09-29 21:39:17 +13:00
Jonathan Swoboda
57f7a709cf [sx126x] Fix issues with variable length FSK packets (#10911) 2025-09-29 21:39:17 +13:00
J. Nick Koston
f2a9e9265e [esp32_improv] Fix crashes from uninitialized pointers and missing null checks (#10902) 2025-09-29 21:39:16 +13:00
Stuart Parmenter
1ecd26adb5 Set color_order to RGB for the Waveshare ESP32-S3-TOUCH-LCD-4.3 and ESP32-S3-TOUCH-LCD-7-800X480 (#10835) 2025-09-29 21:39:16 +13:00
Jonathan Swoboda
6d9fc672d5 [libretiny] Fix lib_ignore handling and ignore incompatible libraries (#10846) 2025-09-29 21:39:16 +13:00
J. Nick Koston
b9361b0868 [esp32_improv] Disable loop by default until provisioning needed (#10764) 2025-09-29 21:39:16 +13:00
Jesse Hills
e47f4ef602 Merge pull request #10796 from esphome/bump-2025.9.1
2025.9.1
2025-09-19 20:46:53 +12:00
Jesse Hills
961be7fd12 Bump version to 2025.9.1 2025-09-19 11:52:10 +12:00
J. Nick Koston
a5a21f47d1 [gpio] Fix unused function warnings when compiling with log level below DEBUG (#10779) 2025-09-19 11:52:09 +12:00
J. Nick Koston
a06cd84974 [core] Fix ESP8266 mDNS compilation failure caused by incorrect coroutine priorities (#10773) 2025-09-19 11:52:09 +12:00
Subhash Chandra
e3703b43c1 [packet_transport] Refactor sensor/provider list handling to be idempotent (#10765) 2025-09-19 11:52:09 +12:00
J. Nick Koston
f6dc25c0ce [mqtt] Fix KeyError when MQTT logging configured without explicit level (#10774) 2025-09-19 11:52:09 +12:00
Jesse Hills
d2df232706 Merge pull request #10763 from esphome/bump-2025.9.0
2025.9.0
2025-09-17 18:51:21 +12:00
Jesse Hills
404e679e66 Bump version to 2025.9.0 2025-09-17 11:02:12 +12:00
Jesse Hills
8d401ad05a Merge pull request #10761 from esphome/bump-2025.9.0b4
2025.9.0b4
2025-09-17 10:50:15 +12:00
Jesse Hills
e542816f7d Bump version to 2025.9.0b4 2025-09-17 09:22:54 +12:00
J. Nick Koston
12cadf0a04 [core] Fix clean build files to properly clear PlatformIO cache (#10754) 2025-09-17 09:22:54 +12:00
J. Nick Koston
adc3d3127d [wizard] Fix KeyError when running wizard with empty OTA password (#10753) 2025-09-17 09:22:54 +12:00
J. Nick Koston
61ab682099 Add additional coverage for util and writer (#10683) 2025-09-17 09:22:54 +12:00
30 changed files with 997 additions and 90 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version # could be handy for archiving the generated documentation or if some version
# control system is used. # control system is used.
PROJECT_NUMBER = 2025.9.0b3 PROJECT_NUMBER = 2025.9.2
# Using the PROJECT_BRIEF tag one can provide an optional one line description # Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a # for a project that appears at the top of each page and should give viewer a

View File

@@ -212,7 +212,7 @@ def has_mqtt_logging() -> bool:
if CONF_TOPIC not in log_topic: if CONF_TOPIC not in log_topic:
return False return False
return log_topic[CONF_LEVEL] != "NONE" return log_topic.get(CONF_LEVEL, None) != "NONE"
def has_mqtt() -> bool: def has_mqtt() -> bool:

View File

@@ -10,7 +10,8 @@ from esphome.const import (
PLATFORM_LN882X, PLATFORM_LN882X,
PLATFORM_RTL87XX, PLATFORM_RTL87XX,
) )
from esphome.core import CORE, CoroPriority, coroutine_with_priority from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
AUTO_LOAD = ["web_server_base", "ota.web_server"] AUTO_LOAD = ["web_server_base", "ota.web_server"]
DEPENDENCIES = ["wifi"] DEPENDENCIES = ["wifi"]
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
) )
@coroutine_with_priority(CoroPriority.COMMUNICATION) @coroutine_with_priority(CoroPriority.CAPTIVE_PORTAL)
async def to_code(config): async def to_code(config):
paren = await cg.get_variable(config[CONF_WEB_SERVER_BASE_ID]) paren = await cg.get_variable(config[CONF_WEB_SERVER_BASE_ID])

View File

@@ -15,6 +15,8 @@ using namespace bytebuffer;
static const char *const TAG = "esp32_improv.component"; static const char *const TAG = "esp32_improv.component";
static const char *const ESPHOME_MY_LINK = "https://my.home-assistant.io/redirect/config_flow_start?domain=esphome"; static const char *const ESPHOME_MY_LINK = "https://my.home-assistant.io/redirect/config_flow_start?domain=esphome";
static constexpr uint16_t STOP_ADVERTISING_DELAY =
10000; // Delay (ms) before stopping service to allow BLE clients to read the final state
ESP32ImprovComponent::ESP32ImprovComponent() { global_improv_component = this; } ESP32ImprovComponent::ESP32ImprovComponent() { global_improv_component = this; }
@@ -31,6 +33,9 @@ void ESP32ImprovComponent::setup() {
#endif #endif
global_ble_server->on(BLEServerEvt::EmptyEvt::ON_DISCONNECT, global_ble_server->on(BLEServerEvt::EmptyEvt::ON_DISCONNECT,
[this](uint16_t conn_id) { this->set_error_(improv::ERROR_NONE); }); [this](uint16_t conn_id) { this->set_error_(improv::ERROR_NONE); });
// Start with loop disabled - will be enabled by start() when needed
this->disable_loop();
} }
void ESP32ImprovComponent::setup_characteristics() { void ESP32ImprovComponent::setup_characteristics() {
@@ -190,6 +195,25 @@ void ESP32ImprovComponent::set_status_indicator_state_(bool state) {
#endif #endif
} }
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_DEBUG
const char *ESP32ImprovComponent::state_to_string_(improv::State state) {
switch (state) {
case improv::STATE_STOPPED:
return "STOPPED";
case improv::STATE_AWAITING_AUTHORIZATION:
return "AWAITING_AUTHORIZATION";
case improv::STATE_AUTHORIZED:
return "AUTHORIZED";
case improv::STATE_PROVISIONING:
return "PROVISIONING";
case improv::STATE_PROVISIONED:
return "PROVISIONED";
default:
return "UNKNOWN";
}
}
#endif
bool ESP32ImprovComponent::check_identify_() { bool ESP32ImprovComponent::check_identify_() {
uint32_t now = millis(); uint32_t now = millis();
@@ -203,31 +227,42 @@ bool ESP32ImprovComponent::check_identify_() {
} }
void ESP32ImprovComponent::set_state_(improv::State state) { void ESP32ImprovComponent::set_state_(improv::State state) {
ESP_LOGV(TAG, "Setting state: %d", state); #if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_DEBUG
if (this->state_ != state) {
ESP_LOGD(TAG, "State transition: %s (0x%02X) -> %s (0x%02X)", this->state_to_string_(this->state_), this->state_,
this->state_to_string_(state), state);
}
#endif
this->state_ = state; this->state_ = state;
if (this->status_->get_value().empty() || this->status_->get_value()[0] != state) { if (this->status_ != nullptr && (this->status_->get_value().empty() || this->status_->get_value()[0] != state)) {
this->status_->set_value(ByteBuffer::wrap(static_cast<uint8_t>(state))); this->status_->set_value(ByteBuffer::wrap(static_cast<uint8_t>(state)));
if (state != improv::STATE_STOPPED) if (state != improv::STATE_STOPPED)
this->status_->notify(); this->status_->notify();
} }
std::vector<uint8_t> service_data(8, 0); // Only advertise valid Improv states (0x01-0x04).
service_data[0] = 0x77; // PR // STATE_STOPPED (0x00) is internal only and not part of the Improv spec.
service_data[1] = 0x46; // IM // Advertising 0x00 causes undefined behavior in some clients and makes them
service_data[2] = static_cast<uint8_t>(state); // repeatedly connect trying to determine the actual state.
if (state != improv::STATE_STOPPED) {
std::vector<uint8_t> service_data(8, 0);
service_data[0] = 0x77; // PR
service_data[1] = 0x46; // IM
service_data[2] = static_cast<uint8_t>(state);
uint8_t capabilities = 0x00; uint8_t capabilities = 0x00;
#ifdef USE_OUTPUT #ifdef USE_OUTPUT
if (this->status_indicator_ != nullptr) if (this->status_indicator_ != nullptr)
capabilities |= improv::CAPABILITY_IDENTIFY; capabilities |= improv::CAPABILITY_IDENTIFY;
#endif #endif
service_data[3] = capabilities; service_data[3] = capabilities;
service_data[4] = 0x00; // Reserved service_data[4] = 0x00; // Reserved
service_data[5] = 0x00; // Reserved service_data[5] = 0x00; // Reserved
service_data[6] = 0x00; // Reserved service_data[6] = 0x00; // Reserved
service_data[7] = 0x00; // Reserved service_data[7] = 0x00; // Reserved
esp32_ble::global_ble->advertising_set_service_data(service_data); esp32_ble::global_ble->advertising_set_service_data(service_data);
}
#ifdef USE_ESP32_IMPROV_STATE_CALLBACK #ifdef USE_ESP32_IMPROV_STATE_CALLBACK
this->state_callback_.call(this->state_, this->error_state_); this->state_callback_.call(this->state_, this->error_state_);
#endif #endif
@@ -237,7 +272,12 @@ void ESP32ImprovComponent::set_error_(improv::Error error) {
if (error != improv::ERROR_NONE) { if (error != improv::ERROR_NONE) {
ESP_LOGE(TAG, "Error: %d", error); ESP_LOGE(TAG, "Error: %d", error);
} }
if (this->error_->get_value().empty() || this->error_->get_value()[0] != error) { // The error_ characteristic is initialized in setup_characteristics() which is called
// from the loop, while the BLE disconnect callback is registered in setup().
// error_ can be nullptr if:
// 1. A client connects/disconnects before setup_characteristics() is called
// 2. The device is already provisioned so the service never starts (should_start_ is false)
if (this->error_ != nullptr && (this->error_->get_value().empty() || this->error_->get_value()[0] != error)) {
this->error_->set_value(ByteBuffer::wrap(static_cast<uint8_t>(error))); this->error_->set_value(ByteBuffer::wrap(static_cast<uint8_t>(error)));
if (this->state_ != improv::STATE_STOPPED) if (this->state_ != improv::STATE_STOPPED)
this->error_->notify(); this->error_->notify();
@@ -261,7 +301,10 @@ void ESP32ImprovComponent::start() {
void ESP32ImprovComponent::stop() { void ESP32ImprovComponent::stop() {
this->should_start_ = false; this->should_start_ = false;
this->set_timeout("end-service", 1000, [this] { // Wait before stopping the service to ensure all BLE clients see the state change.
// This prevents clients from repeatedly reconnecting and wasting resources by allowing
// them to observe that the device is provisioned before the service disappears.
this->set_timeout("end-service", STOP_ADVERTISING_DELAY, [this] {
if (this->state_ == improv::STATE_STOPPED || this->service_ == nullptr) if (this->state_ == improv::STATE_STOPPED || this->service_ == nullptr)
return; return;
this->service_->stop(); this->service_->stop();

View File

@@ -79,12 +79,12 @@ class ESP32ImprovComponent : public Component {
std::vector<uint8_t> incoming_data_; std::vector<uint8_t> incoming_data_;
wifi::WiFiAP connecting_sta_; wifi::WiFiAP connecting_sta_;
BLEService *service_ = nullptr; BLEService *service_{nullptr};
BLECharacteristic *status_; BLECharacteristic *status_{nullptr};
BLECharacteristic *error_; BLECharacteristic *error_{nullptr};
BLECharacteristic *rpc_; BLECharacteristic *rpc_{nullptr};
BLECharacteristic *rpc_response_; BLECharacteristic *rpc_response_{nullptr};
BLECharacteristic *capabilities_; BLECharacteristic *capabilities_{nullptr};
#ifdef USE_BINARY_SENSOR #ifdef USE_BINARY_SENSOR
binary_sensor::BinarySensor *authorizer_{nullptr}; binary_sensor::BinarySensor *authorizer_{nullptr};
@@ -108,6 +108,9 @@ class ESP32ImprovComponent : public Component {
void process_incoming_data_(); void process_incoming_data_();
void on_wifi_connect_timeout_(); void on_wifi_connect_timeout_();
bool check_identify_(); bool check_identify_();
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_DEBUG
const char *state_to_string_(improv::State state);
#endif
}; };
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@@ -16,7 +16,8 @@ from esphome.const import (
CONF_SAFE_MODE, CONF_SAFE_MODE,
CONF_VERSION, CONF_VERSION,
) )
from esphome.core import CoroPriority, coroutine_with_priority from esphome.core import coroutine_with_priority
from esphome.coroutine import CoroPriority
import esphome.final_validate as fv import esphome.final_validate as fv
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -121,7 +122,7 @@ CONFIG_SCHEMA = (
FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate
@coroutine_with_priority(CoroPriority.COMMUNICATION) @coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config): async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID]) var = cg.new_Pvariable(config[CONF_ID])
cg.add(var.set_port(config[CONF_PORT])) cg.add(var.set_port(config[CONF_PORT]))

View File

@@ -6,6 +6,7 @@ namespace gpio {
static const char *const TAG = "gpio.binary_sensor"; static const char *const TAG = "gpio.binary_sensor";
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_DEBUG
static const LogString *interrupt_type_to_string(gpio::InterruptType type) { static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
switch (type) { switch (type) {
case gpio::INTERRUPT_RISING_EDGE: case gpio::INTERRUPT_RISING_EDGE:
@@ -22,6 +23,7 @@ static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
static const LogString *gpio_mode_to_string(bool use_interrupt) { static const LogString *gpio_mode_to_string(bool use_interrupt) {
return use_interrupt ? LOG_STR("interrupt") : LOG_STR("polling"); return use_interrupt ? LOG_STR("interrupt") : LOG_STR("polling");
} }
#endif
void IRAM_ATTR GPIOBinarySensorStore::gpio_intr(GPIOBinarySensorStore *arg) { void IRAM_ATTR GPIOBinarySensorStore::gpio_intr(GPIOBinarySensorStore *arg) {
bool new_state = arg->isr_pin_.digital_read(); bool new_state = arg->isr_pin_.digital_read();

View File

@@ -128,4 +128,4 @@ async def to_code(config):
cg.add_library("tonia/HeatpumpIR", "1.0.37") cg.add_library("tonia/HeatpumpIR", "1.0.37")
if CORE.is_libretiny or CORE.is_esp32: if CORE.is_libretiny or CORE.is_esp32:
CORE.add_platformio_option("lib_ignore", "IRremoteESP8266") CORE.add_platformio_option("lib_ignore", ["IRremoteESP8266"])

View File

@@ -3,7 +3,8 @@ import esphome.codegen as cg
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_PASSWORD, CONF_URL, CONF_USERNAME from esphome.const import CONF_ID, CONF_PASSWORD, CONF_URL, CONF_USERNAME
from esphome.core import CoroPriority, coroutine_with_priority from esphome.core import coroutine_with_priority
from esphome.coroutine import CoroPriority
from .. import CONF_HTTP_REQUEST_ID, HttpRequestComponent, http_request_ns from .. import CONF_HTTP_REQUEST_ID, HttpRequestComponent, http_request_ns
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
) )
@coroutine_with_priority(CoroPriority.COMMUNICATION) @coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config): async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID]) var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config) await ota_to_code(var, config)

View File

@@ -11,7 +11,8 @@ from esphome.const import (
CONF_SERVICES, CONF_SERVICES,
PlatformFramework, PlatformFramework,
) )
from esphome.core import CORE, CoroPriority, coroutine_with_priority from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"] CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"] DEPENDENCIES = ["network"]
@@ -72,7 +73,7 @@ def mdns_service(
) )
@coroutine_with_priority(CoroPriority.COMMUNICATION) @coroutine_with_priority(CoroPriority.NETWORK_SERVICES)
async def to_code(config): async def to_code(config):
if config[CONF_DISABLED] is True: if config[CONF_DISABLED] is True:
return return

View File

@@ -401,6 +401,12 @@ class DriverChip:
sequence.append((MADCTL, madctl)) sequence.append((MADCTL, madctl))
return madctl return madctl
def skip_command(self, command: str):
"""
Allow suppressing a standard command in the init sequence.
"""
return self.get_default(f"no_{command.lower()}", False)
def get_sequence(self, config) -> tuple[tuple[int, ...], int]: def get_sequence(self, config) -> tuple[tuple[int, ...], int]:
""" """
Create the init sequence for the display. Create the init sequence for the display.
@@ -432,7 +438,9 @@ class DriverChip:
sequence.append((INVOFF,)) sequence.append((INVOFF,))
if brightness := config.get(CONF_BRIGHTNESS, self.get_default(CONF_BRIGHTNESS)): if brightness := config.get(CONF_BRIGHTNESS, self.get_default(CONF_BRIGHTNESS)):
sequence.append((BRIGHTNESS, brightness)) sequence.append((BRIGHTNESS, brightness))
sequence.append((SLPOUT,)) # Add a SLPOUT command if required.
if not self.skip_command("SLPOUT"):
sequence.append((SLPOUT,))
sequence.append((DISPON,)) sequence.append((DISPON,))
# Flatten the sequence into a list of bytes, with the length of each command # Flatten the sequence into a list of bytes, with the length of each command

View File

@@ -7,6 +7,7 @@ wave_4_3 = DriverChip(
"ESP32-S3-TOUCH-LCD-4.3", "ESP32-S3-TOUCH-LCD-4.3",
swap_xy=UNDEFINED, swap_xy=UNDEFINED,
initsequence=(), initsequence=(),
color_order="RGB",
width=800, width=800,
height=480, height=480,
pclk_frequency="16MHz", pclk_frequency="16MHz",

View File

@@ -27,7 +27,8 @@ DriverChip(
bus_mode=TYPE_QUAD, bus_mode=TYPE_QUAD,
brightness=0xD0, brightness=0xD0,
color_order=MODE_RGB, color_order=MODE_RGB,
initsequence=(SLPOUT,), # Requires early SLPOUT no_slpout=True, # SLPOUT is in the init sequence, early
initsequence=(SLPOUT,),
) )
DriverChip( DriverChip(
@@ -95,6 +96,7 @@ CO5300 = DriverChip(
brightness=0xD0, brightness=0xD0,
color_order=MODE_RGB, color_order=MODE_RGB,
bus_mode=TYPE_QUAD, bus_mode=TYPE_QUAD,
no_slpout=True,
initsequence=( initsequence=(
(SLPOUT,), # Requires early SLPOUT (SLPOUT,), # Requires early SLPOUT
(PAGESEL, 0x00), (PAGESEL, 0x00),

View File

@@ -10,7 +10,8 @@ from esphome.const import (
CONF_TRIGGER_ID, CONF_TRIGGER_ID,
PlatformFramework, PlatformFramework,
) )
from esphome.core import CORE, CoroPriority, coroutine_with_priority from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"] CODEOWNERS = ["@esphome/core"]
AUTO_LOAD = ["md5", "safe_mode"] AUTO_LOAD = ["md5", "safe_mode"]
@@ -82,7 +83,7 @@ BASE_OTA_SCHEMA = cv.Schema(
) )
@coroutine_with_priority(CoroPriority.COMMUNICATION) @coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config): async def to_code(config):
cg.add_define("USE_OTA") cg.add_define("USE_OTA")

View File

@@ -121,15 +121,11 @@ def transport_schema(cls):
return TRANSPORT_SCHEMA.extend({cv.GenerateID(): cv.declare_id(cls)}) return TRANSPORT_SCHEMA.extend({cv.GenerateID(): cv.declare_id(cls)})
# Build a list of sensors for this platform
CORE.data[DOMAIN] = {CONF_SENSORS: []}
def get_sensors(transport_id): def get_sensors(transport_id):
"""Return the list of sensors for this platform.""" """Return the list of sensors for this platform."""
return ( return (
sensor sensor
for sensor in CORE.data[DOMAIN][CONF_SENSORS] for sensor in CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
if sensor[CONF_TRANSPORT_ID] == transport_id if sensor[CONF_TRANSPORT_ID] == transport_id
) )
@@ -137,7 +133,8 @@ def get_sensors(transport_id):
def validate_packet_transport_sensor(config): def validate_packet_transport_sensor(config):
if CONF_NAME in config and CONF_INTERNAL not in config: if CONF_NAME in config and CONF_INTERNAL not in config:
raise cv.Invalid("Must provide internal: config when using name:") raise cv.Invalid("Must provide internal: config when using name:")
CORE.data[DOMAIN][CONF_SENSORS].append(config) conf_sensors = CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
conf_sensors.append(config)
return config return config

View File

@@ -217,7 +217,7 @@ void SX126x::configure() {
this->write_opcode_(RADIO_SET_MODULATIONPARAMS, buf, 4); this->write_opcode_(RADIO_SET_MODULATIONPARAMS, buf, 4);
// set packet params and sync word // set packet params and sync word
this->set_packet_params_(this->payload_length_); this->set_packet_params_(this->get_max_packet_size());
if (this->sync_value_.size() == 2) { if (this->sync_value_.size() == 2) {
this->write_register_(REG_LORA_SYNCWORD, this->sync_value_.data(), this->sync_value_.size()); this->write_register_(REG_LORA_SYNCWORD, this->sync_value_.data(), this->sync_value_.size());
} }
@@ -236,7 +236,7 @@ void SX126x::configure() {
this->write_opcode_(RADIO_SET_MODULATIONPARAMS, buf, 8); this->write_opcode_(RADIO_SET_MODULATIONPARAMS, buf, 8);
// set packet params and sync word // set packet params and sync word
this->set_packet_params_(this->payload_length_); this->set_packet_params_(this->get_max_packet_size());
if (!this->sync_value_.empty()) { if (!this->sync_value_.empty()) {
this->write_register_(REG_GFSK_SYNCWORD, this->sync_value_.data(), this->sync_value_.size()); this->write_register_(REG_GFSK_SYNCWORD, this->sync_value_.data(), this->sync_value_.size());
} }
@@ -274,7 +274,7 @@ void SX126x::set_packet_params_(uint8_t payload_length) {
buf[2] = (this->preamble_detect_ > 0) ? ((this->preamble_detect_ - 1) | 0x04) : 0x00; buf[2] = (this->preamble_detect_ > 0) ? ((this->preamble_detect_ - 1) | 0x04) : 0x00;
buf[3] = this->sync_value_.size() * 8; buf[3] = this->sync_value_.size() * 8;
buf[4] = 0x00; buf[4] = 0x00;
buf[5] = 0x00; buf[5] = (this->payload_length_ > 0) ? 0x00 : 0x01;
buf[6] = payload_length; buf[6] = payload_length;
buf[7] = this->crc_enable_ ? 0x06 : 0x01; buf[7] = this->crc_enable_ ? 0x06 : 0x01;
buf[8] = 0x00; buf[8] = 0x00;
@@ -314,6 +314,9 @@ SX126xError SX126x::transmit_packet(const std::vector<uint8_t> &packet) {
buf[0] = 0xFF; buf[0] = 0xFF;
buf[1] = 0xFF; buf[1] = 0xFF;
this->write_opcode_(RADIO_CLR_IRQSTATUS, buf, 2); this->write_opcode_(RADIO_CLR_IRQSTATUS, buf, 2);
if (this->payload_length_ == 0) {
this->set_packet_params_(this->get_max_packet_size());
}
if (this->rx_start_) { if (this->rx_start_) {
this->set_mode_rx(); this->set_mode_rx();
} else { } else {

View File

@@ -72,6 +72,7 @@ void USBUartTypeCH34X::enable_channels() {
if (channel->index_ >= 2) if (channel->index_ >= 2)
cmd += 0xE; cmd += 0xE;
this->control_transfer(USB_VENDOR_DEV | usb_host::USB_DIR_OUT, cmd, value, (factor << 8) | divisor, callback); this->control_transfer(USB_VENDOR_DEV | usb_host::USB_DIR_OUT, cmd, value, (factor << 8) | divisor, callback);
this->control_transfer(USB_VENDOR_DEV | usb_host::USB_DIR_OUT, cmd + 3, 0x80, 0, callback);
} }
USBUartTypeCdcAcm::enable_channels(); USBUartTypeCdcAcm::enable_channels();
} }

View File

@@ -3,7 +3,8 @@ from esphome.components.esp32 import add_idf_component
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv import esphome.config_validation as cv
from esphome.const import CONF_ID from esphome.const import CONF_ID
from esphome.core import CORE, CoroPriority, coroutine_with_priority from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"] CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network", "web_server_base"] DEPENDENCIES = ["network", "web_server_base"]
@@ -22,7 +23,7 @@ CONFIG_SCHEMA = (
) )
@coroutine_with_priority(CoroPriority.COMMUNICATION) @coroutine_with_priority(CoroPriority.WEB_SERVER_OTA)
async def to_code(config): async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID]) var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config) await ota_to_code(var, config)

View File

@@ -1,7 +1,8 @@
import esphome.codegen as cg import esphome.codegen as cg
import esphome.config_validation as cv import esphome.config_validation as cv
from esphome.const import CONF_ID from esphome.const import CONF_ID
from esphome.core import CORE, CoroPriority, coroutine_with_priority from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"] CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"] DEPENDENCIES = ["network"]
@@ -26,7 +27,7 @@ CONFIG_SCHEMA = cv.Schema(
) )
@coroutine_with_priority(CoroPriority.COMMUNICATION) @coroutine_with_priority(CoroPriority.WEB_SERVER_BASE)
async def to_code(config): async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID]) var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config) await cg.register_component(var, config)
@@ -39,5 +40,7 @@ async def to_code(config):
cg.add_library("Update", None) cg.add_library("Update", None)
if CORE.is_esp8266: if CORE.is_esp8266:
cg.add_library("ESP8266WiFi", None) cg.add_library("ESP8266WiFi", None)
if CORE.is_libretiny:
CORE.add_platformio_option("lib_ignore", ["ESPAsyncTCP", "RPAsyncTCP"])
# https://github.com/ESP32Async/ESPAsyncWebServer/blob/main/library.json # https://github.com/ESP32Async/ESPAsyncWebServer/blob/main/library.json
cg.add_library("ESP32Async/ESPAsyncWebServer", "3.7.10") cg.add_library("ESP32Async/ESPAsyncWebServer", "3.7.10")

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum from esphome.enum import StrEnum
__version__ = "2025.9.0b3" __version__ = "2025.9.2"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_" ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = ( VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -396,7 +396,7 @@ async def add_includes(includes):
async def _add_platformio_options(pio_options): async def _add_platformio_options(pio_options):
# Add includes at the very end, so that they override everything # Add includes at the very end, so that they override everything
for key, val in pio_options.items(): for key, val in pio_options.items():
if key == "build_flags" and not isinstance(val, list): if key in ["build_flags", "lib_ignore"] and not isinstance(val, list):
val = [val] val = [val]
cg.add_platformio_option(key, val) cg.add_platformio_option(key, val)

View File

@@ -90,11 +90,30 @@ class CoroPriority(enum.IntEnum):
# Examples: status_led (80) # Examples: status_led (80)
STATUS = 80 STATUS = 80
# Web server infrastructure
# Examples: web_server_base (65)
WEB_SERVER_BASE = 65
# Network portal services
# Examples: captive_portal (64)
CAPTIVE_PORTAL = 64
# Communication protocols and services # Communication protocols and services
# Examples: web_server_base (65), captive_portal (64), wifi (60), ethernet (60), # Examples: wifi (60), ethernet (60)
# mdns (55), ota_updates (54), web_server_ota (52)
COMMUNICATION = 60 COMMUNICATION = 60
# Network discovery and management services
# Examples: mdns (55)
NETWORK_SERVICES = 55
# OTA update services
# Examples: ota_updates (54)
OTA_UPDATES = 54
# Web-based OTA services
# Examples: web_server_ota (52)
WEB_SERVER_OTA = 52
# Application-level services # Application-level services
# Examples: safe_mode (50) # Examples: safe_mode (50)
APPLICATION = 50 APPLICATION = 50

View File

@@ -1,6 +1,7 @@
import os import os
import random import random
import string import string
from typing import Literal, NotRequired, TypedDict, Unpack
import unicodedata import unicodedata
import voluptuous as vol import voluptuous as vol
@@ -103,11 +104,25 @@ HARDWARE_BASE_CONFIGS = {
} }
def sanitize_double_quotes(value): def sanitize_double_quotes(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"') return value.replace("\\", "\\\\").replace('"', '\\"')
def wizard_file(**kwargs): class WizardFileKwargs(TypedDict):
"""Keyword arguments for wizard_file function."""
name: str
platform: Literal["ESP8266", "ESP32", "RP2040", "BK72XX", "LN882X", "RTL87XX"]
board: str
ssid: NotRequired[str]
psk: NotRequired[str]
password: NotRequired[str]
ota_password: NotRequired[str]
api_encryption_key: NotRequired[str]
friendly_name: NotRequired[str]
def wizard_file(**kwargs: Unpack[WizardFileKwargs]) -> str:
letters = string.ascii_letters + string.digits letters = string.ascii_letters + string.digits
ap_name_base = kwargs["name"].replace("_", " ").title() ap_name_base = kwargs["name"].replace("_", " ").title()
ap_name = f"{ap_name_base} Fallback Hotspot" ap_name = f"{ap_name_base} Fallback Hotspot"
@@ -180,7 +195,25 @@ captive_portal:
return config return config
def wizard_write(path, **kwargs): class WizardWriteKwargs(TypedDict):
"""Keyword arguments for wizard_write function."""
name: str
type: Literal["basic", "empty", "upload"]
# Required for "basic" type
board: NotRequired[str]
platform: NotRequired[str]
ssid: NotRequired[str]
psk: NotRequired[str]
password: NotRequired[str]
ota_password: NotRequired[str]
api_encryption_key: NotRequired[str]
friendly_name: NotRequired[str]
# Required for "upload" type
file_text: NotRequired[str]
def wizard_write(path: str, **kwargs: Unpack[WizardWriteKwargs]) -> bool:
from esphome.components.bk72xx import boards as bk72xx_boards from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards from esphome.components.esp8266 import boards as esp8266_boards
@@ -237,14 +270,14 @@ def wizard_write(path, **kwargs):
if get_bool_env(ENV_QUICKWIZARD): if get_bool_env(ENV_QUICKWIZARD):
def sleep(time): def sleep(time: float) -> None:
pass pass
else: else:
from time import sleep from time import sleep
def safe_print_step(step, big): def safe_print_step(step: int, big: str) -> None:
safe_print() safe_print()
safe_print() safe_print()
safe_print(f"============= STEP {step} =============") safe_print(f"============= STEP {step} =============")
@@ -253,14 +286,14 @@ def safe_print_step(step, big):
sleep(0.25) sleep(0.25)
def default_input(text, default): def default_input(text: str, default: str) -> str:
safe_print() safe_print()
safe_print(f"Press ENTER for default ({default})") safe_print(f"Press ENTER for default ({default})")
return safe_input(text.format(default)) or default return safe_input(text.format(default)) or default
# From https://stackoverflow.com/a/518232/8924614 # From https://stackoverflow.com/a/518232/8924614
def strip_accents(value): def strip_accents(value: str) -> str:
return "".join( return "".join(
c c
for c in unicodedata.normalize("NFD", str(value)) for c in unicodedata.normalize("NFD", str(value))
@@ -268,7 +301,7 @@ def strip_accents(value):
) )
def wizard(path): def wizard(path: str) -> int:
from esphome.components.bk72xx import boards as bk72xx_boards from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards from esphome.components.esp8266 import boards as esp8266_boards
@@ -509,6 +542,7 @@ def wizard(path):
ssid=ssid, ssid=ssid,
psk=psk, psk=psk,
password=password, password=password,
type="basic",
): ):
return 1 return 1

View File

@@ -315,6 +315,19 @@ def clean_build():
_LOGGER.info("Deleting %s", dependencies_lock) _LOGGER.info("Deleting %s", dependencies_lock)
os.remove(dependencies_lock) os.remove(dependencies_lock)
# Clean PlatformIO cache to resolve CMake compiler detection issues
# This helps when toolchain paths change or get corrupted
try:
from platformio.project.helpers import get_project_cache_dir
except ImportError:
# PlatformIO is not available, skip cache cleaning
pass
else:
cache_dir = get_project_cache_dir()
if cache_dir and cache_dir.strip() and os.path.isdir(cache_dir):
_LOGGER.info("Deleting PlatformIO cache %s", cache_dir)
shutil.rmtree(cache_dir)
GITIGNORE_CONTENT = """# Gitignore settings for ESPHome GITIGNORE_CONTENT = """# Gitignore settings for ESPHome
# This is an example and may include too much for your use-case. # This is an example and may include too much for your use-case.

View File

@@ -0,0 +1,42 @@
# Comprehensive ESP8266 test for mdns with multiple network components
# Tests the complete priority chain:
# wifi (60) -> mdns (55) -> ota (54) -> web_server_ota (52)
esphome:
name: mdns-comprehensive-test
esp8266:
board: esp01_1m
logger:
level: DEBUG
wifi:
ssid: MySSID
password: password1
# web_server_base should run at priority 65 (before wifi)
web_server:
port: 80
# mdns should run at priority 55 (after wifi at 60)
mdns:
services:
- service: _http
protocol: _tcp
port: 80
# OTA should run at priority 54 (after mdns)
ota:
- platform: esphome
password: "otapassword"
# Test status LED at priority 80
status_led:
pin:
number: GPIO2
inverted: true
# Include API at priority 40
api:
password: "apipassword"

View File

@@ -13,7 +13,12 @@ def test_coro_priority_enum_values() -> None:
assert CoroPriority.CORE == 100 assert CoroPriority.CORE == 100
assert CoroPriority.DIAGNOSTICS == 90 assert CoroPriority.DIAGNOSTICS == 90
assert CoroPriority.STATUS == 80 assert CoroPriority.STATUS == 80
assert CoroPriority.WEB_SERVER_BASE == 65
assert CoroPriority.CAPTIVE_PORTAL == 64
assert CoroPriority.COMMUNICATION == 60 assert CoroPriority.COMMUNICATION == 60
assert CoroPriority.NETWORK_SERVICES == 55
assert CoroPriority.OTA_UPDATES == 54
assert CoroPriority.WEB_SERVER_OTA == 52
assert CoroPriority.APPLICATION == 50 assert CoroPriority.APPLICATION == 50
assert CoroPriority.WEB == 40 assert CoroPriority.WEB == 40
assert CoroPriority.AUTOMATION == 30 assert CoroPriority.AUTOMATION == 30
@@ -70,7 +75,12 @@ def test_float_and_enum_are_interchangeable() -> None:
(CoroPriority.CORE, 100.0), (CoroPriority.CORE, 100.0),
(CoroPriority.DIAGNOSTICS, 90.0), (CoroPriority.DIAGNOSTICS, 90.0),
(CoroPriority.STATUS, 80.0), (CoroPriority.STATUS, 80.0),
(CoroPriority.WEB_SERVER_BASE, 65.0),
(CoroPriority.CAPTIVE_PORTAL, 64.0),
(CoroPriority.COMMUNICATION, 60.0), (CoroPriority.COMMUNICATION, 60.0),
(CoroPriority.NETWORK_SERVICES, 55.0),
(CoroPriority.OTA_UPDATES, 54.0),
(CoroPriority.WEB_SERVER_OTA, 52.0),
(CoroPriority.APPLICATION, 50.0), (CoroPriority.APPLICATION, 50.0),
(CoroPriority.WEB, 40.0), (CoroPriority.WEB, 40.0),
(CoroPriority.AUTOMATION, 30.0), (CoroPriority.AUTOMATION, 30.0),
@@ -164,8 +174,13 @@ def test_enum_priority_comparison() -> None:
assert CoroPriority.NETWORK_TRANSPORT > CoroPriority.CORE assert CoroPriority.NETWORK_TRANSPORT > CoroPriority.CORE
assert CoroPriority.CORE > CoroPriority.DIAGNOSTICS assert CoroPriority.CORE > CoroPriority.DIAGNOSTICS
assert CoroPriority.DIAGNOSTICS > CoroPriority.STATUS assert CoroPriority.DIAGNOSTICS > CoroPriority.STATUS
assert CoroPriority.STATUS > CoroPriority.COMMUNICATION assert CoroPriority.STATUS > CoroPriority.WEB_SERVER_BASE
assert CoroPriority.COMMUNICATION > CoroPriority.APPLICATION assert CoroPriority.WEB_SERVER_BASE > CoroPriority.CAPTIVE_PORTAL
assert CoroPriority.CAPTIVE_PORTAL > CoroPriority.COMMUNICATION
assert CoroPriority.COMMUNICATION > CoroPriority.NETWORK_SERVICES
assert CoroPriority.NETWORK_SERVICES > CoroPriority.OTA_UPDATES
assert CoroPriority.OTA_UPDATES > CoroPriority.WEB_SERVER_OTA
assert CoroPriority.WEB_SERVER_OTA > CoroPriority.APPLICATION
assert CoroPriority.APPLICATION > CoroPriority.WEB assert CoroPriority.APPLICATION > CoroPriority.WEB
assert CoroPriority.WEB > CoroPriority.AUTOMATION assert CoroPriority.WEB > CoroPriority.AUTOMATION
assert CoroPriority.AUTOMATION > CoroPriority.BUS assert CoroPriority.AUTOMATION > CoroPriority.BUS

View File

@@ -1226,6 +1226,18 @@ def test_has_mqtt_logging_no_log_topic() -> None:
setup_core(config={}) setup_core(config={})
assert has_mqtt_logging() is False assert has_mqtt_logging() is False
# Setup MQTT config with CONF_LOG_TOPIC but no CONF_LEVEL (regression test for #10771)
# This simulates the default configuration created by validate_config in the MQTT component
setup_core(
config={
CONF_MQTT: {
CONF_BROKER: "mqtt.local",
CONF_LOG_TOPIC: {CONF_TOPIC: "esphome/debug"},
}
}
)
assert has_mqtt_logging() is True
def test_has_mqtt() -> None: def test_has_mqtt() -> None:
"""Test has_mqtt function.""" """Test has_mqtt function."""

View File

@@ -141,3 +141,170 @@ def test_list_yaml_files_mixed_extensions(tmp_path: Path) -> None:
str(yaml_file), str(yaml_file),
str(yml_file), str(yml_file),
} }
def test_list_yaml_files_does_not_recurse_into_subdirectories(tmp_path: Path) -> None:
"""Test that list_yaml_files only finds files in specified directory, not subdirectories."""
# Create directory structure with YAML files at different depths
root = tmp_path / "configs"
root.mkdir()
# Create YAML files in the root directory
(root / "config1.yaml").write_text("test: 1")
(root / "config2.yml").write_text("test: 2")
(root / "device.yaml").write_text("test: device")
# Create subdirectory with YAML files (should NOT be found)
subdir = root / "subdir"
subdir.mkdir()
(subdir / "nested1.yaml").write_text("test: nested1")
(subdir / "nested2.yml").write_text("test: nested2")
# Create deeper subdirectory (should NOT be found)
deep_subdir = subdir / "deeper"
deep_subdir.mkdir()
(deep_subdir / "very_nested.yaml").write_text("test: very_nested")
# Test listing files from the root directory
result = util.list_yaml_files([str(root)])
# Should only find the 3 files in root, not the 3 in subdirectories
assert len(result) == 3
# Check that only root-level files are found
assert str(root / "config1.yaml") in result
assert str(root / "config2.yml") in result
assert str(root / "device.yaml") in result
# Ensure nested files are NOT found
for r in result:
assert "subdir" not in r
assert "deeper" not in r
assert "nested1.yaml" not in r
assert "nested2.yml" not in r
assert "very_nested.yaml" not in r
def test_list_yaml_files_excludes_secrets(tmp_path: Path) -> None:
"""Test that secrets.yaml and secrets.yml are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create various YAML files including secrets
(root / "config.yaml").write_text("test: config")
(root / "secrets.yaml").write_text("wifi_password: secret123")
(root / "secrets.yml").write_text("api_key: secret456")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find 2 files (config.yaml and device.yaml), not secrets
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / "secrets.yaml") not in result
assert str(root / "secrets.yml") not in result
def test_list_yaml_files_excludes_hidden_files(tmp_path: Path) -> None:
"""Test that hidden files (starting with .) are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create regular and hidden YAML files
(root / "config.yaml").write_text("test: config")
(root / ".hidden.yaml").write_text("test: hidden")
(root / ".backup.yml").write_text("test: backup")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find only non-hidden files
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / ".hidden.yaml") not in result
assert str(root / ".backup.yml") not in result
def test_filter_yaml_files_basic() -> None:
"""Test filter_yaml_files function."""
files = [
"/path/to/config.yaml",
"/path/to/device.yml",
"/path/to/readme.txt",
"/path/to/script.py",
"/path/to/data.json",
"/path/to/another.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 3
assert "/path/to/config.yaml" in result
assert "/path/to/device.yml" in result
assert "/path/to/another.yaml" in result
assert "/path/to/readme.txt" not in result
assert "/path/to/script.py" not in result
assert "/path/to/data.json" not in result
def test_filter_yaml_files_excludes_secrets() -> None:
"""Test that filter_yaml_files excludes secrets files."""
files = [
"/path/to/config.yaml",
"/path/to/secrets.yaml",
"/path/to/secrets.yml",
"/path/to/device.yaml",
"/some/dir/secrets.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/secrets.yaml" not in result
assert "/path/to/secrets.yml" not in result
assert "/some/dir/secrets.yaml" not in result
def test_filter_yaml_files_excludes_hidden() -> None:
"""Test that filter_yaml_files excludes hidden files."""
files = [
"/path/to/config.yaml",
"/path/to/.hidden.yaml",
"/path/to/.backup.yml",
"/path/to/device.yaml",
"/some/dir/.config.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/.hidden.yaml" not in result
assert "/path/to/.backup.yml" not in result
assert "/some/dir/.config.yaml" not in result
def test_filter_yaml_files_case_sensitive() -> None:
"""Test that filter_yaml_files is case-sensitive for extensions."""
files = [
"/path/to/config.yaml",
"/path/to/config.YAML",
"/path/to/config.YML",
"/path/to/config.Yaml",
"/path/to/config.yml",
]
result = util.filter_yaml_files(files)
# Should only match lowercase .yaml and .yml
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/config.yml" in result
assert "/path/to/config.YAML" not in result
assert "/path/to/config.YML" not in result
assert "/path/to/config.Yaml" not in result

View File

@@ -1,9 +1,12 @@
"""Tests for the wizard.py file.""" """Tests for the wizard.py file."""
import os import os
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
from pytest import MonkeyPatch
from esphome.components.bk72xx.boards import BK72XX_BOARD_PINS from esphome.components.bk72xx.boards import BK72XX_BOARD_PINS
from esphome.components.esp32.boards import ESP32_BOARD_PINS from esphome.components.esp32.boards import ESP32_BOARD_PINS
@@ -15,7 +18,7 @@ import esphome.wizard as wz
@pytest.fixture @pytest.fixture
def default_config(): def default_config() -> dict[str, Any]:
return { return {
"type": "basic", "type": "basic",
"name": "test-name", "name": "test-name",
@@ -28,7 +31,7 @@ def default_config():
@pytest.fixture @pytest.fixture
def wizard_answers(): def wizard_answers() -> list[str]:
return [ return [
"test-node", # Name of the node "test-node", # Name of the node
"ESP8266", # platform "ESP8266", # platform
@@ -53,7 +56,9 @@ def test_sanitize_quotes_replaces_with_escaped_char():
assert output_str == '\\"key\\": \\"value\\"' assert output_str == '\\"key\\": \\"value\\"'
def test_config_file_fallback_ap_includes_descriptive_name(default_config): def test_config_file_fallback_ap_includes_descriptive_name(
default_config: dict[str, Any],
):
""" """
The fallback AP should include the node and a descriptive name The fallback AP should include the node and a descriptive name
""" """
@@ -67,7 +72,9 @@ def test_config_file_fallback_ap_includes_descriptive_name(default_config):
assert 'ssid: "Test Node Fallback Hotspot"' in config assert 'ssid: "Test Node Fallback Hotspot"' in config
def test_config_file_fallback_ap_name_less_than_32_chars(default_config): def test_config_file_fallback_ap_name_less_than_32_chars(
default_config: dict[str, Any],
):
""" """
The fallback AP name must be less than 32 chars. The fallback AP name must be less than 32 chars.
Since it is composed of the node name and "Fallback Hotspot" this can be too long and needs truncating Since it is composed of the node name and "Fallback Hotspot" this can be too long and needs truncating
@@ -82,7 +89,7 @@ def test_config_file_fallback_ap_name_less_than_32_chars(default_config):
assert 'ssid: "A Very Long Name For This Node"' in config assert 'ssid: "A Very Long Name For This Node"' in config
def test_config_file_should_include_ota(default_config): def test_config_file_should_include_ota(default_config: dict[str, Any]):
""" """
The Over-The-Air update should be enabled by default The Over-The-Air update should be enabled by default
""" """
@@ -95,7 +102,9 @@ def test_config_file_should_include_ota(default_config):
assert "ota:" in config assert "ota:" in config
def test_config_file_should_include_ota_when_password_set(default_config): def test_config_file_should_include_ota_when_password_set(
default_config: dict[str, Any],
):
""" """
The Over-The-Air update should be enabled when a password is set The Over-The-Air update should be enabled when a password is set
""" """
@@ -109,7 +118,9 @@ def test_config_file_should_include_ota_when_password_set(default_config):
assert "ota:" in config assert "ota:" in config
def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch): def test_wizard_write_sets_platform(
default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
):
""" """
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
""" """
@@ -126,7 +137,7 @@ def test_wizard_write_sets_platform(default_config, tmp_path, monkeypatch):
assert "esp8266:" in generated_config assert "esp8266:" in generated_config
def test_wizard_empty_config(tmp_path, monkeypatch): def test_wizard_empty_config(tmp_path: Path, monkeypatch: MonkeyPatch):
""" """
The wizard should be able to create an empty configuration The wizard should be able to create an empty configuration
""" """
@@ -146,7 +157,7 @@ def test_wizard_empty_config(tmp_path, monkeypatch):
assert generated_config == "" assert generated_config == ""
def test_wizard_upload_config(tmp_path, monkeypatch): def test_wizard_upload_config(tmp_path: Path, monkeypatch: MonkeyPatch):
""" """
The wizard should be able to import an base64 encoded configuration The wizard should be able to import an base64 encoded configuration
""" """
@@ -168,7 +179,7 @@ def test_wizard_upload_config(tmp_path, monkeypatch):
def test_wizard_write_defaults_platform_from_board_esp8266( def test_wizard_write_defaults_platform_from_board_esp8266(
default_config, tmp_path, monkeypatch default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
): ):
""" """
If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards If the platform is not explicitly set, use "ESP8266" if the board is one of the ESP8266 boards
@@ -189,7 +200,7 @@ def test_wizard_write_defaults_platform_from_board_esp8266(
def test_wizard_write_defaults_platform_from_board_esp32( def test_wizard_write_defaults_platform_from_board_esp32(
default_config, tmp_path, monkeypatch default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
): ):
""" """
If the platform is not explicitly set, use "ESP32" if the board is one of the ESP32 boards If the platform is not explicitly set, use "ESP32" if the board is one of the ESP32 boards
@@ -210,7 +221,7 @@ def test_wizard_write_defaults_platform_from_board_esp32(
def test_wizard_write_defaults_platform_from_board_bk72xx( def test_wizard_write_defaults_platform_from_board_bk72xx(
default_config, tmp_path, monkeypatch default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
): ):
""" """
If the platform is not explicitly set, use "BK72XX" if the board is one of BK72XX boards If the platform is not explicitly set, use "BK72XX" if the board is one of BK72XX boards
@@ -231,7 +242,7 @@ def test_wizard_write_defaults_platform_from_board_bk72xx(
def test_wizard_write_defaults_platform_from_board_ln882x( def test_wizard_write_defaults_platform_from_board_ln882x(
default_config, tmp_path, monkeypatch default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
): ):
""" """
If the platform is not explicitly set, use "LN882X" if the board is one of LN882X boards If the platform is not explicitly set, use "LN882X" if the board is one of LN882X boards
@@ -252,7 +263,7 @@ def test_wizard_write_defaults_platform_from_board_ln882x(
def test_wizard_write_defaults_platform_from_board_rtl87xx( def test_wizard_write_defaults_platform_from_board_rtl87xx(
default_config, tmp_path, monkeypatch default_config: dict[str, Any], tmp_path: Path, monkeypatch: MonkeyPatch
): ):
""" """
If the platform is not explicitly set, use "RTL87XX" if the board is one of RTL87XX boards If the platform is not explicitly set, use "RTL87XX" if the board is one of RTL87XX boards
@@ -272,7 +283,7 @@ def test_wizard_write_defaults_platform_from_board_rtl87xx(
assert "rtl87xx:" in generated_config assert "rtl87xx:" in generated_config
def test_safe_print_step_prints_step_number_and_description(monkeypatch): def test_safe_print_step_prints_step_number_and_description(monkeypatch: MonkeyPatch):
""" """
The safe_print_step function prints the step number and the passed description The safe_print_step function prints the step number and the passed description
""" """
@@ -296,7 +307,7 @@ def test_safe_print_step_prints_step_number_and_description(monkeypatch):
assert any(f"STEP {step_num}" in arg for arg in all_args) assert any(f"STEP {step_num}" in arg for arg in all_args)
def test_default_input_uses_default_if_no_input_supplied(monkeypatch): def test_default_input_uses_default_if_no_input_supplied(monkeypatch: MonkeyPatch):
""" """
The default_input() function should return the supplied default value if the user doesn't enter anything The default_input() function should return the supplied default value if the user doesn't enter anything
""" """
@@ -312,7 +323,7 @@ def test_default_input_uses_default_if_no_input_supplied(monkeypatch):
assert retval == default_string assert retval == default_string
def test_default_input_uses_user_supplied_value(monkeypatch): def test_default_input_uses_user_supplied_value(monkeypatch: MonkeyPatch):
""" """
The default_input() function should return the value that the user enters The default_input() function should return the value that the user enters
""" """
@@ -376,7 +387,9 @@ def test_wizard_rejects_existing_files(tmpdir):
assert retval == 2 assert retval == 2
def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answers): def test_wizard_accepts_default_answers_esp8266(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
""" """
The wizard should accept the given default answers for esp8266 The wizard should accept the given default answers for esp8266
""" """
@@ -396,7 +409,9 @@ def test_wizard_accepts_default_answers_esp8266(tmpdir, monkeypatch, wizard_answ
assert retval == 0 assert retval == 0
def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answers): def test_wizard_accepts_default_answers_esp32(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
""" """
The wizard should accept the given default answers for esp32 The wizard should accept the given default answers for esp32
""" """
@@ -418,7 +433,9 @@ def test_wizard_accepts_default_answers_esp32(tmpdir, monkeypatch, wizard_answer
assert retval == 0 assert retval == 0
def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers): def test_wizard_offers_better_node_name(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
""" """
When the node name does not conform, a better alternative is offered When the node name does not conform, a better alternative is offered
* Removes special chars * Removes special chars
@@ -449,7 +466,9 @@ def test_wizard_offers_better_node_name(tmpdir, monkeypatch, wizard_answers):
assert wz.default_input.call_args.args[1] == expected_name assert wz.default_input.call_args.args[1] == expected_name
def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers): def test_wizard_requires_correct_platform(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
""" """
When the platform is not either esp32 or esp8266, the wizard should reject it When the platform is not either esp32 or esp8266, the wizard should reject it
""" """
@@ -471,7 +490,9 @@ def test_wizard_requires_correct_platform(tmpdir, monkeypatch, wizard_answers):
assert retval == 0 assert retval == 0
def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers): def test_wizard_requires_correct_board(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
""" """
When the board is not a valid esp8266 board, the wizard should reject it When the board is not a valid esp8266 board, the wizard should reject it
""" """
@@ -493,7 +514,9 @@ def test_wizard_requires_correct_board(tmpdir, monkeypatch, wizard_answers):
assert retval == 0 assert retval == 0
def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers): def test_wizard_requires_valid_ssid(
tmpdir, monkeypatch: MonkeyPatch, wizard_answers: list[str]
):
""" """
When the board is not a valid esp8266 board, the wizard should reject it When the board is not a valid esp8266 board, the wizard should reject it
""" """
@@ -515,7 +538,9 @@ def test_wizard_requires_valid_ssid(tmpdir, monkeypatch, wizard_answers):
assert retval == 0 assert retval == 0
def test_wizard_write_protects_existing_config(tmpdir, default_config, monkeypatch): def test_wizard_write_protects_existing_config(
tmpdir, default_config: dict[str, Any], monkeypatch: MonkeyPatch
):
""" """
The wizard_write function should not overwrite existing config files and return False The wizard_write function should not overwrite existing config files and return False
""" """

View File

@@ -1,13 +1,34 @@
"""Test writer module functionality.""" """Test writer module functionality."""
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path
from typing import Any from typing import Any
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import pytest import pytest
from esphome.core import EsphomeError
from esphome.storage_json import StorageJSON from esphome.storage_json import StorageJSON
from esphome.writer import storage_should_clean, update_storage_json from esphome.writer import (
CPP_AUTO_GENERATE_BEGIN,
CPP_AUTO_GENERATE_END,
CPP_INCLUDE_BEGIN,
CPP_INCLUDE_END,
GITIGNORE_CONTENT,
clean_build,
clean_cmake_cache,
storage_should_clean,
update_storage_json,
write_cpp,
write_gitignore,
)
@pytest.fixture
def mock_copy_src_tree():
"""Mock copy_src_tree to avoid side effects during tests."""
with patch("esphome.writer.copy_src_tree"):
yield
@pytest.fixture @pytest.fixture
@@ -218,3 +239,493 @@ def test_update_storage_json_logging_components_removed(
# Verify save was called # Verify save was called
new_storage.save.assert_called_once_with("/test/path") new_storage.save.assert_called_once_with("/test/path")
@patch("esphome.writer.CORE")
def test_clean_cmake_cache(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_cmake_cache removes CMakeCache.txt file."""
# Create directory structure
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
cmake_cache_file.write_text("# CMake cache file")
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file exists before
assert cmake_cache_file.exists()
# Call the function
with caplog.at_level("INFO"):
clean_cmake_cache()
# Verify file was removed
assert not cmake_cache_file.exists()
# Verify logging
assert "Deleting" in caplog.text
assert "CMakeCache.txt" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_pioenvs_dir(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when pioenvs directory doesn't exist."""
# Setup non-existent directory path
pioenvs_dir = tmp_path / ".pioenvs"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
# Verify directory doesn't exist
assert not pioenvs_dir.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify directory still doesn't exist
assert not pioenvs_dir.exists()
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_cmake_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when CMakeCache.txt doesn't exist."""
# Create directory structure without CMakeCache.txt
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file doesn't exist
assert not cmake_cache_file.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify file still doesn't exist
assert not cmake_cache_file.exists()
@patch("esphome.writer.CORE")
def test_clean_build(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build removes all build artifacts."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
piolibdeps_dir.mkdir()
(piolibdeps_dir / "library").mkdir()
dependencies_lock = tmp_path / "dependencies.lock"
dependencies_lock.write_text("lock file")
# Create PlatformIO cache directory
platformio_cache_dir = tmp_path / ".platformio" / ".cache"
platformio_cache_dir.mkdir(parents=True)
(platformio_cache_dir / "downloads").mkdir()
(platformio_cache_dir / "http").mkdir()
(platformio_cache_dir / "tmp").mkdir()
(platformio_cache_dir / "downloads" / "package.tar.gz").write_text("package")
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify all exist before
assert pioenvs_dir.exists()
assert piolibdeps_dir.exists()
assert dependencies_lock.exists()
assert platformio_cache_dir.exists()
# Mock PlatformIO's get_project_cache_dir
with patch(
"platformio.project.helpers.get_project_cache_dir"
) as mock_get_cache_dir:
mock_get_cache_dir.return_value = str(platformio_cache_dir)
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify all were removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
assert not platformio_cache_dir.exists()
# Verify logging
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" in caplog.text
assert "dependencies.lock" in caplog.text
assert "PlatformIO cache" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_partial_exists(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when only some paths exist."""
# Create only pioenvs directory
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify only pioenvs exists
assert pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify only existing path was removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify logging - only pioenvs should be logged
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" not in caplog.text
assert "dependencies.lock" not in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_nothing_exists(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_build when no build artifacts exist."""
# Setup paths that don't exist
pioenvs_dir = tmp_path / ".pioenvs"
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify nothing exists
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function - should not crash
clean_build()
# Verify nothing was created
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
@patch("esphome.writer.CORE")
def test_clean_build_platformio_not_available(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when PlatformIO is not available."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
piolibdeps_dir = tmp_path / ".piolibdeps"
piolibdeps_dir.mkdir()
dependencies_lock = tmp_path / "dependencies.lock"
dependencies_lock.write_text("lock file")
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify all exist before
assert pioenvs_dir.exists()
assert piolibdeps_dir.exists()
assert dependencies_lock.exists()
# Mock import error for platformio
with (
patch.dict("sys.modules", {"platformio.project.helpers": None}),
caplog.at_level("INFO"),
):
# Call the function
clean_build()
# Verify standard paths were removed but no cache cleaning attempted
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify no cache logging
assert "PlatformIO cache" not in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_empty_cache_dir(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when get_project_cache_dir returns empty/whitespace."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(tmp_path / ".piolibdeps")
mock_core.relative_build_path.return_value = str(tmp_path / "dependencies.lock")
# Verify pioenvs exists before
assert pioenvs_dir.exists()
# Mock PlatformIO's get_project_cache_dir to return whitespace
with patch(
"platformio.project.helpers.get_project_cache_dir"
) as mock_get_cache_dir:
mock_get_cache_dir.return_value = " " # Whitespace only
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify pioenvs was removed
assert not pioenvs_dir.exists()
# Verify no cache cleaning was attempted due to empty string
assert "PlatformIO cache" not in caplog.text
@patch("esphome.writer.CORE")
def test_write_gitignore_creates_new_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore creates a new .gitignore file when it doesn't exist."""
gitignore_path = tmp_path / ".gitignore"
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file doesn't exist
assert not gitignore_path.exists()
# Call the function
write_gitignore()
# Verify file was created with correct content
assert gitignore_path.exists()
assert gitignore_path.read_text() == GITIGNORE_CONTENT
@patch("esphome.writer.CORE")
def test_write_gitignore_skips_existing_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore doesn't overwrite existing .gitignore file."""
gitignore_path = tmp_path / ".gitignore"
existing_content = "# Custom gitignore\n/custom_dir/\n"
gitignore_path.write_text(existing_content)
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file exists with custom content
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
# Call the function
write_gitignore()
# Verify file was not modified
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_with_existing_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp already exists."""
# Create a real file with markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_INCLUDE_BEGIN}
// Old includes
{CPP_INCLUDE_END}
void setup() {{
{CPP_AUTO_GENERATE_BEGIN}
// Old code
{CPP_AUTO_GENERATE_END}
}}
void loop() {{}}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Call the function
test_code = " // New generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
# Check that markers are preserved and content is updated
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "// Global section" in written_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_creates_new_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp doesn't exist."""
# Setup path for new file
main_cpp = tmp_path / "main.cpp"
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Verify file doesn't exist
assert not main_cpp.exists()
# Call the function
test_code = " // Generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
assert written_path == str(main_cpp)
# Check that all necessary parts are in the new file
assert '#include "esphome.h"' in written_content
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "void setup()" in written_content
assert "void loop()" in written_content
assert "App.setup();" in written_content
assert "App.loop();" in written_content
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_missing_end_marker(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when end marker is missing."""
# Create a file with begin marker but no end marker
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// Code without end marker"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Could not find auto generated code end"):
write_cpp("// New code")
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_duplicate_markers(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when duplicate markers exist."""
# Create a file with duplicate begin markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// First section
{CPP_AUTO_GENERATE_END}
{CPP_AUTO_GENERATE_BEGIN}
// Duplicate section
{CPP_AUTO_GENERATE_END}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Found multiple auto generate code begins"):
write_cpp("// New code")