1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-18 17:53:47 +01:00

Merge pull request #11284 from esphome/bump-2025.10.1

2025.10.1
This commit is contained in:
Jesse Hills
2025-10-16 23:00:00 +13:00
committed by GitHub
21 changed files with 714 additions and 120 deletions

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2025.10.0
PROJECT_NUMBER = 2025.10.1
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -117,6 +117,17 @@ class Purpose(StrEnum):
LOGGING = "logging"
class PortType(StrEnum):
SERIAL = "SERIAL"
NETWORK = "NETWORK"
MQTT = "MQTT"
MQTTIP = "MQTTIP"
# Magic MQTT port types that require special handling
_MQTT_PORT_TYPES = frozenset({PortType.MQTT, PortType.MQTTIP})
def _resolve_with_cache(address: str, purpose: Purpose) -> list[str]:
"""Resolve an address using cache if available, otherwise return the address itself."""
if CORE.address_cache and (cached := CORE.address_cache.get_addresses(address)):
@@ -280,16 +291,67 @@ def mqtt_get_ip(config: ConfigType, username: str, password: str, client_id: str
return mqtt.get_esphome_device_ip(config, username, password, client_id)
_PORT_TO_PORT_TYPE = {
"MQTT": "MQTT",
"MQTTIP": "MQTTIP",
}
def _resolve_network_devices(
devices: list[str], config: ConfigType, args: ArgsProtocol
) -> list[str]:
"""Resolve device list, converting MQTT magic strings to actual IP addresses.
This function filters the devices list to:
- Replace MQTT/MQTTIP magic strings with actual IP addresses via MQTT lookup
- Deduplicate addresses while preserving order
- Only resolve MQTT once even if multiple MQTT strings are present
- If MQTT resolution fails, log a warning and continue with other devices
Args:
devices: List of device identifiers (IPs, hostnames, or magic strings)
config: ESPHome configuration
args: Command-line arguments containing MQTT credentials
Returns:
List of network addresses suitable for connection attempts
"""
network_devices: list[str] = []
mqtt_resolved: bool = False
for device in devices:
port_type = get_port_type(device)
if port_type in _MQTT_PORT_TYPES:
# Only resolve MQTT once, even if multiple MQTT entries
if not mqtt_resolved:
try:
mqtt_ips = mqtt_get_ip(
config, args.username, args.password, args.client_id
)
network_devices.extend(mqtt_ips)
except EsphomeError as err:
_LOGGER.warning(
"MQTT IP discovery failed (%s), will try other devices if available",
err,
)
mqtt_resolved = True
elif device not in network_devices:
# Regular network address or IP - add if not already present
network_devices.append(device)
return network_devices
def get_port_type(port: str) -> str:
def get_port_type(port: str) -> PortType:
"""Determine the type of port/device identifier.
Returns:
PortType.SERIAL for serial ports (/dev/ttyUSB0, COM1, etc.)
PortType.MQTT for MQTT logging
PortType.MQTTIP for MQTT IP lookup
PortType.NETWORK for IP addresses, hostnames, or mDNS names
"""
if port.startswith("/") or port.startswith("COM"):
return "SERIAL"
return _PORT_TO_PORT_TYPE.get(port, "NETWORK")
return PortType.SERIAL
if port == "MQTT":
return PortType.MQTT
if port == "MQTTIP":
return PortType.MQTTIP
return PortType.NETWORK
def run_miniterm(config: ConfigType, port: str, args) -> int:
@@ -489,7 +551,7 @@ def upload_using_platformio(config: ConfigType, port: str):
def check_permissions(port: str):
if os.name == "posix" and get_port_type(port) == "SERIAL":
if os.name == "posix" and get_port_type(port) == PortType.SERIAL:
# Check if we can open selected serial port
if not os.access(port, os.F_OK):
raise EsphomeError(
@@ -517,7 +579,7 @@ def upload_program(
except AttributeError:
pass
if get_port_type(host) == "SERIAL":
if get_port_type(host) == PortType.SERIAL:
check_permissions(host)
exit_code = 1
@@ -544,17 +606,16 @@ def upload_program(
from esphome import espota2
remote_port = int(ota_conf[CONF_PORT])
password = ota_conf.get(CONF_PASSWORD, "")
password = ota_conf.get(CONF_PASSWORD)
if getattr(args, "file", None) is not None:
binary = Path(args.file)
else:
binary = CORE.firmware_bin
# MQTT address resolution
if get_port_type(host) in ("MQTT", "MQTTIP"):
devices = mqtt_get_ip(config, args.username, args.password, args.client_id)
# Resolve MQTT magic strings to actual IP addresses
network_devices = _resolve_network_devices(devices, config, args)
return espota2.run_ota(devices, remote_port, password, binary)
return espota2.run_ota(network_devices, remote_port, password, binary)
def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None:
@@ -569,33 +630,22 @@ def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int
raise EsphomeError("Logger is not configured!")
port = devices[0]
port_type = get_port_type(port)
if get_port_type(port) == "SERIAL":
if port_type == PortType.SERIAL:
check_permissions(port)
return run_miniterm(config, port, args)
port_type = get_port_type(port)
# Check if we should use API for logging
if has_api():
addresses_to_use: list[str] | None = None
# Resolve MQTT magic strings to actual IP addresses
if has_api() and (
network_devices := _resolve_network_devices(devices, config, args)
):
from esphome.components.api.client import run_logs
if port_type == "NETWORK":
# Network addresses (IPs, mDNS names, or regular DNS hostnames) can be used
# The resolve_ip_address() function in helpers.py handles all types
addresses_to_use = devices
elif port_type in ("MQTT", "MQTTIP") and has_mqtt_ip_lookup():
# Use MQTT IP lookup for MQTT/MQTTIP types
addresses_to_use = mqtt_get_ip(
config, args.username, args.password, args.client_id
)
return run_logs(config, network_devices)
if addresses_to_use is not None:
from esphome.components.api.client import run_logs
return run_logs(config, addresses_to_use)
if port_type in ("NETWORK", "MQTT") and has_mqtt_logging():
if port_type in (PortType.NETWORK, PortType.MQTT) and has_mqtt_logging():
from esphome import mqtt
return mqtt.show_logs(

View File

@@ -775,7 +775,7 @@ void Display::test_card() {
int shift_y = (h - image_h) / 2;
int line_w = (image_w - 6) / 6;
int image_c = image_w / 2;
for (auto i = 0; i <= image_h; i++) {
for (auto i = 0; i != image_h; i++) {
int c = esp_scale(i, image_h);
this->horizontal_line(shift_x + 0, shift_y + i, line_w, r.fade_to_white(c));
this->horizontal_line(shift_x + line_w, shift_y + i, line_w, r.fade_to_black(c)); //
@@ -809,8 +809,11 @@ void Display::test_card() {
}
}
}
this->rectangle(0, 0, w, h, Color(127, 0, 127));
this->filled_rectangle(0, 0, 10, 10, Color(255, 0, 255));
this->filled_rectangle(w - 10, 0, 10, 10, Color(255, 0, 255));
this->filled_rectangle(0, h - 10, 10, 10, Color(255, 0, 255));
this->filled_rectangle(w - 10, h - 10, 10, 10, Color(255, 0, 255));
this->rectangle(0, 0, w, h, Color(255, 255, 255));
this->stop_poller();
}

View File

@@ -19,6 +19,7 @@ from esphome.const import (
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
import esphome.final_validate as fv
from esphome.types import ConfigType
_LOGGER = logging.getLogger(__name__)
@@ -136,11 +137,12 @@ FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
async def to_code(config: ConfigType) -> None:
var = cg.new_Pvariable(config[CONF_ID])
cg.add(var.set_port(config[CONF_PORT]))
if CONF_PASSWORD in config:
# Password could be set to an empty string and we can assume that means no password
if config.get(CONF_PASSWORD):
cg.add(var.set_auth_password(config[CONF_PASSWORD]))
cg.add_define("USE_OTA_PASSWORD")
# Only include hash algorithms when password is configured

View File

@@ -11,6 +11,7 @@ from esphome.const import (
CONF_BRIGHTNESS,
CONF_COLOR_ORDER,
CONF_DIMENSIONS,
CONF_DISABLED,
CONF_HEIGHT,
CONF_INIT_SEQUENCE,
CONF_INVERT_COLORS,
@@ -301,6 +302,8 @@ class DriverChip:
Check if a rotation can be implemented in hardware using the MADCTL register.
A rotation of 180 is always possible if x and y mirroring are supported, 90 and 270 are possible if the model supports swapping X and Y.
"""
if config.get(CONF_TRANSFORM) == CONF_DISABLED:
return False
transforms = self.transforms
rotation = config.get(CONF_ROTATION, 0)
if rotation == 0 or not transforms:
@@ -358,26 +361,26 @@ class DriverChip:
CONF_SWAP_XY: self.get_default(CONF_SWAP_XY),
},
)
# fill in defaults if not provided
mirror_x = transform.get(CONF_MIRROR_X, self.get_default(CONF_MIRROR_X))
mirror_y = transform.get(CONF_MIRROR_Y, self.get_default(CONF_MIRROR_Y))
swap_xy = transform.get(CONF_SWAP_XY, self.get_default(CONF_SWAP_XY))
transform[CONF_MIRROR_X] = mirror_x
transform[CONF_MIRROR_Y] = mirror_y
transform[CONF_SWAP_XY] = swap_xy
if not isinstance(transform, dict):
# Presumably disabled
return {
CONF_MIRROR_X: False,
CONF_MIRROR_Y: False,
CONF_SWAP_XY: False,
CONF_TRANSFORM: False,
}
# Can we use the MADCTL register to set the rotation?
if can_transform and CONF_TRANSFORM not in config:
rotation = config[CONF_ROTATION]
if rotation == 180:
transform[CONF_MIRROR_X] = not mirror_x
transform[CONF_MIRROR_Y] = not mirror_y
transform[CONF_MIRROR_X] = not transform[CONF_MIRROR_X]
transform[CONF_MIRROR_Y] = not transform[CONF_MIRROR_Y]
elif rotation == 90:
transform[CONF_SWAP_XY] = not swap_xy
transform[CONF_MIRROR_X] = not mirror_x
transform[CONF_SWAP_XY] = not transform[CONF_SWAP_XY]
transform[CONF_MIRROR_X] = not transform[CONF_MIRROR_X]
else:
transform[CONF_SWAP_XY] = not swap_xy
transform[CONF_MIRROR_Y] = not mirror_y
transform[CONF_SWAP_XY] = not transform[CONF_SWAP_XY]
transform[CONF_MIRROR_Y] = not transform[CONF_MIRROR_Y]
transform[CONF_TRANSFORM] = True
return transform

View File

@@ -37,6 +37,7 @@ from esphome.const import (
CONF_DATA_RATE,
CONF_DC_PIN,
CONF_DIMENSIONS,
CONF_DISABLED,
CONF_ENABLE_PIN,
CONF_ID,
CONF_INIT_SEQUENCE,
@@ -146,12 +147,15 @@ def swap_xy_schema(model):
def model_schema(config):
model = MODELS[config[CONF_MODEL]]
bus_mode = config[CONF_BUS_MODE]
transform = cv.Schema(
{
cv.Required(CONF_MIRROR_X): cv.boolean,
cv.Required(CONF_MIRROR_Y): cv.boolean,
**swap_xy_schema(model),
}
transform = cv.Any(
cv.Schema(
{
cv.Required(CONF_MIRROR_X): cv.boolean,
cv.Required(CONF_MIRROR_Y): cv.boolean,
**swap_xy_schema(model),
}
),
cv.one_of(CONF_DISABLED, lower=True),
)
# CUSTOM model will need to provide a custom init sequence
iseqconf = (
@@ -160,7 +164,11 @@ def model_schema(config):
else cv.Optional(CONF_INIT_SEQUENCE)
)
# Dimensions are optional if the model has a default width and the x-y transform is not overridden
is_swapped = config.get(CONF_TRANSFORM, {}).get(CONF_SWAP_XY) is True
transform_config = config.get(CONF_TRANSFORM, {})
is_swapped = (
isinstance(transform_config, dict)
and transform_config.get(CONF_SWAP_XY, False) is True
)
cv_dimensions = (
cv.Optional if model.get_default(CONF_WIDTH) and not is_swapped else cv.Required
)
@@ -192,9 +200,7 @@ def model_schema(config):
.extend(
{
cv.GenerateID(): cv.declare_id(MipiSpi),
cv_dimensions(CONF_DIMENSIONS): dimension_schema(
model.get_default(CONF_DRAW_ROUNDING, 1)
),
cv_dimensions(CONF_DIMENSIONS): dimension_schema(1),
model.option(CONF_ENABLE_PIN, cv.UNDEFINED): cv.ensure_list(
pins.gpio_output_pin_schema
),
@@ -400,6 +406,7 @@ def get_instance(config):
offset_height,
DISPLAY_ROTATIONS[rotation],
frac,
config[CONF_DRAW_ROUNDING],
]
)
return MipiSpiBuffer, templateargs
@@ -431,7 +438,6 @@ async def to_code(config):
else:
config[CONF_ROTATION] = 0
cg.add(var.set_model(config[CONF_MODEL]))
cg.add(var.set_draw_rounding(config[CONF_DRAW_ROUNDING]))
if enable_pin := config.get(CONF_ENABLE_PIN):
enable = [await cg.gpio_pin_expression(pin) for pin in enable_pin]
cg.add(var.set_enable_pins(enable))

View File

@@ -38,7 +38,7 @@ static constexpr uint8_t MADCTL_BGR = 0x08; // Bit 3 Blue-Green-Red pixel ord
static constexpr uint8_t MADCTL_XFLIP = 0x02; // Mirror the display horizontally
static constexpr uint8_t MADCTL_YFLIP = 0x01; // Mirror the display vertically
static const uint8_t DELAY_FLAG = 0xFF;
static constexpr uint8_t DELAY_FLAG = 0xFF;
// store a 16 bit value in a buffer, big endian.
static inline void put16_be(uint8_t *buf, uint16_t value) {
buf[0] = value >> 8;
@@ -79,7 +79,7 @@ class MipiSpi : public display::Display,
public spi::SPIDevice<spi::BIT_ORDER_MSB_FIRST, spi::CLOCK_POLARITY_LOW, spi::CLOCK_PHASE_LEADING,
spi::DATA_RATE_1MHZ> {
public:
MipiSpi() {}
MipiSpi() = default;
void update() override { this->stop_poller(); }
void draw_pixel_at(int x, int y, Color color) override {}
void set_model(const char *model) { this->model_ = model; }
@@ -99,7 +99,6 @@ class MipiSpi : public display::Display,
int get_width_internal() override { return WIDTH; }
int get_height_internal() override { return HEIGHT; }
void set_init_sequence(const std::vector<uint8_t> &sequence) { this->init_sequence_ = sequence; }
void set_draw_rounding(unsigned rounding) { this->draw_rounding_ = rounding; }
// reset the display, and write the init sequence
void setup() override {
@@ -326,6 +325,7 @@ class MipiSpi : public display::Display,
/**
* Writes a buffer to the display.
* @param ptr The pointer to the pixel data
* @param w Width of each line in bytes
* @param h Height of the buffer in rows
* @param pad Padding in bytes after each line
@@ -424,7 +424,6 @@ class MipiSpi : public display::Display,
// other properties set by configuration
bool invert_colors_{};
unsigned draw_rounding_{2};
optional<uint8_t> brightness_{};
const char *model_{"Unknown"};
std::vector<uint8_t> init_sequence_{};
@@ -444,12 +443,20 @@ class MipiSpi : public display::Display,
* @tparam OFFSET_WIDTH The x-offset of the display in pixels
* @tparam OFFSET_HEIGHT The y-offset of the display in pixels
* @tparam FRACTION The fraction of the display size to use for the buffer (e.g. 4 means a 1/4 buffer).
* @tparam ROUNDING The alignment requirement for drawing operations (e.g. 2 means that x coordinates must be even)
*/
template<typename BUFFERTYPE, PixelMode BUFFERPIXEL, bool IS_BIG_ENDIAN, PixelMode DISPLAYPIXEL, BusType BUS_TYPE,
int WIDTH, int HEIGHT, int OFFSET_WIDTH, int OFFSET_HEIGHT, display::DisplayRotation ROTATION, int FRACTION>
uint16_t WIDTH, uint16_t HEIGHT, int OFFSET_WIDTH, int OFFSET_HEIGHT, display::DisplayRotation ROTATION,
int FRACTION, unsigned ROUNDING>
class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DISPLAYPIXEL, BUS_TYPE, WIDTH, HEIGHT,
OFFSET_WIDTH, OFFSET_HEIGHT> {
public:
// these values define the buffer size needed to write in accordance with the chip pixel alignment
// requirements. If the required rounding does not divide the width and height, we round up to the next multiple and
// ignore the extra columns and rows when drawing, but use them to write to the display.
static constexpr unsigned BUFFER_WIDTH = (WIDTH + ROUNDING - 1) / ROUNDING * ROUNDING;
static constexpr unsigned BUFFER_HEIGHT = (HEIGHT + ROUNDING - 1) / ROUNDING * ROUNDING;
MipiSpiBuffer() { this->rotation_ = ROTATION; }
void dump_config() override {
@@ -461,15 +468,15 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
" Buffer fraction: 1/%d\n"
" Buffer bytes: %zu\n"
" Draw rounding: %u",
this->rotation_, BUFFERPIXEL * 8, FRACTION, sizeof(BUFFERTYPE) * WIDTH * HEIGHT / FRACTION,
this->draw_rounding_);
this->rotation_, BUFFERPIXEL * 8, FRACTION,
sizeof(BUFFERTYPE) * BUFFER_WIDTH * BUFFER_HEIGHT / FRACTION, ROUNDING);
}
void setup() override {
MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DISPLAYPIXEL, BUS_TYPE, WIDTH, HEIGHT, OFFSET_WIDTH,
OFFSET_HEIGHT>::setup();
RAMAllocator<BUFFERTYPE> allocator{};
this->buffer_ = allocator.allocate(WIDTH * HEIGHT / FRACTION);
this->buffer_ = allocator.allocate(BUFFER_WIDTH * BUFFER_HEIGHT / FRACTION);
if (this->buffer_ == nullptr) {
this->mark_failed("Buffer allocation failed");
}
@@ -508,15 +515,14 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
esph_log_v(TAG, "x_low %d, y_low %d, x_high %d, y_high %d", this->x_low_, this->y_low_, this->x_high_,
this->y_high_);
// Some chips require that the drawing window be aligned on certain boundaries
auto dr = this->draw_rounding_;
this->x_low_ = this->x_low_ / dr * dr;
this->y_low_ = this->y_low_ / dr * dr;
this->x_high_ = (this->x_high_ + dr) / dr * dr - 1;
this->y_high_ = (this->y_high_ + dr) / dr * dr - 1;
this->x_low_ = this->x_low_ / ROUNDING * ROUNDING;
this->y_low_ = this->y_low_ / ROUNDING * ROUNDING;
this->x_high_ = (this->x_high_ + ROUNDING) / ROUNDING * ROUNDING - 1;
this->y_high_ = (this->y_high_ + ROUNDING) / ROUNDING * ROUNDING - 1;
int w = this->x_high_ - this->x_low_ + 1;
int h = this->y_high_ - this->y_low_ + 1;
this->write_to_display_(this->x_low_, this->y_low_, w, h, this->buffer_, this->x_low_,
this->y_low_ - this->start_line_, WIDTH - w);
this->y_low_ - this->start_line_, BUFFER_WIDTH - w);
// invalidate watermarks
this->x_low_ = WIDTH;
this->y_low_ = HEIGHT;
@@ -536,10 +542,10 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
void draw_pixel_at(int x, int y, Color color) override {
if (!this->get_clipping().inside(x, y))
return;
rotate_coordinates_(x, y);
rotate_coordinates(x, y);
if (x < 0 || x >= WIDTH || y < this->start_line_ || y >= this->end_line_)
return;
this->buffer_[(y - this->start_line_) * WIDTH + x] = convert_color_(color);
this->buffer_[(y - this->start_line_) * BUFFER_WIDTH + x] = convert_color(color);
if (x < this->x_low_) {
this->x_low_ = x;
}
@@ -560,7 +566,7 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
this->y_low_ = this->start_line_;
this->x_high_ = WIDTH - 1;
this->y_high_ = this->end_line_ - 1;
std::fill_n(this->buffer_, HEIGHT * WIDTH / FRACTION, convert_color_(color));
std::fill_n(this->buffer_, HEIGHT * BUFFER_WIDTH / FRACTION, convert_color(color));
}
int get_width() override {
@@ -577,7 +583,7 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
protected:
// Rotate the coordinates to match the display orientation.
void rotate_coordinates_(int &x, int &y) const {
static void rotate_coordinates(int &x, int &y) {
if constexpr (ROTATION == display::DISPLAY_ROTATION_180_DEGREES) {
x = WIDTH - x - 1;
y = HEIGHT - y - 1;
@@ -593,7 +599,7 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
}
// Convert a color to the buffer pixel format.
BUFFERTYPE convert_color_(Color &color) const {
static BUFFERTYPE convert_color(const Color &color) {
if constexpr (BUFFERPIXEL == PIXEL_MODE_8) {
return (color.red & 0xE0) | (color.g & 0xE0) >> 3 | color.b >> 6;
} else if constexpr (BUFFERPIXEL == PIXEL_MODE_16) {

View File

@@ -3,6 +3,7 @@ import esphome.config_validation as cv
from .amoled import CO5300
from .ili import ILI9488_A
from .jc import AXS15231
DriverChip(
"WAVESHARE-4-TFT",
@@ -152,3 +153,12 @@ CO5300.extend(
cs_pin=12,
reset_pin=39,
)
AXS15231.extend(
"WAVESHARE-ESP32-S3-TOUCH-LCD-3.49",
width=172,
height=640,
data_rate="80MHz",
cs_pin=9,
reset_pin=21,
)

View File

@@ -1,7 +1,7 @@
import logging
from esphome import core
from esphome.config_helpers import Extend, Remove, merge_config
from esphome.config_helpers import Extend, Remove, merge_config, merge_dicts_ordered
import esphome.config_validation as cv
from esphome.const import CONF_SUBSTITUTIONS, VALID_SUBSTITUTIONS_CHARACTERS
from esphome.yaml_util import ESPHomeDataBase, ESPLiteralValue, make_data_base
@@ -170,10 +170,10 @@ def do_substitution_pass(config, command_line_substitutions, ignore_missing=Fals
return
# Merge substitutions in config, overriding with substitutions coming from command line:
substitutions = {
**config.get(CONF_SUBSTITUTIONS, {}),
**(command_line_substitutions or {}),
}
# Use merge_dicts_ordered to preserve OrderedDict type for move_to_end()
substitutions = merge_dicts_ordered(
config.get(CONF_SUBSTITUTIONS, {}), command_line_substitutions or {}
)
with cv.prepend_path("substitutions"):
if not isinstance(substitutions, dict):
raise cv.Invalid(

View File

@@ -402,8 +402,8 @@ async def to_code(config):
add_idf_sdkconfig_option("CONFIG_LWIP_DHCPS", False)
# Disable Enterprise WiFi support if no EAP is configured
if CORE.is_esp32 and not has_eap:
add_idf_sdkconfig_option("CONFIG_ESP_WIFI_ENTERPRISE_SUPPORT", False)
if CORE.is_esp32:
add_idf_sdkconfig_option("CONFIG_ESP_WIFI_ENTERPRISE_SUPPORT", has_eap)
cg.add(var.set_reboot_timeout(config[CONF_REBOOT_TIMEOUT]))
cg.add(var.set_power_save_mode(config[CONF_POWER_SAVE_MODE]))

View File

@@ -12,7 +12,7 @@ from typing import Any
import voluptuous as vol
from esphome import core, loader, pins, yaml_util
from esphome.config_helpers import Extend, Remove
from esphome.config_helpers import Extend, Remove, merge_dicts_ordered
import esphome.config_validation as cv
from esphome.const import (
CONF_ESPHOME,
@@ -922,10 +922,9 @@ def validate_config(
if CONF_SUBSTITUTIONS in config or command_line_substitutions:
from esphome.components import substitutions
result[CONF_SUBSTITUTIONS] = {
**(config.get(CONF_SUBSTITUTIONS) or {}),
**command_line_substitutions,
}
result[CONF_SUBSTITUTIONS] = merge_dicts_ordered(
config.get(CONF_SUBSTITUTIONS) or {}, command_line_substitutions
)
result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
try:
substitutions.do_substitution_pass(config, command_line_substitutions)

View File

@@ -10,6 +10,7 @@ from esphome.const import (
PlatformFramework,
)
from esphome.core import CORE
from esphome.util import OrderedDict
# Pre-build lookup map from (platform, framework) tuples to PlatformFramework enum
_PLATFORM_FRAMEWORK_LOOKUP = {
@@ -17,6 +18,25 @@ _PLATFORM_FRAMEWORK_LOOKUP = {
}
def merge_dicts_ordered(*dicts: dict) -> OrderedDict:
"""Merge multiple dicts into an OrderedDict, preserving key order.
This is a helper to ensure that dictionary merging preserves OrderedDict type,
which is important for operations like move_to_end().
Args:
*dicts: Variable number of dictionaries to merge (later dicts override earlier ones)
Returns:
OrderedDict with merged contents
"""
result = OrderedDict()
for d in dicts:
if d:
result.update(d)
return result
class Extend:
def __init__(self, value):
self.value = value
@@ -60,7 +80,11 @@ def merge_config(full_old, full_new):
if isinstance(new, dict):
if not isinstance(old, dict):
return new
res = old.copy()
# Preserve OrderedDict type by copying to OrderedDict if either input is OrderedDict
if isinstance(old, OrderedDict) or isinstance(new, OrderedDict):
res = OrderedDict(old)
else:
res = old.copy()
for k, v in new.items():
if isinstance(v, Remove) and k in old:
del res[k]

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2025.10.0"
__version__ = "2025.10.1"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -242,7 +242,7 @@ def send_check(
def perform_ota(
sock: socket.socket, password: str, file_handle: io.IOBase, filename: Path
sock: socket.socket, password: str | None, file_handle: io.IOBase, filename: Path
) -> None:
file_contents = file_handle.read()
file_size = len(file_contents)
@@ -278,13 +278,13 @@ def perform_ota(
def perform_auth(
sock: socket.socket,
password: str,
password: str | None,
hash_func: Callable[..., Any],
nonce_size: int,
hash_name: str,
) -> None:
"""Perform challenge-response authentication using specified hash algorithm."""
if not password:
if password is None:
raise OTAError("ESP requests password, but no password given!")
nonce_bytes = receive_exactly(
@@ -385,7 +385,7 @@ def perform_ota(
def run_ota_impl_(
remote_host: str | list[str], remote_port: int, password: str, filename: Path
remote_host: str | list[str], remote_port: int, password: str | None, filename: Path
) -> tuple[int, str | None]:
from esphome.core import CORE
@@ -436,7 +436,7 @@ def run_ota_impl_(
def run_ota(
remote_host: str | list[str], remote_port: int, password: str, filename: Path
remote_host: str | list[str], remote_port: int, password: str | None, filename: Path
) -> tuple[int, str | None]:
try:
return run_ota_impl_(remote_host, remote_port, password, filename)

View File

@@ -69,7 +69,7 @@ def run_schema_validation(config: ConfigType) -> None:
{
"id": "display_id",
"model": "custom",
"dimensions": {"width": 320, "height": 240},
"dimensions": {"width": 260, "height": 260},
"draw_rounding": 13,
"init_sequence": [[0xA0, 0x01]],
},
@@ -336,7 +336,7 @@ def test_native_generation(
main_cpp = generate_main(component_fixture_path("native.yaml"))
assert (
"mipi_spi::MipiSpiBuffer<uint16_t, mipi_spi::PIXEL_MODE_16, true, mipi_spi::PIXEL_MODE_16, mipi_spi::BUS_TYPE_QUAD, 360, 360, 0, 1, display::DISPLAY_ROTATION_0_DEGREES, 1>()"
"mipi_spi::MipiSpiBuffer<uint16_t, mipi_spi::PIXEL_MODE_16, true, mipi_spi::PIXEL_MODE_16, mipi_spi::BUS_TYPE_QUAD, 360, 360, 0, 1, display::DISPLAY_ROTATION_0_DEGREES, 1, 1>()"
in main_cpp
)
assert "set_init_sequence({240, 1, 8, 242" in main_cpp

View File

@@ -7,8 +7,8 @@ display:
id: ili9xxx_display
model: GC9A01A
invert_colors: True
cs_pin: 10
dc_pin: 6
cs_pin: 11
dc_pin: 7
pages:
- id: page1
lambda: |-

View File

@@ -10,7 +10,7 @@ display:
invert_colors: true
show_test_card: true
spi_mode: mode0
draw_rounding: 8
draw_rounding: 4
use_axis_flips: true
init_sequence:
- [0xd0, 1, 2, 3]

View File

@@ -1,7 +1,7 @@
substitutions:
dc_pin: GPIO14
cs_pin: GPIO13
enable_pin: GPIO16
enable_pin: GPIO17
reset_pin: GPIO20
packages:

View File

@@ -287,7 +287,7 @@ def test_perform_ota_no_auth(mock_socket: Mock, mock_file: io.BytesIO) -> None:
mock_socket.recv.side_effect = recv_responses
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
# Should not send any auth-related data
auth_calls = [
@@ -317,7 +317,7 @@ def test_perform_ota_with_compression(mock_socket: Mock) -> None:
mock_socket.recv.side_effect = recv_responses
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
# Verify compressed content was sent
# Get the binary size that was sent (4 bytes after features)
@@ -347,7 +347,7 @@ def test_perform_ota_auth_without_password(mock_socket: Mock) -> None:
with pytest.raises(
espota2.OTAError, match="ESP requests password, but no password given"
):
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
@pytest.mark.usefixtures("mock_time")
@@ -413,7 +413,7 @@ def test_perform_ota_sha256_auth_without_password(mock_socket: Mock) -> None:
with pytest.raises(
espota2.OTAError, match="ESP requests password, but no password given"
):
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
def test_perform_ota_unexpected_auth_response(mock_socket: Mock) -> None:
@@ -450,7 +450,7 @@ def test_perform_ota_unsupported_version(mock_socket: Mock) -> None:
mock_socket.recv.side_effect = responses
with pytest.raises(espota2.OTAError, match="Device uses unsupported OTA version"):
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
@pytest.mark.usefixtures("mock_time")
@@ -471,7 +471,7 @@ def test_perform_ota_upload_error(mock_socket: Mock, mock_file: io.BytesIO) -> N
mock_socket.recv.side_effect = recv_responses
with pytest.raises(espota2.OTAError, match="Error receiving acknowledge chunk OK"):
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
@pytest.mark.usefixtures("mock_socket_constructor", "mock_resolve_ip")
@@ -706,7 +706,7 @@ def test_perform_ota_version_differences(
]
mock_socket.recv.side_effect = recv_responses
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
# For v1.0, verify that we only get the expected number of recv calls
# v1.0 doesn't have chunk acknowledgments, so fewer recv calls
@@ -732,7 +732,7 @@ def test_perform_ota_version_differences(
]
mock_socket.recv.side_effect = recv_responses_v2
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
# For v2.0, verify more recv calls due to chunk acknowledgments
assert mock_socket.recv.call_count == 9 # v2.0 has 9 recv calls (includes chunk OK)

View File

@@ -1062,7 +1062,7 @@ def test_upload_program_ota_with_file_arg(
assert exit_code == 0
assert host == "192.168.1.100"
mock_run_ota.assert_called_once_with(
["192.168.1.100"], 3232, "", Path("custom.bin")
["192.168.1.100"], 3232, None, Path("custom.bin")
)
@@ -1119,7 +1119,9 @@ def test_upload_program_ota_with_mqtt_resolution(
expected_firmware = (
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
)
mock_run_ota.assert_called_once_with(["192.168.1.100"], 3232, "", expected_firmware)
mock_run_ota.assert_called_once_with(
["192.168.1.100"], 3232, None, expected_firmware
)
@patch("esphome.__main__.importlib.import_module")
@@ -1976,3 +1978,292 @@ def test_command_clean_all_args_used() -> None:
# Verify the correct configuration paths were passed
mock_clean_all.assert_any_call(["/path/to/config1"])
mock_clean_all.assert_any_call(["/path/to/config2", "/path/to/config3"])
def test_upload_program_ota_static_ip_with_mqttip(
mock_mqtt_get_ip: Mock,
mock_run_ota: Mock,
tmp_path: Path,
) -> None:
"""Test upload_program with static IP and MQTTIP (issue #11260).
This tests the scenario where a device has manual_ip (static IP) configured
and MQTT is also configured. The devices list contains both the static IP
and "MQTTIP" magic string. This previously failed because only the first
device was checked for MQTT resolution.
"""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
mock_mqtt_get_ip.return_value = ["192.168.2.50"] # Different subnet
mock_run_ota.return_value = (0, "192.168.1.100")
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_ESPHOME,
CONF_PORT: 3232,
}
],
CONF_MQTT: {
CONF_BROKER: "mqtt.local",
},
}
args = MockArgs(username="user", password="pass", client_id="client")
# Simulates choose_upload_log_host returning static IP + MQTTIP
devices = ["192.168.1.100", "MQTTIP"]
exit_code, host = upload_program(config, args, devices)
assert exit_code == 0
assert host == "192.168.1.100"
# Verify MQTT was resolved
mock_mqtt_get_ip.assert_called_once_with(config, "user", "pass", "client")
# Verify espota2.run_ota was called with both IPs
expected_firmware = (
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
)
mock_run_ota.assert_called_once_with(
["192.168.1.100", "192.168.2.50"], 3232, None, expected_firmware
)
def test_upload_program_ota_multiple_mqttip_resolves_once(
mock_mqtt_get_ip: Mock,
mock_run_ota: Mock,
tmp_path: Path,
) -> None:
"""Test that MQTT resolution only happens once even with multiple MQTT magic strings."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
mock_mqtt_get_ip.return_value = ["192.168.2.50", "192.168.2.51"]
mock_run_ota.return_value = (0, "192.168.2.50")
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_ESPHOME,
CONF_PORT: 3232,
}
],
CONF_MQTT: {
CONF_BROKER: "mqtt.local",
},
}
args = MockArgs(username="user", password="pass", client_id="client")
# Multiple MQTT magic strings in the list
devices = ["MQTTIP", "MQTT", "192.168.1.100"]
exit_code, host = upload_program(config, args, devices)
assert exit_code == 0
assert host == "192.168.2.50"
# Verify MQTT was only resolved once despite multiple MQTT magic strings
mock_mqtt_get_ip.assert_called_once_with(config, "user", "pass", "client")
# Verify espota2.run_ota was called with all unique IPs
expected_firmware = (
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
)
mock_run_ota.assert_called_once_with(
["192.168.2.50", "192.168.2.51", "192.168.1.100"], 3232, None, expected_firmware
)
def test_upload_program_ota_mqttip_deduplication(
mock_mqtt_get_ip: Mock,
mock_run_ota: Mock,
tmp_path: Path,
) -> None:
"""Test that duplicate IPs are filtered when MQTT returns same IP as static IP."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
# MQTT returns the same IP as the static IP
mock_mqtt_get_ip.return_value = ["192.168.1.100"]
mock_run_ota.return_value = (0, "192.168.1.100")
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_ESPHOME,
CONF_PORT: 3232,
}
],
CONF_MQTT: {
CONF_BROKER: "mqtt.local",
},
}
args = MockArgs(username="user", password="pass", client_id="client")
devices = ["192.168.1.100", "MQTTIP"]
exit_code, host = upload_program(config, args, devices)
assert exit_code == 0
assert host == "192.168.1.100"
# Verify MQTT was resolved
mock_mqtt_get_ip.assert_called_once_with(config, "user", "pass", "client")
# Verify espota2.run_ota was called with deduplicated IPs (only one instance of 192.168.1.100)
# Note: Current implementation doesn't dedupe, so we'll get the IP twice
# This test documents current behavior - deduplication could be future enhancement
mock_run_ota.assert_called_once()
call_args = mock_run_ota.call_args[0]
# Should contain both the original IP and MQTT-resolved IP (even if duplicate)
assert "192.168.1.100" in call_args[0]
@patch("esphome.components.api.client.run_logs")
def test_show_logs_api_static_ip_with_mqttip(
mock_run_logs: Mock,
mock_mqtt_get_ip: Mock,
) -> None:
"""Test show_logs with static IP and MQTTIP (issue #11260).
This tests the scenario where a device has manual_ip (static IP) configured
and MQTT is also configured. The devices list contains both the static IP
and "MQTTIP" magic string.
"""
setup_core(
config={
"logger": {},
CONF_API: {},
CONF_MQTT: {CONF_BROKER: "mqtt.local"},
},
platform=PLATFORM_ESP32,
)
mock_run_logs.return_value = 0
mock_mqtt_get_ip.return_value = ["192.168.2.50"]
args = MockArgs(username="user", password="pass", client_id="client")
# Simulates choose_upload_log_host returning static IP + MQTTIP
devices = ["192.168.1.100", "MQTTIP"]
result = show_logs(CORE.config, args, devices)
assert result == 0
# Verify MQTT was resolved
mock_mqtt_get_ip.assert_called_once_with(CORE.config, "user", "pass", "client")
# Verify run_logs was called with both IPs
mock_run_logs.assert_called_once_with(
CORE.config, ["192.168.1.100", "192.168.2.50"]
)
@patch("esphome.components.api.client.run_logs")
def test_show_logs_api_multiple_mqttip_resolves_once(
mock_run_logs: Mock,
mock_mqtt_get_ip: Mock,
) -> None:
"""Test that MQTT resolution only happens once for show_logs with multiple MQTT magic strings."""
setup_core(
config={
"logger": {},
CONF_API: {},
CONF_MQTT: {CONF_BROKER: "mqtt.local"},
},
platform=PLATFORM_ESP32,
)
mock_run_logs.return_value = 0
mock_mqtt_get_ip.return_value = ["192.168.2.50", "192.168.2.51"]
args = MockArgs(username="user", password="pass", client_id="client")
# Multiple MQTT magic strings in the list
devices = ["MQTTIP", "192.168.1.100", "MQTT"]
result = show_logs(CORE.config, args, devices)
assert result == 0
# Verify MQTT was only resolved once despite multiple MQTT magic strings
mock_mqtt_get_ip.assert_called_once_with(CORE.config, "user", "pass", "client")
# Verify run_logs was called with all unique IPs (MQTT strings replaced with IPs)
# Note: "MQTT" is a different magic string from "MQTTIP", but both trigger MQTT resolution
# The _resolve_network_devices helper filters out both after first resolution
mock_run_logs.assert_called_once_with(
CORE.config, ["192.168.2.50", "192.168.2.51", "192.168.1.100"]
)
def test_upload_program_ota_mqtt_timeout_fallback(
mock_mqtt_get_ip: Mock,
mock_run_ota: Mock,
tmp_path: Path,
) -> None:
"""Test upload_program falls back to other devices when MQTT times out."""
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
# MQTT times out
mock_mqtt_get_ip.side_effect = EsphomeError("Failed to find IP via MQTT")
mock_run_ota.return_value = (0, "192.168.1.100")
config = {
CONF_OTA: [
{
CONF_PLATFORM: CONF_ESPHOME,
CONF_PORT: 3232,
}
],
CONF_MQTT: {
CONF_BROKER: "mqtt.local",
},
}
args = MockArgs(username="user", password="pass", client_id="client")
# Static IP first, MQTTIP second
devices = ["192.168.1.100", "MQTTIP"]
exit_code, host = upload_program(config, args, devices)
# Should succeed using the static IP even though MQTT failed
assert exit_code == 0
assert host == "192.168.1.100"
# Verify MQTT was attempted
mock_mqtt_get_ip.assert_called_once_with(config, "user", "pass", "client")
# Verify espota2.run_ota was called with only the static IP (MQTT failed)
expected_firmware = (
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
)
mock_run_ota.assert_called_once_with(
["192.168.1.100"], 3232, None, expected_firmware
)
@patch("esphome.components.api.client.run_logs")
def test_show_logs_api_mqtt_timeout_fallback(
mock_run_logs: Mock,
mock_mqtt_get_ip: Mock,
) -> None:
"""Test show_logs falls back to other devices when MQTT times out."""
setup_core(
config={
"logger": {},
CONF_API: {},
CONF_MQTT: {CONF_BROKER: "mqtt.local"},
},
platform=PLATFORM_ESP32,
)
mock_run_logs.return_value = 0
# MQTT times out
mock_mqtt_get_ip.side_effect = EsphomeError("Failed to find IP via MQTT")
args = MockArgs(username="user", password="pass", client_id="client")
# Static IP first, MQTTIP second
devices = ["192.168.1.100", "MQTTIP"]
result = show_logs(CORE.config, args, devices)
# Should succeed using the static IP even though MQTT failed
assert result == 0
# Verify MQTT was attempted
mock_mqtt_get_ip.assert_called_once_with(CORE.config, "user", "pass", "client")
# Verify run_logs was called with only the static IP (MQTT failed)
mock_run_logs.assert_called_once_with(CORE.config, ["192.168.1.100"])

View File

@@ -2,9 +2,12 @@ import glob
import logging
from pathlib import Path
from esphome import yaml_util
from esphome import config as config_module, yaml_util
from esphome.components import substitutions
from esphome.const import CONF_PACKAGES
from esphome.config_helpers import merge_config
from esphome.const import CONF_PACKAGES, CONF_SUBSTITUTIONS
from esphome.core import CORE
from esphome.util import OrderedDict
_LOGGER = logging.getLogger(__name__)
@@ -118,3 +121,200 @@ def test_substitutions_fixtures(fixture_path):
if DEV_MODE:
_LOGGER.error("Tests passed, but Dev mode is enabled.")
assert not DEV_MODE # make sure DEV_MODE is disabled after you are finished.
def test_substitutions_with_command_line_maintains_ordered_dict() -> None:
"""Test that substitutions remain an OrderedDict when command line substitutions are provided,
and that move_to_end() can be called successfully.
This is a regression test for https://github.com/esphome/esphome/issues/11182
where the config would become a regular dict and fail when move_to_end() was called.
"""
# Create an OrderedDict config with substitutions
config = OrderedDict()
config["esphome"] = {"name": "test"}
config[CONF_SUBSTITUTIONS] = {"var1": "value1", "var2": "value2"}
config["other_key"] = "other_value"
# Command line substitutions that should override
command_line_subs = {"var2": "override", "var3": "new_value"}
# Call do_substitution_pass with command line substitutions
substitutions.do_substitution_pass(config, command_line_subs)
# Verify that config is still an OrderedDict
assert isinstance(config, OrderedDict), "Config should remain an OrderedDict"
# Verify substitutions are at the beginning (move_to_end with last=False)
keys = list(config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
# Verify substitutions were properly merged
assert config[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert config[CONF_SUBSTITUTIONS]["var2"] == "override"
assert config[CONF_SUBSTITUTIONS]["var3"] == "new_value"
# Verify config[CONF_SUBSTITUTIONS] is also an OrderedDict
assert isinstance(config[CONF_SUBSTITUTIONS], OrderedDict), (
"Substitutions should be an OrderedDict"
)
def test_substitutions_without_command_line_maintains_ordered_dict() -> None:
"""Test that substitutions work correctly without command line substitutions."""
config = OrderedDict()
config["esphome"] = {"name": "test"}
config[CONF_SUBSTITUTIONS] = {"var1": "value1"}
config["other_key"] = "other_value"
# Call without command line substitutions
substitutions.do_substitution_pass(config, None)
# Verify that config is still an OrderedDict
assert isinstance(config, OrderedDict), "Config should remain an OrderedDict"
# Verify substitutions are at the beginning
keys = list(config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_substitutions_after_merge_config_maintains_ordered_dict() -> None:
"""Test that substitutions work after merge_config (packages scenario).
This is a regression test for https://github.com/esphome/esphome/issues/11182
where using packages would cause config to become a regular dict, breaking move_to_end().
"""
# Simulate what happens with packages - merge two OrderedDict configs
base_config = OrderedDict()
base_config["esphome"] = {"name": "base"}
base_config[CONF_SUBSTITUTIONS] = {"var1": "value1"}
package_config = OrderedDict()
package_config["sensor"] = [{"platform": "template"}]
package_config[CONF_SUBSTITUTIONS] = {"var2": "value2"}
# Merge configs (simulating package merge)
merged_config = merge_config(base_config, package_config)
# Verify merged config is still an OrderedDict
assert isinstance(merged_config, OrderedDict), (
"Merged config should be an OrderedDict"
)
# Now try to run substitution pass on the merged config
substitutions.do_substitution_pass(merged_config, None)
# Should not raise AttributeError
assert isinstance(merged_config, OrderedDict), (
"Config should still be OrderedDict after substitution pass"
)
keys = list(merged_config.keys())
assert keys[0] == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_validate_config_with_command_line_substitutions_maintains_ordered_dict(
tmp_path,
) -> None:
"""Test that validate_config preserves OrderedDict when merging command-line substitutions.
This tests the code path in config.py where result[CONF_SUBSTITUTIONS] is set
using merge_dicts_ordered() with command-line substitutions provided.
"""
# Create a minimal valid config
test_config = OrderedDict()
test_config["esphome"] = {"name": "test_device", "platform": "ESP32"}
test_config[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
test_config["esp32"] = {"board": "esp32dev"}
# Command line substitutions that should override
command_line_subs = {"var2": "override", "var3": "new_value"}
# Set up CORE for the test with a proper Path object
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("# test config")
CORE.config_path = test_yaml
# Call validate_config with command line substitutions
result = config_module.validate_config(test_config, command_line_subs)
# Verify that result[CONF_SUBSTITUTIONS] is an OrderedDict
assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
"Result substitutions should be an OrderedDict"
)
# Verify substitutions were properly merged
assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert result[CONF_SUBSTITUTIONS]["var2"] == "override"
assert result[CONF_SUBSTITUTIONS]["var3"] == "new_value"
def test_validate_config_without_command_line_substitutions_maintains_ordered_dict(
tmp_path,
) -> None:
"""Test that validate_config preserves OrderedDict without command-line substitutions.
This tests the code path in config.py where result[CONF_SUBSTITUTIONS] is set
using merge_dicts_ordered() when command_line_substitutions is None.
"""
# Create a minimal valid config
test_config = OrderedDict()
test_config["esphome"] = {"name": "test_device", "platform": "ESP32"}
test_config[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
test_config["esp32"] = {"board": "esp32dev"}
# Set up CORE for the test with a proper Path object
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("# test config")
CORE.config_path = test_yaml
# Call validate_config without command line substitutions
result = config_module.validate_config(test_config, None)
# Verify that result[CONF_SUBSTITUTIONS] is an OrderedDict
assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
"Result substitutions should be an OrderedDict"
)
# Verify substitutions are unchanged
assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
assert result[CONF_SUBSTITUTIONS]["var2"] == "value2"
def test_merge_config_preserves_ordered_dict() -> None:
"""Test that merge_config preserves OrderedDict type.
This is a regression test to ensure merge_config doesn't lose OrderedDict type
when merging configs, which causes AttributeError on move_to_end().
"""
# Test OrderedDict + dict = OrderedDict
od = OrderedDict([("a", 1), ("b", 2)])
d = {"b": 20, "c": 3}
result = merge_config(od, d)
assert isinstance(result, OrderedDict), (
"OrderedDict + dict should return OrderedDict"
)
# Test dict + OrderedDict = OrderedDict
d = {"a": 1, "b": 2}
od = OrderedDict([("b", 20), ("c", 3)])
result = merge_config(d, od)
assert isinstance(result, OrderedDict), (
"dict + OrderedDict should return OrderedDict"
)
# Test OrderedDict + OrderedDict = OrderedDict
od1 = OrderedDict([("a", 1), ("b", 2)])
od2 = OrderedDict([("b", 20), ("c", 3)])
result = merge_config(od1, od2)
assert isinstance(result, OrderedDict), (
"OrderedDict + OrderedDict should return OrderedDict"
)
# Test that dict + dict still returns regular dict (no unnecessary conversion)
d1 = {"a": 1, "b": 2}
d2 = {"b": 20, "c": 3}
result = merge_config(d1, d2)
assert isinstance(result, dict), "dict + dict should return dict"
assert not isinstance(result, OrderedDict), (
"dict + dict should not return OrderedDict"
)