mirror of
https://github.com/esphome/esphome.git
synced 2025-11-03 08:31:47 +00:00
Compare commits
26 Commits
dependabot
...
2025.10.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a478b9070 | ||
|
|
a32a1d11fb | ||
|
|
daeb8ef88c | ||
|
|
febee437d6 | ||
|
|
de2f475dbd | ||
|
|
ebc0f5f7c9 | ||
|
|
87ca8784ef | ||
|
|
a186c1062f | ||
|
|
ea38237f29 | ||
|
|
6aff1394ad | ||
|
|
0e34d1b64d | ||
|
|
1483cee0fb | ||
|
|
8c1bd2fd85 | ||
|
|
ea609dc0f6 | ||
|
|
913095f6be | ||
|
|
bb24ad4a30 | ||
|
|
0d612fecfc | ||
|
|
9c235b4140 | ||
|
|
070b0882b8 | ||
|
|
7f1173fcba | ||
|
|
a75ccf841c | ||
|
|
56eb605ec9 | ||
|
|
2c4818de00 | ||
|
|
2b94de8732 | ||
|
|
f71aed3a5c | ||
|
|
353e097085 |
2
Doxyfile
2
Doxyfile
@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
|
|||||||
# could be handy for archiving the generated documentation or if some version
|
# could be handy for archiving the generated documentation or if some version
|
||||||
# control system is used.
|
# control system is used.
|
||||||
|
|
||||||
PROJECT_NUMBER = 2025.10.0
|
PROJECT_NUMBER = 2025.10.3
|
||||||
|
|
||||||
# Using the PROJECT_BRIEF tag one can provide an optional one line description
|
# Using the PROJECT_BRIEF tag one can provide an optional one line description
|
||||||
# for a project that appears at the top of each page and should give viewer a
|
# for a project that appears at the top of each page and should give viewer a
|
||||||
|
|||||||
@@ -117,6 +117,17 @@ class Purpose(StrEnum):
|
|||||||
LOGGING = "logging"
|
LOGGING = "logging"
|
||||||
|
|
||||||
|
|
||||||
|
class PortType(StrEnum):
|
||||||
|
SERIAL = "SERIAL"
|
||||||
|
NETWORK = "NETWORK"
|
||||||
|
MQTT = "MQTT"
|
||||||
|
MQTTIP = "MQTTIP"
|
||||||
|
|
||||||
|
|
||||||
|
# Magic MQTT port types that require special handling
|
||||||
|
_MQTT_PORT_TYPES = frozenset({PortType.MQTT, PortType.MQTTIP})
|
||||||
|
|
||||||
|
|
||||||
def _resolve_with_cache(address: str, purpose: Purpose) -> list[str]:
|
def _resolve_with_cache(address: str, purpose: Purpose) -> list[str]:
|
||||||
"""Resolve an address using cache if available, otherwise return the address itself."""
|
"""Resolve an address using cache if available, otherwise return the address itself."""
|
||||||
if CORE.address_cache and (cached := CORE.address_cache.get_addresses(address)):
|
if CORE.address_cache and (cached := CORE.address_cache.get_addresses(address)):
|
||||||
@@ -174,7 +185,9 @@ def choose_upload_log_host(
|
|||||||
else:
|
else:
|
||||||
resolved.append(device)
|
resolved.append(device)
|
||||||
if not resolved:
|
if not resolved:
|
||||||
_LOGGER.error("All specified devices: %s could not be resolved.", defaults)
|
raise EsphomeError(
|
||||||
|
f"All specified devices {defaults} could not be resolved. Is the device connected to the network?"
|
||||||
|
)
|
||||||
return resolved
|
return resolved
|
||||||
|
|
||||||
# No devices specified, show interactive chooser
|
# No devices specified, show interactive chooser
|
||||||
@@ -280,16 +293,67 @@ def mqtt_get_ip(config: ConfigType, username: str, password: str, client_id: str
|
|||||||
return mqtt.get_esphome_device_ip(config, username, password, client_id)
|
return mqtt.get_esphome_device_ip(config, username, password, client_id)
|
||||||
|
|
||||||
|
|
||||||
_PORT_TO_PORT_TYPE = {
|
def _resolve_network_devices(
|
||||||
"MQTT": "MQTT",
|
devices: list[str], config: ConfigType, args: ArgsProtocol
|
||||||
"MQTTIP": "MQTTIP",
|
) -> list[str]:
|
||||||
}
|
"""Resolve device list, converting MQTT magic strings to actual IP addresses.
|
||||||
|
|
||||||
|
This function filters the devices list to:
|
||||||
|
- Replace MQTT/MQTTIP magic strings with actual IP addresses via MQTT lookup
|
||||||
|
- Deduplicate addresses while preserving order
|
||||||
|
- Only resolve MQTT once even if multiple MQTT strings are present
|
||||||
|
- If MQTT resolution fails, log a warning and continue with other devices
|
||||||
|
|
||||||
|
Args:
|
||||||
|
devices: List of device identifiers (IPs, hostnames, or magic strings)
|
||||||
|
config: ESPHome configuration
|
||||||
|
args: Command-line arguments containing MQTT credentials
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of network addresses suitable for connection attempts
|
||||||
|
"""
|
||||||
|
network_devices: list[str] = []
|
||||||
|
mqtt_resolved: bool = False
|
||||||
|
|
||||||
|
for device in devices:
|
||||||
|
port_type = get_port_type(device)
|
||||||
|
if port_type in _MQTT_PORT_TYPES:
|
||||||
|
# Only resolve MQTT once, even if multiple MQTT entries
|
||||||
|
if not mqtt_resolved:
|
||||||
|
try:
|
||||||
|
mqtt_ips = mqtt_get_ip(
|
||||||
|
config, args.username, args.password, args.client_id
|
||||||
|
)
|
||||||
|
network_devices.extend(mqtt_ips)
|
||||||
|
except EsphomeError as err:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"MQTT IP discovery failed (%s), will try other devices if available",
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
mqtt_resolved = True
|
||||||
|
elif device not in network_devices:
|
||||||
|
# Regular network address or IP - add if not already present
|
||||||
|
network_devices.append(device)
|
||||||
|
|
||||||
|
return network_devices
|
||||||
|
|
||||||
|
|
||||||
def get_port_type(port: str) -> str:
|
def get_port_type(port: str) -> PortType:
|
||||||
|
"""Determine the type of port/device identifier.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PortType.SERIAL for serial ports (/dev/ttyUSB0, COM1, etc.)
|
||||||
|
PortType.MQTT for MQTT logging
|
||||||
|
PortType.MQTTIP for MQTT IP lookup
|
||||||
|
PortType.NETWORK for IP addresses, hostnames, or mDNS names
|
||||||
|
"""
|
||||||
if port.startswith("/") or port.startswith("COM"):
|
if port.startswith("/") or port.startswith("COM"):
|
||||||
return "SERIAL"
|
return PortType.SERIAL
|
||||||
return _PORT_TO_PORT_TYPE.get(port, "NETWORK")
|
if port == "MQTT":
|
||||||
|
return PortType.MQTT
|
||||||
|
if port == "MQTTIP":
|
||||||
|
return PortType.MQTTIP
|
||||||
|
return PortType.NETWORK
|
||||||
|
|
||||||
|
|
||||||
def run_miniterm(config: ConfigType, port: str, args) -> int:
|
def run_miniterm(config: ConfigType, port: str, args) -> int:
|
||||||
@@ -489,7 +553,7 @@ def upload_using_platformio(config: ConfigType, port: str):
|
|||||||
|
|
||||||
|
|
||||||
def check_permissions(port: str):
|
def check_permissions(port: str):
|
||||||
if os.name == "posix" and get_port_type(port) == "SERIAL":
|
if os.name == "posix" and get_port_type(port) == PortType.SERIAL:
|
||||||
# Check if we can open selected serial port
|
# Check if we can open selected serial port
|
||||||
if not os.access(port, os.F_OK):
|
if not os.access(port, os.F_OK):
|
||||||
raise EsphomeError(
|
raise EsphomeError(
|
||||||
@@ -517,7 +581,7 @@ def upload_program(
|
|||||||
except AttributeError:
|
except AttributeError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if get_port_type(host) == "SERIAL":
|
if get_port_type(host) == PortType.SERIAL:
|
||||||
check_permissions(host)
|
check_permissions(host)
|
||||||
|
|
||||||
exit_code = 1
|
exit_code = 1
|
||||||
@@ -544,17 +608,16 @@ def upload_program(
|
|||||||
from esphome import espota2
|
from esphome import espota2
|
||||||
|
|
||||||
remote_port = int(ota_conf[CONF_PORT])
|
remote_port = int(ota_conf[CONF_PORT])
|
||||||
password = ota_conf.get(CONF_PASSWORD, "")
|
password = ota_conf.get(CONF_PASSWORD)
|
||||||
if getattr(args, "file", None) is not None:
|
if getattr(args, "file", None) is not None:
|
||||||
binary = Path(args.file)
|
binary = Path(args.file)
|
||||||
else:
|
else:
|
||||||
binary = CORE.firmware_bin
|
binary = CORE.firmware_bin
|
||||||
|
|
||||||
# MQTT address resolution
|
# Resolve MQTT magic strings to actual IP addresses
|
||||||
if get_port_type(host) in ("MQTT", "MQTTIP"):
|
network_devices = _resolve_network_devices(devices, config, args)
|
||||||
devices = mqtt_get_ip(config, args.username, args.password, args.client_id)
|
|
||||||
|
|
||||||
return espota2.run_ota(devices, remote_port, password, binary)
|
return espota2.run_ota(network_devices, remote_port, password, binary)
|
||||||
|
|
||||||
|
|
||||||
def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None:
|
def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int | None:
|
||||||
@@ -569,33 +632,22 @@ def show_logs(config: ConfigType, args: ArgsProtocol, devices: list[str]) -> int
|
|||||||
raise EsphomeError("Logger is not configured!")
|
raise EsphomeError("Logger is not configured!")
|
||||||
|
|
||||||
port = devices[0]
|
port = devices[0]
|
||||||
|
port_type = get_port_type(port)
|
||||||
|
|
||||||
if get_port_type(port) == "SERIAL":
|
if port_type == PortType.SERIAL:
|
||||||
check_permissions(port)
|
check_permissions(port)
|
||||||
return run_miniterm(config, port, args)
|
return run_miniterm(config, port, args)
|
||||||
|
|
||||||
port_type = get_port_type(port)
|
|
||||||
|
|
||||||
# Check if we should use API for logging
|
# Check if we should use API for logging
|
||||||
if has_api():
|
# Resolve MQTT magic strings to actual IP addresses
|
||||||
addresses_to_use: list[str] | None = None
|
if has_api() and (
|
||||||
|
network_devices := _resolve_network_devices(devices, config, args)
|
||||||
|
):
|
||||||
|
from esphome.components.api.client import run_logs
|
||||||
|
|
||||||
if port_type == "NETWORK":
|
return run_logs(config, network_devices)
|
||||||
# Network addresses (IPs, mDNS names, or regular DNS hostnames) can be used
|
|
||||||
# The resolve_ip_address() function in helpers.py handles all types
|
|
||||||
addresses_to_use = devices
|
|
||||||
elif port_type in ("MQTT", "MQTTIP") and has_mqtt_ip_lookup():
|
|
||||||
# Use MQTT IP lookup for MQTT/MQTTIP types
|
|
||||||
addresses_to_use = mqtt_get_ip(
|
|
||||||
config, args.username, args.password, args.client_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if addresses_to_use is not None:
|
if port_type in (PortType.NETWORK, PortType.MQTT) and has_mqtt_logging():
|
||||||
from esphome.components.api.client import run_logs
|
|
||||||
|
|
||||||
return run_logs(config, addresses_to_use)
|
|
||||||
|
|
||||||
if port_type in ("NETWORK", "MQTT") and has_mqtt_logging():
|
|
||||||
from esphome import mqtt
|
from esphome import mqtt
|
||||||
|
|
||||||
return mqtt.show_logs(
|
return mqtt.show_logs(
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
|
|||||||
cv.Schema(
|
cv.Schema(
|
||||||
{
|
{
|
||||||
cv.GenerateID(): cv.declare_id(BME680BSECComponent),
|
cv.GenerateID(): cv.declare_id(BME680BSECComponent),
|
||||||
cv.Optional(CONF_TEMPERATURE_OFFSET, default=0): cv.temperature,
|
cv.Optional(CONF_TEMPERATURE_OFFSET, default=0): cv.temperature_delta,
|
||||||
cv.Optional(CONF_IAQ_MODE, default="STATIC"): cv.enum(
|
cv.Optional(CONF_IAQ_MODE, default="STATIC"): cv.enum(
|
||||||
IAQ_MODE_OPTIONS, upper=True
|
IAQ_MODE_OPTIONS, upper=True
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -139,7 +139,7 @@ CONFIG_SCHEMA_BASE = (
|
|||||||
cv.Optional(CONF_SUPPLY_VOLTAGE, default="3.3V"): cv.enum(
|
cv.Optional(CONF_SUPPLY_VOLTAGE, default="3.3V"): cv.enum(
|
||||||
VOLTAGE_OPTIONS, upper=True
|
VOLTAGE_OPTIONS, upper=True
|
||||||
),
|
),
|
||||||
cv.Optional(CONF_TEMPERATURE_OFFSET, default=0): cv.temperature,
|
cv.Optional(CONF_TEMPERATURE_OFFSET, default=0): cv.temperature_delta,
|
||||||
cv.Optional(
|
cv.Optional(
|
||||||
CONF_STATE_SAVE_INTERVAL, default="6hours"
|
CONF_STATE_SAVE_INTERVAL, default="6hours"
|
||||||
): cv.positive_time_period_minutes,
|
): cv.positive_time_period_minutes,
|
||||||
|
|||||||
@@ -30,14 +30,12 @@ class DateTimeBase : public EntityBase {
|
|||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef USE_TIME
|
|
||||||
class DateTimeStateTrigger : public Trigger<ESPTime> {
|
class DateTimeStateTrigger : public Trigger<ESPTime> {
|
||||||
public:
|
public:
|
||||||
explicit DateTimeStateTrigger(DateTimeBase *parent) {
|
explicit DateTimeStateTrigger(DateTimeBase *parent) {
|
||||||
parent->add_on_state_callback([this, parent]() { this->trigger(parent->state_as_esptime()); });
|
parent->add_on_state_callback([this, parent]() { this->trigger(parent->state_as_esptime()); });
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
#endif
|
|
||||||
|
|
||||||
} // namespace datetime
|
} // namespace datetime
|
||||||
} // namespace esphome
|
} // namespace esphome
|
||||||
|
|||||||
@@ -775,7 +775,7 @@ void Display::test_card() {
|
|||||||
int shift_y = (h - image_h) / 2;
|
int shift_y = (h - image_h) / 2;
|
||||||
int line_w = (image_w - 6) / 6;
|
int line_w = (image_w - 6) / 6;
|
||||||
int image_c = image_w / 2;
|
int image_c = image_w / 2;
|
||||||
for (auto i = 0; i <= image_h; i++) {
|
for (auto i = 0; i != image_h; i++) {
|
||||||
int c = esp_scale(i, image_h);
|
int c = esp_scale(i, image_h);
|
||||||
this->horizontal_line(shift_x + 0, shift_y + i, line_w, r.fade_to_white(c));
|
this->horizontal_line(shift_x + 0, shift_y + i, line_w, r.fade_to_white(c));
|
||||||
this->horizontal_line(shift_x + line_w, shift_y + i, line_w, r.fade_to_black(c)); //
|
this->horizontal_line(shift_x + line_w, shift_y + i, line_w, r.fade_to_black(c)); //
|
||||||
@@ -809,8 +809,11 @@ void Display::test_card() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this->rectangle(0, 0, w, h, Color(127, 0, 127));
|
|
||||||
this->filled_rectangle(0, 0, 10, 10, Color(255, 0, 255));
|
this->filled_rectangle(0, 0, 10, 10, Color(255, 0, 255));
|
||||||
|
this->filled_rectangle(w - 10, 0, 10, 10, Color(255, 0, 255));
|
||||||
|
this->filled_rectangle(0, h - 10, 10, 10, Color(255, 0, 255));
|
||||||
|
this->filled_rectangle(w - 10, h - 10, 10, 10, Color(255, 0, 255));
|
||||||
|
this->rectangle(0, 0, w, h, Color(255, 255, 255));
|
||||||
this->stop_poller();
|
this->stop_poller();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -790,6 +790,7 @@ async def to_code(config):
|
|||||||
add_idf_sdkconfig_option("CONFIG_AUTOSTART_ARDUINO", True)
|
add_idf_sdkconfig_option("CONFIG_AUTOSTART_ARDUINO", True)
|
||||||
add_idf_sdkconfig_option("CONFIG_MBEDTLS_PSK_MODES", True)
|
add_idf_sdkconfig_option("CONFIG_MBEDTLS_PSK_MODES", True)
|
||||||
add_idf_sdkconfig_option("CONFIG_MBEDTLS_CERTIFICATE_BUNDLE", True)
|
add_idf_sdkconfig_option("CONFIG_MBEDTLS_CERTIFICATE_BUNDLE", True)
|
||||||
|
add_idf_sdkconfig_option("CONFIG_ESP_PHY_REDUCE_TX_POWER", True)
|
||||||
|
|
||||||
cg.add_build_flag("-Wno-nonnull-compare")
|
cg.add_build_flag("-Wno-nonnull-compare")
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@
|
|||||||
#include <freertos/FreeRTOS.h>
|
#include <freertos/FreeRTOS.h>
|
||||||
#include <freertos/task.h>
|
#include <freertos/task.h>
|
||||||
#include <esp_idf_version.h>
|
#include <esp_idf_version.h>
|
||||||
|
#include <esp_ota_ops.h>
|
||||||
#include <esp_task_wdt.h>
|
#include <esp_task_wdt.h>
|
||||||
#include <esp_timer.h>
|
#include <esp_timer.h>
|
||||||
#include <soc/rtc.h>
|
#include <soc/rtc.h>
|
||||||
@@ -52,6 +53,16 @@ void arch_init() {
|
|||||||
disableCore1WDT();
|
disableCore1WDT();
|
||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
// If the bootloader was compiled with CONFIG_BOOTLOADER_APP_ROLLBACK_ENABLE the current
|
||||||
|
// partition will get rolled back unless it is marked as valid.
|
||||||
|
esp_ota_img_states_t state;
|
||||||
|
const esp_partition_t *running = esp_ota_get_running_partition();
|
||||||
|
if (esp_ota_get_state_partition(running, &state) == ESP_OK) {
|
||||||
|
if (state == ESP_OTA_IMG_PENDING_VERIFY) {
|
||||||
|
esp_ota_mark_app_valid_cancel_rollback();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
void IRAM_ATTR HOT arch_feed_wdt() { esp_task_wdt_reset(); }
|
void IRAM_ATTR HOT arch_feed_wdt() { esp_task_wdt_reset(); }
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ from esphome.const import (
|
|||||||
from esphome.core import CORE, coroutine_with_priority
|
from esphome.core import CORE, coroutine_with_priority
|
||||||
from esphome.coroutine import CoroPriority
|
from esphome.coroutine import CoroPriority
|
||||||
import esphome.final_validate as fv
|
import esphome.final_validate as fv
|
||||||
|
from esphome.types import ConfigType
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -136,11 +137,12 @@ FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate
|
|||||||
|
|
||||||
|
|
||||||
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
|
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
|
||||||
async def to_code(config):
|
async def to_code(config: ConfigType) -> None:
|
||||||
var = cg.new_Pvariable(config[CONF_ID])
|
var = cg.new_Pvariable(config[CONF_ID])
|
||||||
cg.add(var.set_port(config[CONF_PORT]))
|
cg.add(var.set_port(config[CONF_PORT]))
|
||||||
|
|
||||||
if CONF_PASSWORD in config:
|
# Password could be set to an empty string and we can assume that means no password
|
||||||
|
if config.get(CONF_PASSWORD):
|
||||||
cg.add(var.set_auth_password(config[CONF_PASSWORD]))
|
cg.add(var.set_auth_password(config[CONF_PASSWORD]))
|
||||||
cg.add_define("USE_OTA_PASSWORD")
|
cg.add_define("USE_OTA_PASSWORD")
|
||||||
# Only include hash algorithms when password is configured
|
# Only include hash algorithms when password is configured
|
||||||
|
|||||||
@@ -16,7 +16,8 @@ void HDC1080Component::setup() {
|
|||||||
|
|
||||||
// if configuration fails - there is a problem
|
// if configuration fails - there is a problem
|
||||||
if (this->write_register(HDC1080_CMD_CONFIGURATION, config, 2) != i2c::ERROR_OK) {
|
if (this->write_register(HDC1080_CMD_CONFIGURATION, config, 2) != i2c::ERROR_OK) {
|
||||||
this->mark_failed();
|
ESP_LOGW(TAG, "Failed to configure HDC1080");
|
||||||
|
this->status_set_warning();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,8 +9,8 @@ static const char *const TAG = "htu21d";
|
|||||||
|
|
||||||
static const uint8_t HTU21D_ADDRESS = 0x40;
|
static const uint8_t HTU21D_ADDRESS = 0x40;
|
||||||
static const uint8_t HTU21D_REGISTER_RESET = 0xFE;
|
static const uint8_t HTU21D_REGISTER_RESET = 0xFE;
|
||||||
static const uint8_t HTU21D_REGISTER_TEMPERATURE = 0xE3;
|
static const uint8_t HTU21D_REGISTER_TEMPERATURE = 0xF3;
|
||||||
static const uint8_t HTU21D_REGISTER_HUMIDITY = 0xE5;
|
static const uint8_t HTU21D_REGISTER_HUMIDITY = 0xF5;
|
||||||
static const uint8_t HTU21D_WRITERHT_REG_CMD = 0xE6; /**< Write RH/T User Register 1 */
|
static const uint8_t HTU21D_WRITERHT_REG_CMD = 0xE6; /**< Write RH/T User Register 1 */
|
||||||
static const uint8_t HTU21D_REGISTER_STATUS = 0xE7;
|
static const uint8_t HTU21D_REGISTER_STATUS = 0xE7;
|
||||||
static const uint8_t HTU21D_WRITEHEATER_REG_CMD = 0x51; /**< Write Heater Control Register */
|
static const uint8_t HTU21D_WRITEHEATER_REG_CMD = 0x51; /**< Write Heater Control Register */
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from esphome.const import (
|
|||||||
CONF_BRIGHTNESS,
|
CONF_BRIGHTNESS,
|
||||||
CONF_COLOR_ORDER,
|
CONF_COLOR_ORDER,
|
||||||
CONF_DIMENSIONS,
|
CONF_DIMENSIONS,
|
||||||
|
CONF_DISABLED,
|
||||||
CONF_HEIGHT,
|
CONF_HEIGHT,
|
||||||
CONF_INIT_SEQUENCE,
|
CONF_INIT_SEQUENCE,
|
||||||
CONF_INVERT_COLORS,
|
CONF_INVERT_COLORS,
|
||||||
@@ -301,6 +302,8 @@ class DriverChip:
|
|||||||
Check if a rotation can be implemented in hardware using the MADCTL register.
|
Check if a rotation can be implemented in hardware using the MADCTL register.
|
||||||
A rotation of 180 is always possible if x and y mirroring are supported, 90 and 270 are possible if the model supports swapping X and Y.
|
A rotation of 180 is always possible if x and y mirroring are supported, 90 and 270 are possible if the model supports swapping X and Y.
|
||||||
"""
|
"""
|
||||||
|
if config.get(CONF_TRANSFORM) == CONF_DISABLED:
|
||||||
|
return False
|
||||||
transforms = self.transforms
|
transforms = self.transforms
|
||||||
rotation = config.get(CONF_ROTATION, 0)
|
rotation = config.get(CONF_ROTATION, 0)
|
||||||
if rotation == 0 or not transforms:
|
if rotation == 0 or not transforms:
|
||||||
@@ -358,26 +361,26 @@ class DriverChip:
|
|||||||
CONF_SWAP_XY: self.get_default(CONF_SWAP_XY),
|
CONF_SWAP_XY: self.get_default(CONF_SWAP_XY),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
# fill in defaults if not provided
|
if not isinstance(transform, dict):
|
||||||
mirror_x = transform.get(CONF_MIRROR_X, self.get_default(CONF_MIRROR_X))
|
# Presumably disabled
|
||||||
mirror_y = transform.get(CONF_MIRROR_Y, self.get_default(CONF_MIRROR_Y))
|
return {
|
||||||
swap_xy = transform.get(CONF_SWAP_XY, self.get_default(CONF_SWAP_XY))
|
CONF_MIRROR_X: False,
|
||||||
transform[CONF_MIRROR_X] = mirror_x
|
CONF_MIRROR_Y: False,
|
||||||
transform[CONF_MIRROR_Y] = mirror_y
|
CONF_SWAP_XY: False,
|
||||||
transform[CONF_SWAP_XY] = swap_xy
|
CONF_TRANSFORM: False,
|
||||||
|
}
|
||||||
# Can we use the MADCTL register to set the rotation?
|
# Can we use the MADCTL register to set the rotation?
|
||||||
if can_transform and CONF_TRANSFORM not in config:
|
if can_transform and CONF_TRANSFORM not in config:
|
||||||
rotation = config[CONF_ROTATION]
|
rotation = config[CONF_ROTATION]
|
||||||
if rotation == 180:
|
if rotation == 180:
|
||||||
transform[CONF_MIRROR_X] = not mirror_x
|
transform[CONF_MIRROR_X] = not transform[CONF_MIRROR_X]
|
||||||
transform[CONF_MIRROR_Y] = not mirror_y
|
transform[CONF_MIRROR_Y] = not transform[CONF_MIRROR_Y]
|
||||||
elif rotation == 90:
|
elif rotation == 90:
|
||||||
transform[CONF_SWAP_XY] = not swap_xy
|
transform[CONF_SWAP_XY] = not transform[CONF_SWAP_XY]
|
||||||
transform[CONF_MIRROR_X] = not mirror_x
|
transform[CONF_MIRROR_X] = not transform[CONF_MIRROR_X]
|
||||||
else:
|
else:
|
||||||
transform[CONF_SWAP_XY] = not swap_xy
|
transform[CONF_SWAP_XY] = not transform[CONF_SWAP_XY]
|
||||||
transform[CONF_MIRROR_Y] = not mirror_y
|
transform[CONF_MIRROR_Y] = not transform[CONF_MIRROR_Y]
|
||||||
transform[CONF_TRANSFORM] = True
|
transform[CONF_TRANSFORM] = True
|
||||||
return transform
|
return transform
|
||||||
|
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ from esphome.const import (
|
|||||||
CONF_DATA_RATE,
|
CONF_DATA_RATE,
|
||||||
CONF_DC_PIN,
|
CONF_DC_PIN,
|
||||||
CONF_DIMENSIONS,
|
CONF_DIMENSIONS,
|
||||||
|
CONF_DISABLED,
|
||||||
CONF_ENABLE_PIN,
|
CONF_ENABLE_PIN,
|
||||||
CONF_ID,
|
CONF_ID,
|
||||||
CONF_INIT_SEQUENCE,
|
CONF_INIT_SEQUENCE,
|
||||||
@@ -146,12 +147,15 @@ def swap_xy_schema(model):
|
|||||||
def model_schema(config):
|
def model_schema(config):
|
||||||
model = MODELS[config[CONF_MODEL]]
|
model = MODELS[config[CONF_MODEL]]
|
||||||
bus_mode = config[CONF_BUS_MODE]
|
bus_mode = config[CONF_BUS_MODE]
|
||||||
transform = cv.Schema(
|
transform = cv.Any(
|
||||||
{
|
cv.Schema(
|
||||||
cv.Required(CONF_MIRROR_X): cv.boolean,
|
{
|
||||||
cv.Required(CONF_MIRROR_Y): cv.boolean,
|
cv.Required(CONF_MIRROR_X): cv.boolean,
|
||||||
**swap_xy_schema(model),
|
cv.Required(CONF_MIRROR_Y): cv.boolean,
|
||||||
}
|
**swap_xy_schema(model),
|
||||||
|
}
|
||||||
|
),
|
||||||
|
cv.one_of(CONF_DISABLED, lower=True),
|
||||||
)
|
)
|
||||||
# CUSTOM model will need to provide a custom init sequence
|
# CUSTOM model will need to provide a custom init sequence
|
||||||
iseqconf = (
|
iseqconf = (
|
||||||
@@ -160,7 +164,11 @@ def model_schema(config):
|
|||||||
else cv.Optional(CONF_INIT_SEQUENCE)
|
else cv.Optional(CONF_INIT_SEQUENCE)
|
||||||
)
|
)
|
||||||
# Dimensions are optional if the model has a default width and the x-y transform is not overridden
|
# Dimensions are optional if the model has a default width and the x-y transform is not overridden
|
||||||
is_swapped = config.get(CONF_TRANSFORM, {}).get(CONF_SWAP_XY) is True
|
transform_config = config.get(CONF_TRANSFORM, {})
|
||||||
|
is_swapped = (
|
||||||
|
isinstance(transform_config, dict)
|
||||||
|
and transform_config.get(CONF_SWAP_XY, False) is True
|
||||||
|
)
|
||||||
cv_dimensions = (
|
cv_dimensions = (
|
||||||
cv.Optional if model.get_default(CONF_WIDTH) and not is_swapped else cv.Required
|
cv.Optional if model.get_default(CONF_WIDTH) and not is_swapped else cv.Required
|
||||||
)
|
)
|
||||||
@@ -192,9 +200,7 @@ def model_schema(config):
|
|||||||
.extend(
|
.extend(
|
||||||
{
|
{
|
||||||
cv.GenerateID(): cv.declare_id(MipiSpi),
|
cv.GenerateID(): cv.declare_id(MipiSpi),
|
||||||
cv_dimensions(CONF_DIMENSIONS): dimension_schema(
|
cv_dimensions(CONF_DIMENSIONS): dimension_schema(1),
|
||||||
model.get_default(CONF_DRAW_ROUNDING, 1)
|
|
||||||
),
|
|
||||||
model.option(CONF_ENABLE_PIN, cv.UNDEFINED): cv.ensure_list(
|
model.option(CONF_ENABLE_PIN, cv.UNDEFINED): cv.ensure_list(
|
||||||
pins.gpio_output_pin_schema
|
pins.gpio_output_pin_schema
|
||||||
),
|
),
|
||||||
@@ -400,6 +406,7 @@ def get_instance(config):
|
|||||||
offset_height,
|
offset_height,
|
||||||
DISPLAY_ROTATIONS[rotation],
|
DISPLAY_ROTATIONS[rotation],
|
||||||
frac,
|
frac,
|
||||||
|
config[CONF_DRAW_ROUNDING],
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
return MipiSpiBuffer, templateargs
|
return MipiSpiBuffer, templateargs
|
||||||
@@ -431,7 +438,6 @@ async def to_code(config):
|
|||||||
else:
|
else:
|
||||||
config[CONF_ROTATION] = 0
|
config[CONF_ROTATION] = 0
|
||||||
cg.add(var.set_model(config[CONF_MODEL]))
|
cg.add(var.set_model(config[CONF_MODEL]))
|
||||||
cg.add(var.set_draw_rounding(config[CONF_DRAW_ROUNDING]))
|
|
||||||
if enable_pin := config.get(CONF_ENABLE_PIN):
|
if enable_pin := config.get(CONF_ENABLE_PIN):
|
||||||
enable = [await cg.gpio_pin_expression(pin) for pin in enable_pin]
|
enable = [await cg.gpio_pin_expression(pin) for pin in enable_pin]
|
||||||
cg.add(var.set_enable_pins(enable))
|
cg.add(var.set_enable_pins(enable))
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ static constexpr uint8_t MADCTL_BGR = 0x08; // Bit 3 Blue-Green-Red pixel ord
|
|||||||
static constexpr uint8_t MADCTL_XFLIP = 0x02; // Mirror the display horizontally
|
static constexpr uint8_t MADCTL_XFLIP = 0x02; // Mirror the display horizontally
|
||||||
static constexpr uint8_t MADCTL_YFLIP = 0x01; // Mirror the display vertically
|
static constexpr uint8_t MADCTL_YFLIP = 0x01; // Mirror the display vertically
|
||||||
|
|
||||||
static const uint8_t DELAY_FLAG = 0xFF;
|
static constexpr uint8_t DELAY_FLAG = 0xFF;
|
||||||
// store a 16 bit value in a buffer, big endian.
|
// store a 16 bit value in a buffer, big endian.
|
||||||
static inline void put16_be(uint8_t *buf, uint16_t value) {
|
static inline void put16_be(uint8_t *buf, uint16_t value) {
|
||||||
buf[0] = value >> 8;
|
buf[0] = value >> 8;
|
||||||
@@ -79,7 +79,7 @@ class MipiSpi : public display::Display,
|
|||||||
public spi::SPIDevice<spi::BIT_ORDER_MSB_FIRST, spi::CLOCK_POLARITY_LOW, spi::CLOCK_PHASE_LEADING,
|
public spi::SPIDevice<spi::BIT_ORDER_MSB_FIRST, spi::CLOCK_POLARITY_LOW, spi::CLOCK_PHASE_LEADING,
|
||||||
spi::DATA_RATE_1MHZ> {
|
spi::DATA_RATE_1MHZ> {
|
||||||
public:
|
public:
|
||||||
MipiSpi() {}
|
MipiSpi() = default;
|
||||||
void update() override { this->stop_poller(); }
|
void update() override { this->stop_poller(); }
|
||||||
void draw_pixel_at(int x, int y, Color color) override {}
|
void draw_pixel_at(int x, int y, Color color) override {}
|
||||||
void set_model(const char *model) { this->model_ = model; }
|
void set_model(const char *model) { this->model_ = model; }
|
||||||
@@ -99,7 +99,6 @@ class MipiSpi : public display::Display,
|
|||||||
int get_width_internal() override { return WIDTH; }
|
int get_width_internal() override { return WIDTH; }
|
||||||
int get_height_internal() override { return HEIGHT; }
|
int get_height_internal() override { return HEIGHT; }
|
||||||
void set_init_sequence(const std::vector<uint8_t> &sequence) { this->init_sequence_ = sequence; }
|
void set_init_sequence(const std::vector<uint8_t> &sequence) { this->init_sequence_ = sequence; }
|
||||||
void set_draw_rounding(unsigned rounding) { this->draw_rounding_ = rounding; }
|
|
||||||
|
|
||||||
// reset the display, and write the init sequence
|
// reset the display, and write the init sequence
|
||||||
void setup() override {
|
void setup() override {
|
||||||
@@ -326,6 +325,7 @@ class MipiSpi : public display::Display,
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes a buffer to the display.
|
* Writes a buffer to the display.
|
||||||
|
* @param ptr The pointer to the pixel data
|
||||||
* @param w Width of each line in bytes
|
* @param w Width of each line in bytes
|
||||||
* @param h Height of the buffer in rows
|
* @param h Height of the buffer in rows
|
||||||
* @param pad Padding in bytes after each line
|
* @param pad Padding in bytes after each line
|
||||||
@@ -424,7 +424,6 @@ class MipiSpi : public display::Display,
|
|||||||
|
|
||||||
// other properties set by configuration
|
// other properties set by configuration
|
||||||
bool invert_colors_{};
|
bool invert_colors_{};
|
||||||
unsigned draw_rounding_{2};
|
|
||||||
optional<uint8_t> brightness_{};
|
optional<uint8_t> brightness_{};
|
||||||
const char *model_{"Unknown"};
|
const char *model_{"Unknown"};
|
||||||
std::vector<uint8_t> init_sequence_{};
|
std::vector<uint8_t> init_sequence_{};
|
||||||
@@ -444,12 +443,20 @@ class MipiSpi : public display::Display,
|
|||||||
* @tparam OFFSET_WIDTH The x-offset of the display in pixels
|
* @tparam OFFSET_WIDTH The x-offset of the display in pixels
|
||||||
* @tparam OFFSET_HEIGHT The y-offset of the display in pixels
|
* @tparam OFFSET_HEIGHT The y-offset of the display in pixels
|
||||||
* @tparam FRACTION The fraction of the display size to use for the buffer (e.g. 4 means a 1/4 buffer).
|
* @tparam FRACTION The fraction of the display size to use for the buffer (e.g. 4 means a 1/4 buffer).
|
||||||
|
* @tparam ROUNDING The alignment requirement for drawing operations (e.g. 2 means that x coordinates must be even)
|
||||||
*/
|
*/
|
||||||
template<typename BUFFERTYPE, PixelMode BUFFERPIXEL, bool IS_BIG_ENDIAN, PixelMode DISPLAYPIXEL, BusType BUS_TYPE,
|
template<typename BUFFERTYPE, PixelMode BUFFERPIXEL, bool IS_BIG_ENDIAN, PixelMode DISPLAYPIXEL, BusType BUS_TYPE,
|
||||||
int WIDTH, int HEIGHT, int OFFSET_WIDTH, int OFFSET_HEIGHT, display::DisplayRotation ROTATION, int FRACTION>
|
uint16_t WIDTH, uint16_t HEIGHT, int OFFSET_WIDTH, int OFFSET_HEIGHT, display::DisplayRotation ROTATION,
|
||||||
|
int FRACTION, unsigned ROUNDING>
|
||||||
class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DISPLAYPIXEL, BUS_TYPE, WIDTH, HEIGHT,
|
class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DISPLAYPIXEL, BUS_TYPE, WIDTH, HEIGHT,
|
||||||
OFFSET_WIDTH, OFFSET_HEIGHT> {
|
OFFSET_WIDTH, OFFSET_HEIGHT> {
|
||||||
public:
|
public:
|
||||||
|
// these values define the buffer size needed to write in accordance with the chip pixel alignment
|
||||||
|
// requirements. If the required rounding does not divide the width and height, we round up to the next multiple and
|
||||||
|
// ignore the extra columns and rows when drawing, but use them to write to the display.
|
||||||
|
static constexpr unsigned BUFFER_WIDTH = (WIDTH + ROUNDING - 1) / ROUNDING * ROUNDING;
|
||||||
|
static constexpr unsigned BUFFER_HEIGHT = (HEIGHT + ROUNDING - 1) / ROUNDING * ROUNDING;
|
||||||
|
|
||||||
MipiSpiBuffer() { this->rotation_ = ROTATION; }
|
MipiSpiBuffer() { this->rotation_ = ROTATION; }
|
||||||
|
|
||||||
void dump_config() override {
|
void dump_config() override {
|
||||||
@@ -461,15 +468,15 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
|
|||||||
" Buffer fraction: 1/%d\n"
|
" Buffer fraction: 1/%d\n"
|
||||||
" Buffer bytes: %zu\n"
|
" Buffer bytes: %zu\n"
|
||||||
" Draw rounding: %u",
|
" Draw rounding: %u",
|
||||||
this->rotation_, BUFFERPIXEL * 8, FRACTION, sizeof(BUFFERTYPE) * WIDTH * HEIGHT / FRACTION,
|
this->rotation_, BUFFERPIXEL * 8, FRACTION,
|
||||||
this->draw_rounding_);
|
sizeof(BUFFERTYPE) * BUFFER_WIDTH * BUFFER_HEIGHT / FRACTION, ROUNDING);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setup() override {
|
void setup() override {
|
||||||
MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DISPLAYPIXEL, BUS_TYPE, WIDTH, HEIGHT, OFFSET_WIDTH,
|
MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DISPLAYPIXEL, BUS_TYPE, WIDTH, HEIGHT, OFFSET_WIDTH,
|
||||||
OFFSET_HEIGHT>::setup();
|
OFFSET_HEIGHT>::setup();
|
||||||
RAMAllocator<BUFFERTYPE> allocator{};
|
RAMAllocator<BUFFERTYPE> allocator{};
|
||||||
this->buffer_ = allocator.allocate(WIDTH * HEIGHT / FRACTION);
|
this->buffer_ = allocator.allocate(BUFFER_WIDTH * BUFFER_HEIGHT / FRACTION);
|
||||||
if (this->buffer_ == nullptr) {
|
if (this->buffer_ == nullptr) {
|
||||||
this->mark_failed("Buffer allocation failed");
|
this->mark_failed("Buffer allocation failed");
|
||||||
}
|
}
|
||||||
@@ -508,15 +515,14 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
|
|||||||
esph_log_v(TAG, "x_low %d, y_low %d, x_high %d, y_high %d", this->x_low_, this->y_low_, this->x_high_,
|
esph_log_v(TAG, "x_low %d, y_low %d, x_high %d, y_high %d", this->x_low_, this->y_low_, this->x_high_,
|
||||||
this->y_high_);
|
this->y_high_);
|
||||||
// Some chips require that the drawing window be aligned on certain boundaries
|
// Some chips require that the drawing window be aligned on certain boundaries
|
||||||
auto dr = this->draw_rounding_;
|
this->x_low_ = this->x_low_ / ROUNDING * ROUNDING;
|
||||||
this->x_low_ = this->x_low_ / dr * dr;
|
this->y_low_ = this->y_low_ / ROUNDING * ROUNDING;
|
||||||
this->y_low_ = this->y_low_ / dr * dr;
|
this->x_high_ = (this->x_high_ + ROUNDING) / ROUNDING * ROUNDING - 1;
|
||||||
this->x_high_ = (this->x_high_ + dr) / dr * dr - 1;
|
this->y_high_ = (this->y_high_ + ROUNDING) / ROUNDING * ROUNDING - 1;
|
||||||
this->y_high_ = (this->y_high_ + dr) / dr * dr - 1;
|
|
||||||
int w = this->x_high_ - this->x_low_ + 1;
|
int w = this->x_high_ - this->x_low_ + 1;
|
||||||
int h = this->y_high_ - this->y_low_ + 1;
|
int h = this->y_high_ - this->y_low_ + 1;
|
||||||
this->write_to_display_(this->x_low_, this->y_low_, w, h, this->buffer_, this->x_low_,
|
this->write_to_display_(this->x_low_, this->y_low_, w, h, this->buffer_, this->x_low_,
|
||||||
this->y_low_ - this->start_line_, WIDTH - w);
|
this->y_low_ - this->start_line_, BUFFER_WIDTH - w);
|
||||||
// invalidate watermarks
|
// invalidate watermarks
|
||||||
this->x_low_ = WIDTH;
|
this->x_low_ = WIDTH;
|
||||||
this->y_low_ = HEIGHT;
|
this->y_low_ = HEIGHT;
|
||||||
@@ -536,10 +542,10 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
|
|||||||
void draw_pixel_at(int x, int y, Color color) override {
|
void draw_pixel_at(int x, int y, Color color) override {
|
||||||
if (!this->get_clipping().inside(x, y))
|
if (!this->get_clipping().inside(x, y))
|
||||||
return;
|
return;
|
||||||
rotate_coordinates_(x, y);
|
rotate_coordinates(x, y);
|
||||||
if (x < 0 || x >= WIDTH || y < this->start_line_ || y >= this->end_line_)
|
if (x < 0 || x >= WIDTH || y < this->start_line_ || y >= this->end_line_)
|
||||||
return;
|
return;
|
||||||
this->buffer_[(y - this->start_line_) * WIDTH + x] = convert_color_(color);
|
this->buffer_[(y - this->start_line_) * BUFFER_WIDTH + x] = convert_color(color);
|
||||||
if (x < this->x_low_) {
|
if (x < this->x_low_) {
|
||||||
this->x_low_ = x;
|
this->x_low_ = x;
|
||||||
}
|
}
|
||||||
@@ -560,7 +566,7 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
|
|||||||
this->y_low_ = this->start_line_;
|
this->y_low_ = this->start_line_;
|
||||||
this->x_high_ = WIDTH - 1;
|
this->x_high_ = WIDTH - 1;
|
||||||
this->y_high_ = this->end_line_ - 1;
|
this->y_high_ = this->end_line_ - 1;
|
||||||
std::fill_n(this->buffer_, HEIGHT * WIDTH / FRACTION, convert_color_(color));
|
std::fill_n(this->buffer_, HEIGHT * BUFFER_WIDTH / FRACTION, convert_color(color));
|
||||||
}
|
}
|
||||||
|
|
||||||
int get_width() override {
|
int get_width() override {
|
||||||
@@ -577,7 +583,7 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
// Rotate the coordinates to match the display orientation.
|
// Rotate the coordinates to match the display orientation.
|
||||||
void rotate_coordinates_(int &x, int &y) const {
|
static void rotate_coordinates(int &x, int &y) {
|
||||||
if constexpr (ROTATION == display::DISPLAY_ROTATION_180_DEGREES) {
|
if constexpr (ROTATION == display::DISPLAY_ROTATION_180_DEGREES) {
|
||||||
x = WIDTH - x - 1;
|
x = WIDTH - x - 1;
|
||||||
y = HEIGHT - y - 1;
|
y = HEIGHT - y - 1;
|
||||||
@@ -593,7 +599,7 @@ class MipiSpiBuffer : public MipiSpi<BUFFERTYPE, BUFFERPIXEL, IS_BIG_ENDIAN, DIS
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Convert a color to the buffer pixel format.
|
// Convert a color to the buffer pixel format.
|
||||||
BUFFERTYPE convert_color_(Color &color) const {
|
static BUFFERTYPE convert_color(const Color &color) {
|
||||||
if constexpr (BUFFERPIXEL == PIXEL_MODE_8) {
|
if constexpr (BUFFERPIXEL == PIXEL_MODE_8) {
|
||||||
return (color.red & 0xE0) | (color.g & 0xE0) >> 3 | color.b >> 6;
|
return (color.red & 0xE0) | (color.g & 0xE0) >> 3 | color.b >> 6;
|
||||||
} else if constexpr (BUFFERPIXEL == PIXEL_MODE_16) {
|
} else if constexpr (BUFFERPIXEL == PIXEL_MODE_16) {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import esphome.config_validation as cv
|
|||||||
|
|
||||||
from .amoled import CO5300
|
from .amoled import CO5300
|
||||||
from .ili import ILI9488_A
|
from .ili import ILI9488_A
|
||||||
|
from .jc import AXS15231
|
||||||
|
|
||||||
DriverChip(
|
DriverChip(
|
||||||
"WAVESHARE-4-TFT",
|
"WAVESHARE-4-TFT",
|
||||||
@@ -152,3 +153,12 @@ CO5300.extend(
|
|||||||
cs_pin=12,
|
cs_pin=12,
|
||||||
reset_pin=39,
|
reset_pin=39,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
AXS15231.extend(
|
||||||
|
"WAVESHARE-ESP32-S3-TOUCH-LCD-3.49",
|
||||||
|
width=172,
|
||||||
|
height=640,
|
||||||
|
data_rate="80MHz",
|
||||||
|
cs_pin=9,
|
||||||
|
reset_pin=21,
|
||||||
|
)
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ CONFIG_SCHEMA = (
|
|||||||
cv.int_range(min=0, max=0xFFFF, max_included=False),
|
cv.int_range(min=0, max=0xFFFF, max_included=False),
|
||||||
),
|
),
|
||||||
cv.Optional(CONF_AMBIENT_PRESSURE_COMPENSATION): cv.pressure,
|
cv.Optional(CONF_AMBIENT_PRESSURE_COMPENSATION): cv.pressure,
|
||||||
cv.Optional(CONF_TEMPERATURE_OFFSET, default="4°C"): cv.temperature,
|
cv.Optional(CONF_TEMPERATURE_OFFSET, default="4°C"): cv.temperature_delta,
|
||||||
cv.Optional(CONF_AMBIENT_PRESSURE_COMPENSATION_SOURCE): cv.use_id(
|
cv.Optional(CONF_AMBIENT_PRESSURE_COMPENSATION_SOURCE): cv.use_id(
|
||||||
sensor.Sensor
|
sensor.Sensor
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from esphome import core
|
from esphome import core
|
||||||
from esphome.config_helpers import Extend, Remove, merge_config
|
from esphome.config_helpers import Extend, Remove, merge_config, merge_dicts_ordered
|
||||||
import esphome.config_validation as cv
|
import esphome.config_validation as cv
|
||||||
from esphome.const import CONF_SUBSTITUTIONS, VALID_SUBSTITUTIONS_CHARACTERS
|
from esphome.const import CONF_SUBSTITUTIONS, VALID_SUBSTITUTIONS_CHARACTERS
|
||||||
from esphome.yaml_util import ESPHomeDataBase, ESPLiteralValue, make_data_base
|
from esphome.yaml_util import ESPHomeDataBase, ESPLiteralValue, make_data_base
|
||||||
@@ -170,10 +170,10 @@ def do_substitution_pass(config, command_line_substitutions, ignore_missing=Fals
|
|||||||
return
|
return
|
||||||
|
|
||||||
# Merge substitutions in config, overriding with substitutions coming from command line:
|
# Merge substitutions in config, overriding with substitutions coming from command line:
|
||||||
substitutions = {
|
# Use merge_dicts_ordered to preserve OrderedDict type for move_to_end()
|
||||||
**config.get(CONF_SUBSTITUTIONS, {}),
|
substitutions = merge_dicts_ordered(
|
||||||
**(command_line_substitutions or {}),
|
config.get(CONF_SUBSTITUTIONS, {}), command_line_substitutions or {}
|
||||||
}
|
)
|
||||||
with cv.prepend_path("substitutions"):
|
with cv.prepend_path("substitutions"):
|
||||||
if not isinstance(substitutions, dict):
|
if not isinstance(substitutions, dict):
|
||||||
raise cv.Invalid(
|
raise cv.Invalid(
|
||||||
|
|||||||
@@ -56,6 +56,13 @@ uint32_t ESP8266UartComponent::get_config() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ESP8266UartComponent::setup() {
|
void ESP8266UartComponent::setup() {
|
||||||
|
if (this->rx_pin_) {
|
||||||
|
this->rx_pin_->setup();
|
||||||
|
}
|
||||||
|
if (this->tx_pin_ && this->rx_pin_ != this->tx_pin_) {
|
||||||
|
this->tx_pin_->setup();
|
||||||
|
}
|
||||||
|
|
||||||
// Use Arduino HardwareSerial UARTs if all used pins match the ones
|
// Use Arduino HardwareSerial UARTs if all used pins match the ones
|
||||||
// preconfigured by the platform. For example if RX disabled but TX pin
|
// preconfigured by the platform. For example if RX disabled but TX pin
|
||||||
// is 1 we still want to use Serial.
|
// is 1 we still want to use Serial.
|
||||||
|
|||||||
@@ -6,6 +6,9 @@
|
|||||||
#include "esphome/core/defines.h"
|
#include "esphome/core/defines.h"
|
||||||
#include "esphome/core/helpers.h"
|
#include "esphome/core/helpers.h"
|
||||||
#include "esphome/core/log.h"
|
#include "esphome/core/log.h"
|
||||||
|
#include "esphome/core/gpio.h"
|
||||||
|
#include "driver/gpio.h"
|
||||||
|
#include "soc/gpio_num.h"
|
||||||
|
|
||||||
#ifdef USE_LOGGER
|
#ifdef USE_LOGGER
|
||||||
#include "esphome/components/logger/logger.h"
|
#include "esphome/components/logger/logger.h"
|
||||||
@@ -104,6 +107,13 @@ void IDFUARTComponent::load_settings(bool dump_config) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this->rx_pin_) {
|
||||||
|
this->rx_pin_->setup();
|
||||||
|
}
|
||||||
|
if (this->tx_pin_ && this->rx_pin_ != this->tx_pin_) {
|
||||||
|
this->tx_pin_->setup();
|
||||||
|
}
|
||||||
|
|
||||||
int8_t tx = this->tx_pin_ != nullptr ? this->tx_pin_->get_pin() : -1;
|
int8_t tx = this->tx_pin_ != nullptr ? this->tx_pin_->get_pin() : -1;
|
||||||
int8_t rx = this->rx_pin_ != nullptr ? this->rx_pin_->get_pin() : -1;
|
int8_t rx = this->rx_pin_ != nullptr ? this->rx_pin_->get_pin() : -1;
|
||||||
int8_t flow_control = this->flow_control_pin_ != nullptr ? this->flow_control_pin_->get_pin() : -1;
|
int8_t flow_control = this->flow_control_pin_ != nullptr ? this->flow_control_pin_->get_pin() : -1;
|
||||||
|
|||||||
@@ -46,6 +46,13 @@ uint16_t LibreTinyUARTComponent::get_config() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void LibreTinyUARTComponent::setup() {
|
void LibreTinyUARTComponent::setup() {
|
||||||
|
if (this->rx_pin_) {
|
||||||
|
this->rx_pin_->setup();
|
||||||
|
}
|
||||||
|
if (this->tx_pin_ && this->rx_pin_ != this->tx_pin_) {
|
||||||
|
this->tx_pin_->setup();
|
||||||
|
}
|
||||||
|
|
||||||
int8_t tx_pin = tx_pin_ == nullptr ? -1 : tx_pin_->get_pin();
|
int8_t tx_pin = tx_pin_ == nullptr ? -1 : tx_pin_->get_pin();
|
||||||
int8_t rx_pin = rx_pin_ == nullptr ? -1 : rx_pin_->get_pin();
|
int8_t rx_pin = rx_pin_ == nullptr ? -1 : rx_pin_->get_pin();
|
||||||
bool tx_inverted = tx_pin_ != nullptr && tx_pin_->is_inverted();
|
bool tx_inverted = tx_pin_ != nullptr && tx_pin_->is_inverted();
|
||||||
|
|||||||
@@ -52,6 +52,13 @@ uint16_t RP2040UartComponent::get_config() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void RP2040UartComponent::setup() {
|
void RP2040UartComponent::setup() {
|
||||||
|
if (this->rx_pin_) {
|
||||||
|
this->rx_pin_->setup();
|
||||||
|
}
|
||||||
|
if (this->tx_pin_ && this->rx_pin_ != this->tx_pin_) {
|
||||||
|
this->tx_pin_->setup();
|
||||||
|
}
|
||||||
|
|
||||||
uint16_t config = get_config();
|
uint16_t config = get_config();
|
||||||
|
|
||||||
constexpr uint32_t valid_tx_uart_0 = __bitset({0, 12, 16, 28});
|
constexpr uint32_t valid_tx_uart_0 = __bitset({0, 12, 16, 28});
|
||||||
|
|||||||
@@ -402,8 +402,8 @@ async def to_code(config):
|
|||||||
add_idf_sdkconfig_option("CONFIG_LWIP_DHCPS", False)
|
add_idf_sdkconfig_option("CONFIG_LWIP_DHCPS", False)
|
||||||
|
|
||||||
# Disable Enterprise WiFi support if no EAP is configured
|
# Disable Enterprise WiFi support if no EAP is configured
|
||||||
if CORE.is_esp32 and not has_eap:
|
if CORE.is_esp32:
|
||||||
add_idf_sdkconfig_option("CONFIG_ESP_WIFI_ENTERPRISE_SUPPORT", False)
|
add_idf_sdkconfig_option("CONFIG_ESP_WIFI_ENTERPRISE_SUPPORT", has_eap)
|
||||||
|
|
||||||
cg.add(var.set_reboot_timeout(config[CONF_REBOOT_TIMEOUT]))
|
cg.add(var.set_reboot_timeout(config[CONF_REBOOT_TIMEOUT]))
|
||||||
cg.add(var.set_power_save_mode(config[CONF_POWER_SAVE_MODE]))
|
cg.add(var.set_power_save_mode(config[CONF_POWER_SAVE_MODE]))
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from typing import Any
|
|||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from esphome import core, loader, pins, yaml_util
|
from esphome import core, loader, pins, yaml_util
|
||||||
from esphome.config_helpers import Extend, Remove
|
from esphome.config_helpers import Extend, Remove, merge_dicts_ordered
|
||||||
import esphome.config_validation as cv
|
import esphome.config_validation as cv
|
||||||
from esphome.const import (
|
from esphome.const import (
|
||||||
CONF_ESPHOME,
|
CONF_ESPHOME,
|
||||||
@@ -922,10 +922,9 @@ def validate_config(
|
|||||||
if CONF_SUBSTITUTIONS in config or command_line_substitutions:
|
if CONF_SUBSTITUTIONS in config or command_line_substitutions:
|
||||||
from esphome.components import substitutions
|
from esphome.components import substitutions
|
||||||
|
|
||||||
result[CONF_SUBSTITUTIONS] = {
|
result[CONF_SUBSTITUTIONS] = merge_dicts_ordered(
|
||||||
**(config.get(CONF_SUBSTITUTIONS) or {}),
|
config.get(CONF_SUBSTITUTIONS) or {}, command_line_substitutions
|
||||||
**command_line_substitutions,
|
)
|
||||||
}
|
|
||||||
result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
|
result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
|
||||||
try:
|
try:
|
||||||
substitutions.do_substitution_pass(config, command_line_substitutions)
|
substitutions.do_substitution_pass(config, command_line_substitutions)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from esphome.const import (
|
|||||||
PlatformFramework,
|
PlatformFramework,
|
||||||
)
|
)
|
||||||
from esphome.core import CORE
|
from esphome.core import CORE
|
||||||
|
from esphome.util import OrderedDict
|
||||||
|
|
||||||
# Pre-build lookup map from (platform, framework) tuples to PlatformFramework enum
|
# Pre-build lookup map from (platform, framework) tuples to PlatformFramework enum
|
||||||
_PLATFORM_FRAMEWORK_LOOKUP = {
|
_PLATFORM_FRAMEWORK_LOOKUP = {
|
||||||
@@ -17,6 +18,25 @@ _PLATFORM_FRAMEWORK_LOOKUP = {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def merge_dicts_ordered(*dicts: dict) -> OrderedDict:
|
||||||
|
"""Merge multiple dicts into an OrderedDict, preserving key order.
|
||||||
|
|
||||||
|
This is a helper to ensure that dictionary merging preserves OrderedDict type,
|
||||||
|
which is important for operations like move_to_end().
|
||||||
|
|
||||||
|
Args:
|
||||||
|
*dicts: Variable number of dictionaries to merge (later dicts override earlier ones)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
OrderedDict with merged contents
|
||||||
|
"""
|
||||||
|
result = OrderedDict()
|
||||||
|
for d in dicts:
|
||||||
|
if d:
|
||||||
|
result.update(d)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
class Extend:
|
class Extend:
|
||||||
def __init__(self, value):
|
def __init__(self, value):
|
||||||
self.value = value
|
self.value = value
|
||||||
@@ -60,7 +80,11 @@ def merge_config(full_old, full_new):
|
|||||||
if isinstance(new, dict):
|
if isinstance(new, dict):
|
||||||
if not isinstance(old, dict):
|
if not isinstance(old, dict):
|
||||||
return new
|
return new
|
||||||
res = old.copy()
|
# Preserve OrderedDict type by copying to OrderedDict if either input is OrderedDict
|
||||||
|
if isinstance(old, OrderedDict) or isinstance(new, OrderedDict):
|
||||||
|
res = OrderedDict(old)
|
||||||
|
else:
|
||||||
|
res = old.copy()
|
||||||
for k, v in new.items():
|
for k, v in new.items():
|
||||||
if isinstance(v, Remove) and k in old:
|
if isinstance(v, Remove) and k in old:
|
||||||
del res[k]
|
del res[k]
|
||||||
|
|||||||
@@ -244,6 +244,20 @@ RESERVED_IDS = [
|
|||||||
"uart0",
|
"uart0",
|
||||||
"uart1",
|
"uart1",
|
||||||
"uart2",
|
"uart2",
|
||||||
|
# ESP32 ROM functions
|
||||||
|
"crc16_be",
|
||||||
|
"crc16_le",
|
||||||
|
"crc32_be",
|
||||||
|
"crc32_le",
|
||||||
|
"crc8_be",
|
||||||
|
"crc8_le",
|
||||||
|
"dbg_state",
|
||||||
|
"debug_timer",
|
||||||
|
"one_bits",
|
||||||
|
"recv_packet",
|
||||||
|
"send_packet",
|
||||||
|
"check_pos",
|
||||||
|
"software_reset",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from enum import Enum
|
|||||||
|
|
||||||
from esphome.enum import StrEnum
|
from esphome.enum import StrEnum
|
||||||
|
|
||||||
__version__ = "2025.10.0"
|
__version__ = "2025.10.3"
|
||||||
|
|
||||||
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
|
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
|
||||||
VALID_SUBSTITUTIONS_CHARACTERS = (
|
VALID_SUBSTITUTIONS_CHARACTERS = (
|
||||||
@@ -696,6 +696,7 @@ CONF_OPEN_DRAIN = "open_drain"
|
|||||||
CONF_OPEN_DRAIN_INTERRUPT = "open_drain_interrupt"
|
CONF_OPEN_DRAIN_INTERRUPT = "open_drain_interrupt"
|
||||||
CONF_OPEN_DURATION = "open_duration"
|
CONF_OPEN_DURATION = "open_duration"
|
||||||
CONF_OPEN_ENDSTOP = "open_endstop"
|
CONF_OPEN_ENDSTOP = "open_endstop"
|
||||||
|
CONF_OPENTHREAD = "openthread"
|
||||||
CONF_OPERATION = "operation"
|
CONF_OPERATION = "operation"
|
||||||
CONF_OPTIMISTIC = "optimistic"
|
CONF_OPTIMISTIC = "optimistic"
|
||||||
CONF_OPTION = "option"
|
CONF_OPTION = "option"
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from esphome.const import (
|
|||||||
CONF_COMMENT,
|
CONF_COMMENT,
|
||||||
CONF_ESPHOME,
|
CONF_ESPHOME,
|
||||||
CONF_ETHERNET,
|
CONF_ETHERNET,
|
||||||
|
CONF_OPENTHREAD,
|
||||||
CONF_PORT,
|
CONF_PORT,
|
||||||
CONF_USE_ADDRESS,
|
CONF_USE_ADDRESS,
|
||||||
CONF_WEB_SERVER,
|
CONF_WEB_SERVER,
|
||||||
@@ -641,6 +642,9 @@ class EsphomeCore:
|
|||||||
if CONF_ETHERNET in self.config:
|
if CONF_ETHERNET in self.config:
|
||||||
return self.config[CONF_ETHERNET][CONF_USE_ADDRESS]
|
return self.config[CONF_ETHERNET][CONF_USE_ADDRESS]
|
||||||
|
|
||||||
|
if CONF_OPENTHREAD in self.config:
|
||||||
|
return f"{self.name}.local"
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|||||||
@@ -10,6 +10,10 @@ from esphome.helpers import get_bool_env
|
|||||||
|
|
||||||
from .util.password import password_hash
|
from .util.password import password_hash
|
||||||
|
|
||||||
|
# Sentinel file name used for CORE.config_path when dashboard initializes.
|
||||||
|
# This ensures .parent returns the config directory instead of root.
|
||||||
|
_DASHBOARD_SENTINEL_FILE = "___DASHBOARD_SENTINEL___.yaml"
|
||||||
|
|
||||||
|
|
||||||
class DashboardSettings:
|
class DashboardSettings:
|
||||||
"""Settings for the dashboard."""
|
"""Settings for the dashboard."""
|
||||||
@@ -48,7 +52,12 @@ class DashboardSettings:
|
|||||||
self.config_dir = Path(args.configuration)
|
self.config_dir = Path(args.configuration)
|
||||||
self.absolute_config_dir = self.config_dir.resolve()
|
self.absolute_config_dir = self.config_dir.resolve()
|
||||||
self.verbose = args.verbose
|
self.verbose = args.verbose
|
||||||
CORE.config_path = self.config_dir / "."
|
# Set to a sentinel file so .parent gives us the config directory.
|
||||||
|
# Previously this was `os.path.join(self.config_dir, ".")` which worked because
|
||||||
|
# os.path.dirname("/config/.") returns "/config", but Path("/config/.").parent
|
||||||
|
# normalizes to Path("/config") first, then .parent returns Path("/"), breaking
|
||||||
|
# secret resolution. Using a sentinel file ensures .parent gives the correct directory.
|
||||||
|
CORE.config_path = self.config_dir / _DASHBOARD_SENTINEL_FILE
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def relative_url(self) -> str:
|
def relative_url(self) -> str:
|
||||||
|
|||||||
@@ -1058,7 +1058,8 @@ class DownloadBinaryRequestHandler(BaseHandler):
|
|||||||
"download",
|
"download",
|
||||||
f"{storage_json.name}-{file_name}",
|
f"{storage_json.name}-{file_name}",
|
||||||
)
|
)
|
||||||
path = storage_json.firmware_bin_path.with_name(file_name)
|
|
||||||
|
path = storage_json.firmware_bin_path.parent.joinpath(file_name)
|
||||||
|
|
||||||
if not path.is_file():
|
if not path.is_file():
|
||||||
args = ["esphome", "idedata", settings.rel_path(configuration)]
|
args = ["esphome", "idedata", settings.rel_path(configuration)]
|
||||||
|
|||||||
@@ -242,7 +242,7 @@ def send_check(
|
|||||||
|
|
||||||
|
|
||||||
def perform_ota(
|
def perform_ota(
|
||||||
sock: socket.socket, password: str, file_handle: io.IOBase, filename: Path
|
sock: socket.socket, password: str | None, file_handle: io.IOBase, filename: Path
|
||||||
) -> None:
|
) -> None:
|
||||||
file_contents = file_handle.read()
|
file_contents = file_handle.read()
|
||||||
file_size = len(file_contents)
|
file_size = len(file_contents)
|
||||||
@@ -278,13 +278,13 @@ def perform_ota(
|
|||||||
|
|
||||||
def perform_auth(
|
def perform_auth(
|
||||||
sock: socket.socket,
|
sock: socket.socket,
|
||||||
password: str,
|
password: str | None,
|
||||||
hash_func: Callable[..., Any],
|
hash_func: Callable[..., Any],
|
||||||
nonce_size: int,
|
nonce_size: int,
|
||||||
hash_name: str,
|
hash_name: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Perform challenge-response authentication using specified hash algorithm."""
|
"""Perform challenge-response authentication using specified hash algorithm."""
|
||||||
if not password:
|
if password is None:
|
||||||
raise OTAError("ESP requests password, but no password given!")
|
raise OTAError("ESP requests password, but no password given!")
|
||||||
|
|
||||||
nonce_bytes = receive_exactly(
|
nonce_bytes = receive_exactly(
|
||||||
@@ -385,7 +385,7 @@ def perform_ota(
|
|||||||
|
|
||||||
|
|
||||||
def run_ota_impl_(
|
def run_ota_impl_(
|
||||||
remote_host: str | list[str], remote_port: int, password: str, filename: Path
|
remote_host: str | list[str], remote_port: int, password: str | None, filename: Path
|
||||||
) -> tuple[int, str | None]:
|
) -> tuple[int, str | None]:
|
||||||
from esphome.core import CORE
|
from esphome.core import CORE
|
||||||
|
|
||||||
@@ -436,7 +436,7 @@ def run_ota_impl_(
|
|||||||
|
|
||||||
|
|
||||||
def run_ota(
|
def run_ota(
|
||||||
remote_host: str | list[str], remote_port: int, password: str, filename: Path
|
remote_host: str | list[str], remote_port: int, password: str | None, filename: Path
|
||||||
) -> tuple[int, str | None]:
|
) -> tuple[int, str | None]:
|
||||||
try:
|
try:
|
||||||
return run_ota_impl_(remote_host, remote_port, password, filename)
|
return run_ota_impl_(remote_host, remote_port, password, filename)
|
||||||
|
|||||||
@@ -224,36 +224,37 @@ def resolve_ip_address(
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
# Process hosts
|
# Process hosts
|
||||||
cached_addresses: list[str] = []
|
|
||||||
uncached_hosts: list[str] = []
|
uncached_hosts: list[str] = []
|
||||||
has_cache = address_cache is not None
|
|
||||||
|
|
||||||
for h in hosts:
|
for h in hosts:
|
||||||
if is_ip_address(h):
|
if is_ip_address(h):
|
||||||
if has_cache:
|
_add_ip_addresses_to_addrinfo([h], port, res)
|
||||||
# If we have a cache, treat IPs as cached
|
|
||||||
cached_addresses.append(h)
|
|
||||||
else:
|
|
||||||
# If no cache, pass IPs through to resolver with hostnames
|
|
||||||
uncached_hosts.append(h)
|
|
||||||
elif address_cache and (cached := address_cache.get_addresses(h)):
|
elif address_cache and (cached := address_cache.get_addresses(h)):
|
||||||
# Found in cache
|
_add_ip_addresses_to_addrinfo(cached, port, res)
|
||||||
cached_addresses.extend(cached)
|
|
||||||
else:
|
else:
|
||||||
# Not cached, need to resolve
|
# Not cached, need to resolve
|
||||||
if address_cache and address_cache.has_cache():
|
if address_cache and address_cache.has_cache():
|
||||||
_LOGGER.info("Host %s not in cache, will need to resolve", h)
|
_LOGGER.info("Host %s not in cache, will need to resolve", h)
|
||||||
uncached_hosts.append(h)
|
uncached_hosts.append(h)
|
||||||
|
|
||||||
# Process cached addresses (includes direct IPs and cached lookups)
|
|
||||||
_add_ip_addresses_to_addrinfo(cached_addresses, port, res)
|
|
||||||
|
|
||||||
# If we have uncached hosts (only non-IP hostnames), resolve them
|
# If we have uncached hosts (only non-IP hostnames), resolve them
|
||||||
if uncached_hosts:
|
if uncached_hosts:
|
||||||
|
from aioesphomeapi.host_resolver import AddrInfo as AioAddrInfo
|
||||||
|
|
||||||
|
from esphome.core import EsphomeError
|
||||||
from esphome.resolver import AsyncResolver
|
from esphome.resolver import AsyncResolver
|
||||||
|
|
||||||
resolver = AsyncResolver(uncached_hosts, port)
|
resolver = AsyncResolver(uncached_hosts, port)
|
||||||
addr_infos = resolver.resolve()
|
addr_infos: list[AioAddrInfo] = []
|
||||||
|
try:
|
||||||
|
addr_infos = resolver.resolve()
|
||||||
|
except EsphomeError as err:
|
||||||
|
if not res:
|
||||||
|
# No pre-resolved addresses available, DNS resolution is fatal
|
||||||
|
raise
|
||||||
|
_LOGGER.info("%s (using %d already resolved IP addresses)", err, len(res))
|
||||||
|
|
||||||
# Convert aioesphomeapi AddrInfo to our format
|
# Convert aioesphomeapi AddrInfo to our format
|
||||||
for addr_info in addr_infos:
|
for addr_info in addr_infos:
|
||||||
sockaddr = addr_info.sockaddr
|
sockaddr = addr_info.sockaddr
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ def run_schema_validation(config: ConfigType) -> None:
|
|||||||
{
|
{
|
||||||
"id": "display_id",
|
"id": "display_id",
|
||||||
"model": "custom",
|
"model": "custom",
|
||||||
"dimensions": {"width": 320, "height": 240},
|
"dimensions": {"width": 260, "height": 260},
|
||||||
"draw_rounding": 13,
|
"draw_rounding": 13,
|
||||||
"init_sequence": [[0xA0, 0x01]],
|
"init_sequence": [[0xA0, 0x01]],
|
||||||
},
|
},
|
||||||
@@ -336,7 +336,7 @@ def test_native_generation(
|
|||||||
|
|
||||||
main_cpp = generate_main(component_fixture_path("native.yaml"))
|
main_cpp = generate_main(component_fixture_path("native.yaml"))
|
||||||
assert (
|
assert (
|
||||||
"mipi_spi::MipiSpiBuffer<uint16_t, mipi_spi::PIXEL_MODE_16, true, mipi_spi::PIXEL_MODE_16, mipi_spi::BUS_TYPE_QUAD, 360, 360, 0, 1, display::DISPLAY_ROTATION_0_DEGREES, 1>()"
|
"mipi_spi::MipiSpiBuffer<uint16_t, mipi_spi::PIXEL_MODE_16, true, mipi_spi::PIXEL_MODE_16, mipi_spi::BUS_TYPE_QUAD, 360, 360, 0, 1, display::DISPLAY_ROTATION_0_DEGREES, 1, 1>()"
|
||||||
in main_cpp
|
in main_cpp
|
||||||
)
|
)
|
||||||
assert "set_init_sequence({240, 1, 8, 242" in main_cpp
|
assert "set_init_sequence({240, 1, 8, 242" in main_cpp
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ display:
|
|||||||
id: ili9xxx_display
|
id: ili9xxx_display
|
||||||
model: GC9A01A
|
model: GC9A01A
|
||||||
invert_colors: True
|
invert_colors: True
|
||||||
cs_pin: 10
|
cs_pin: 11
|
||||||
dc_pin: 6
|
dc_pin: 7
|
||||||
pages:
|
pages:
|
||||||
- id: page1
|
- id: page1
|
||||||
lambda: |-
|
lambda: |-
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ display:
|
|||||||
invert_colors: true
|
invert_colors: true
|
||||||
show_test_card: true
|
show_test_card: true
|
||||||
spi_mode: mode0
|
spi_mode: mode0
|
||||||
draw_rounding: 8
|
draw_rounding: 4
|
||||||
use_axis_flips: true
|
use_axis_flips: true
|
||||||
init_sequence:
|
init_sequence:
|
||||||
- [0xd0, 1, 2, 3]
|
- [0xd0, 1, 2, 3]
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
substitutions:
|
substitutions:
|
||||||
dc_pin: GPIO14
|
dc_pin: GPIO14
|
||||||
cs_pin: GPIO13
|
cs_pin: GPIO13
|
||||||
enable_pin: GPIO16
|
enable_pin: GPIO17
|
||||||
reset_pin: GPIO20
|
reset_pin: GPIO20
|
||||||
|
|
||||||
packages:
|
packages:
|
||||||
|
|||||||
@@ -2,11 +2,13 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from argparse import Namespace
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from esphome.core import CORE
|
||||||
from esphome.dashboard.settings import DashboardSettings
|
from esphome.dashboard.settings import DashboardSettings
|
||||||
|
|
||||||
|
|
||||||
@@ -159,3 +161,63 @@ def test_rel_path_with_numeric_args(dashboard_settings: DashboardSettings) -> No
|
|||||||
result = dashboard_settings.rel_path("123", "456.789")
|
result = dashboard_settings.rel_path("123", "456.789")
|
||||||
expected = dashboard_settings.config_dir / "123" / "456.789"
|
expected = dashboard_settings.config_dir / "123" / "456.789"
|
||||||
assert result == expected
|
assert result == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_config_path_parent_resolves_to_config_dir(tmp_path: Path) -> None:
|
||||||
|
"""Test that CORE.config_path.parent resolves to config_dir after parse_args.
|
||||||
|
|
||||||
|
This is a regression test for issue #11280 where binary download failed
|
||||||
|
when using packages with secrets after the Path migration in 2025.10.0.
|
||||||
|
|
||||||
|
The issue was that after switching from os.path to Path:
|
||||||
|
- Before: os.path.dirname("/config/.") → "/config"
|
||||||
|
- After: Path("/config/.").parent → Path("/") (normalized first!)
|
||||||
|
|
||||||
|
The fix uses a sentinel file so .parent returns the correct directory:
|
||||||
|
- Fixed: Path("/config/___DASHBOARD_SENTINEL___.yaml").parent → Path("/config")
|
||||||
|
"""
|
||||||
|
# Create test directory structure with secrets and packages
|
||||||
|
config_dir = tmp_path / "config"
|
||||||
|
config_dir.mkdir()
|
||||||
|
|
||||||
|
# Create secrets.yaml with obviously fake test values
|
||||||
|
secrets_file = config_dir / "secrets.yaml"
|
||||||
|
secrets_file.write_text(
|
||||||
|
"wifi_ssid: TEST-DUMMY-SSID\n"
|
||||||
|
"wifi_password: not-a-real-password-just-for-testing\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create package file that uses secrets
|
||||||
|
package_file = config_dir / "common.yaml"
|
||||||
|
package_file.write_text(
|
||||||
|
"wifi:\n ssid: !secret wifi_ssid\n password: !secret wifi_password\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create main device config that includes the package
|
||||||
|
device_config = config_dir / "test-device.yaml"
|
||||||
|
device_config.write_text(
|
||||||
|
"esphome:\n name: test-device\n\npackages:\n common: !include common.yaml\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set up dashboard settings with our test config directory
|
||||||
|
settings = DashboardSettings()
|
||||||
|
args = Namespace(
|
||||||
|
configuration=str(config_dir),
|
||||||
|
password=None,
|
||||||
|
username=None,
|
||||||
|
ha_addon=False,
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
settings.parse_args(args)
|
||||||
|
|
||||||
|
# Verify that CORE.config_path.parent correctly points to the config directory
|
||||||
|
# This is critical for secret resolution in yaml_util.py which does:
|
||||||
|
# main_config_dir = CORE.config_path.parent
|
||||||
|
# main_secret_yml = main_config_dir / "secrets.yaml"
|
||||||
|
assert CORE.config_path.parent == config_dir.resolve()
|
||||||
|
assert (CORE.config_path.parent / "secrets.yaml").exists()
|
||||||
|
assert (CORE.config_path.parent / "common.yaml").exists()
|
||||||
|
|
||||||
|
# Verify that CORE.config_path itself uses the sentinel file
|
||||||
|
assert CORE.config_path.name == "___DASHBOARD_SENTINEL___.yaml"
|
||||||
|
assert not CORE.config_path.exists() # Sentinel file doesn't actually exist
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from argparse import Namespace
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections.abc import Generator
|
from collections.abc import Generator
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
@@ -17,6 +18,8 @@ from tornado.ioloop import IOLoop
|
|||||||
from tornado.testing import bind_unused_port
|
from tornado.testing import bind_unused_port
|
||||||
from tornado.websocket import WebSocketClientConnection, websocket_connect
|
from tornado.websocket import WebSocketClientConnection, websocket_connect
|
||||||
|
|
||||||
|
from esphome import yaml_util
|
||||||
|
from esphome.core import CORE
|
||||||
from esphome.dashboard import web_server
|
from esphome.dashboard import web_server
|
||||||
from esphome.dashboard.const import DashboardEvent
|
from esphome.dashboard.const import DashboardEvent
|
||||||
from esphome.dashboard.core import DASHBOARD
|
from esphome.dashboard.core import DASHBOARD
|
||||||
@@ -32,6 +35,26 @@ from esphome.zeroconf import DiscoveredImport
|
|||||||
from .common import get_fixture_path
|
from .common import get_fixture_path
|
||||||
|
|
||||||
|
|
||||||
|
def get_build_path(base_path: Path, device_name: str) -> Path:
|
||||||
|
"""Get the build directory path for a device.
|
||||||
|
|
||||||
|
This is a test helper that constructs the standard ESPHome build directory
|
||||||
|
structure. Note: This helper does NOT perform path traversal sanitization
|
||||||
|
because it's only used in tests where we control the inputs. The actual
|
||||||
|
web_server.py code handles sanitization in DownloadBinaryRequestHandler.get()
|
||||||
|
via file_name.replace("..", "").lstrip("/").
|
||||||
|
|
||||||
|
Args:
|
||||||
|
base_path: The base temporary path (typically tmp_path from pytest)
|
||||||
|
device_name: The name of the device (should not contain path separators
|
||||||
|
in production use, but tests may use it for specific scenarios)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to the build directory (.esphome/build/device_name)
|
||||||
|
"""
|
||||||
|
return base_path / ".esphome" / "build" / device_name
|
||||||
|
|
||||||
|
|
||||||
class DashboardTestHelper:
|
class DashboardTestHelper:
|
||||||
def __init__(self, io_loop: IOLoop, client: AsyncHTTPClient, port: int) -> None:
|
def __init__(self, io_loop: IOLoop, client: AsyncHTTPClient, port: int) -> None:
|
||||||
self.io_loop = io_loop
|
self.io_loop = io_loop
|
||||||
@@ -414,6 +437,180 @@ async def test_download_binary_handler_idedata_fallback(
|
|||||||
assert response.body == b"bootloader content"
|
assert response.body == b"bootloader content"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.usefixtures("mock_ext_storage_path")
|
||||||
|
async def test_download_binary_handler_subdirectory_file(
|
||||||
|
dashboard: DashboardTestHelper,
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_storage_json: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Test the DownloadBinaryRequestHandler.get with file in subdirectory (nRF52 case).
|
||||||
|
|
||||||
|
This is a regression test for issue #11343 where the Path migration broke
|
||||||
|
downloads for nRF52 firmware files in subdirectories like 'zephyr/zephyr.uf2'.
|
||||||
|
|
||||||
|
The issue was that with_name() doesn't accept path separators:
|
||||||
|
- Before: path = storage_json.firmware_bin_path.with_name(file_name)
|
||||||
|
ValueError: Invalid name 'zephyr/zephyr.uf2'
|
||||||
|
- After: path = storage_json.firmware_bin_path.parent.joinpath(file_name)
|
||||||
|
Works correctly with subdirectory paths
|
||||||
|
"""
|
||||||
|
# Create a fake nRF52 build structure with firmware in subdirectory
|
||||||
|
build_dir = get_build_path(tmp_path, "nrf52-device")
|
||||||
|
zephyr_dir = build_dir / "zephyr"
|
||||||
|
zephyr_dir.mkdir(parents=True)
|
||||||
|
|
||||||
|
# Create the main firmware binary (would be in build root)
|
||||||
|
firmware_file = build_dir / "firmware.bin"
|
||||||
|
firmware_file.write_bytes(b"main firmware")
|
||||||
|
|
||||||
|
# Create the UF2 file in zephyr subdirectory (nRF52 specific)
|
||||||
|
uf2_file = zephyr_dir / "zephyr.uf2"
|
||||||
|
uf2_file.write_bytes(b"nRF52 UF2 firmware content")
|
||||||
|
|
||||||
|
# Mock storage JSON
|
||||||
|
mock_storage = Mock()
|
||||||
|
mock_storage.name = "nrf52-device"
|
||||||
|
mock_storage.firmware_bin_path = firmware_file
|
||||||
|
mock_storage_json.load.return_value = mock_storage
|
||||||
|
|
||||||
|
# Request the UF2 file with subdirectory path
|
||||||
|
response = await dashboard.fetch(
|
||||||
|
"/download.bin?configuration=nrf52-device.yaml&file=zephyr/zephyr.uf2",
|
||||||
|
method="GET",
|
||||||
|
)
|
||||||
|
assert response.code == 200
|
||||||
|
assert response.body == b"nRF52 UF2 firmware content"
|
||||||
|
assert response.headers["Content-Type"] == "application/octet-stream"
|
||||||
|
assert "attachment" in response.headers["Content-Disposition"]
|
||||||
|
# Download name should be device-name + full file path
|
||||||
|
assert "nrf52-device-zephyr/zephyr.uf2" in response.headers["Content-Disposition"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.usefixtures("mock_ext_storage_path")
|
||||||
|
async def test_download_binary_handler_subdirectory_file_url_encoded(
|
||||||
|
dashboard: DashboardTestHelper,
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_storage_json: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Test the DownloadBinaryRequestHandler.get with URL-encoded subdirectory path.
|
||||||
|
|
||||||
|
Verifies that URL-encoded paths (e.g., zephyr%2Fzephyr.uf2) are correctly
|
||||||
|
decoded and handled, and that custom download names work with subdirectories.
|
||||||
|
"""
|
||||||
|
# Create a fake build structure with firmware in subdirectory
|
||||||
|
build_dir = get_build_path(tmp_path, "test")
|
||||||
|
zephyr_dir = build_dir / "zephyr"
|
||||||
|
zephyr_dir.mkdir(parents=True)
|
||||||
|
|
||||||
|
firmware_file = build_dir / "firmware.bin"
|
||||||
|
firmware_file.write_bytes(b"content")
|
||||||
|
|
||||||
|
uf2_file = zephyr_dir / "zephyr.uf2"
|
||||||
|
uf2_file.write_bytes(b"content")
|
||||||
|
|
||||||
|
# Mock storage JSON
|
||||||
|
mock_storage = Mock()
|
||||||
|
mock_storage.name = "test_device"
|
||||||
|
mock_storage.firmware_bin_path = firmware_file
|
||||||
|
mock_storage_json.load.return_value = mock_storage
|
||||||
|
|
||||||
|
# Request with URL-encoded path and custom download name
|
||||||
|
response = await dashboard.fetch(
|
||||||
|
"/download.bin?configuration=test.yaml&file=zephyr%2Fzephyr.uf2&download=custom_name.bin",
|
||||||
|
method="GET",
|
||||||
|
)
|
||||||
|
assert response.code == 200
|
||||||
|
assert "custom_name.bin" in response.headers["Content-Disposition"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.usefixtures("mock_ext_storage_path")
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"attack_path",
|
||||||
|
[
|
||||||
|
pytest.param("../../../secrets.yaml", id="basic_traversal"),
|
||||||
|
pytest.param("..%2F..%2F..%2Fsecrets.yaml", id="url_encoded"),
|
||||||
|
pytest.param("zephyr/../../../secrets.yaml", id="traversal_with_prefix"),
|
||||||
|
pytest.param("/etc/passwd", id="absolute_path"),
|
||||||
|
pytest.param("//etc/passwd", id="double_slash_absolute"),
|
||||||
|
pytest.param("....//secrets.yaml", id="multiple_dots"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
async def test_download_binary_handler_path_traversal_protection(
|
||||||
|
dashboard: DashboardTestHelper,
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_storage_json: MagicMock,
|
||||||
|
attack_path: str,
|
||||||
|
) -> None:
|
||||||
|
"""Test that DownloadBinaryRequestHandler prevents path traversal attacks.
|
||||||
|
|
||||||
|
Verifies that attempts to use '..' in file paths are sanitized to prevent
|
||||||
|
accessing files outside the build directory. Tests multiple attack vectors.
|
||||||
|
"""
|
||||||
|
# Create build structure
|
||||||
|
build_dir = get_build_path(tmp_path, "test")
|
||||||
|
build_dir.mkdir(parents=True)
|
||||||
|
firmware_file = build_dir / "firmware.bin"
|
||||||
|
firmware_file.write_bytes(b"firmware content")
|
||||||
|
|
||||||
|
# Create a sensitive file outside the build directory that should NOT be accessible
|
||||||
|
sensitive_file = tmp_path / "secrets.yaml"
|
||||||
|
sensitive_file.write_bytes(b"secret: my_secret_password")
|
||||||
|
|
||||||
|
# Mock storage JSON
|
||||||
|
mock_storage = Mock()
|
||||||
|
mock_storage.name = "test_device"
|
||||||
|
mock_storage.firmware_bin_path = firmware_file
|
||||||
|
mock_storage_json.load.return_value = mock_storage
|
||||||
|
|
||||||
|
# Attempt path traversal attack - should be blocked
|
||||||
|
with pytest.raises(HTTPClientError) as exc_info:
|
||||||
|
await dashboard.fetch(
|
||||||
|
f"/download.bin?configuration=test.yaml&file={attack_path}",
|
||||||
|
method="GET",
|
||||||
|
)
|
||||||
|
# Should get 404 (file not found after sanitization) or 500 (idedata fails)
|
||||||
|
assert exc_info.value.code in (404, 500)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.usefixtures("mock_ext_storage_path")
|
||||||
|
async def test_download_binary_handler_multiple_subdirectory_levels(
|
||||||
|
dashboard: DashboardTestHelper,
|
||||||
|
tmp_path: Path,
|
||||||
|
mock_storage_json: MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Test downloading files from multiple subdirectory levels.
|
||||||
|
|
||||||
|
Verifies that joinpath correctly handles multi-level paths like 'build/output/firmware.bin'.
|
||||||
|
"""
|
||||||
|
# Create nested directory structure
|
||||||
|
build_dir = get_build_path(tmp_path, "test")
|
||||||
|
nested_dir = build_dir / "build" / "output"
|
||||||
|
nested_dir.mkdir(parents=True)
|
||||||
|
|
||||||
|
firmware_file = build_dir / "firmware.bin"
|
||||||
|
firmware_file.write_bytes(b"main")
|
||||||
|
|
||||||
|
nested_file = nested_dir / "firmware.bin"
|
||||||
|
nested_file.write_bytes(b"nested firmware content")
|
||||||
|
|
||||||
|
# Mock storage JSON
|
||||||
|
mock_storage = Mock()
|
||||||
|
mock_storage.name = "test_device"
|
||||||
|
mock_storage.firmware_bin_path = firmware_file
|
||||||
|
mock_storage_json.load.return_value = mock_storage
|
||||||
|
|
||||||
|
response = await dashboard.fetch(
|
||||||
|
"/download.bin?configuration=test.yaml&file=build/output/firmware.bin",
|
||||||
|
method="GET",
|
||||||
|
)
|
||||||
|
assert response.code == 200
|
||||||
|
assert response.body == b"nested firmware content"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_edit_request_handler_post_invalid_file(
|
async def test_edit_request_handler_post_invalid_file(
|
||||||
dashboard: DashboardTestHelper,
|
dashboard: DashboardTestHelper,
|
||||||
@@ -1302,3 +1499,71 @@ async def test_dashboard_subscriber_refresh_event(
|
|||||||
|
|
||||||
# Give it a moment to clean up
|
# Give it a moment to clean up
|
||||||
await asyncio.sleep(0.01)
|
await asyncio.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_dashboard_yaml_loading_with_packages_and_secrets(
|
||||||
|
tmp_path: Path,
|
||||||
|
) -> None:
|
||||||
|
"""Test dashboard YAML loading with packages referencing secrets.
|
||||||
|
|
||||||
|
This is a regression test for issue #11280 where binary download failed
|
||||||
|
when using packages with secrets after the Path migration in 2025.10.0.
|
||||||
|
|
||||||
|
This test verifies that CORE.config_path initialization in the dashboard
|
||||||
|
allows yaml_util.load_yaml() to correctly resolve secrets from packages.
|
||||||
|
"""
|
||||||
|
# Create test directory structure with secrets and packages
|
||||||
|
config_dir = tmp_path / "config"
|
||||||
|
config_dir.mkdir()
|
||||||
|
|
||||||
|
# Create secrets.yaml with obviously fake test values
|
||||||
|
secrets_file = config_dir / "secrets.yaml"
|
||||||
|
secrets_file.write_text(
|
||||||
|
"wifi_ssid: TEST-DUMMY-SSID\n"
|
||||||
|
"wifi_password: not-a-real-password-just-for-testing\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create package file that uses secrets
|
||||||
|
package_file = config_dir / "common.yaml"
|
||||||
|
package_file.write_text(
|
||||||
|
"wifi:\n ssid: !secret wifi_ssid\n password: !secret wifi_password\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create main device config that includes the package
|
||||||
|
device_config = config_dir / "test-download-secrets.yaml"
|
||||||
|
device_config.write_text(
|
||||||
|
"esphome:\n name: test-download-secrets\n platform: ESP32\n board: esp32dev\n\n"
|
||||||
|
"packages:\n common: !include common.yaml\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Initialize DASHBOARD settings with our test config directory
|
||||||
|
# This is what sets CORE.config_path - the critical code path for the bug
|
||||||
|
args = Namespace(
|
||||||
|
configuration=str(config_dir),
|
||||||
|
password=None,
|
||||||
|
username=None,
|
||||||
|
ha_addon=False,
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
DASHBOARD.settings.parse_args(args)
|
||||||
|
|
||||||
|
# With the fix: CORE.config_path should be config_dir / "___DASHBOARD_SENTINEL___.yaml"
|
||||||
|
# so CORE.config_path.parent would be config_dir
|
||||||
|
# Without the fix: CORE.config_path is config_dir / "." which normalizes to config_dir
|
||||||
|
# so CORE.config_path.parent would be tmp_path (the parent of config_dir)
|
||||||
|
|
||||||
|
# The fix ensures CORE.config_path.parent points to config_dir
|
||||||
|
assert CORE.config_path.parent == config_dir.resolve(), (
|
||||||
|
f"CORE.config_path.parent should point to config_dir. "
|
||||||
|
f"Got {CORE.config_path.parent}, expected {config_dir.resolve()}. "
|
||||||
|
f"CORE.config_path is {CORE.config_path}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now load the YAML with packages that reference secrets
|
||||||
|
# This is where the bug would manifest - yaml_util.load_yaml would fail
|
||||||
|
# to find secrets.yaml because CORE.config_path.parent pointed to the wrong place
|
||||||
|
config = yaml_util.load_yaml(device_config)
|
||||||
|
# If we get here, secret resolution worked!
|
||||||
|
assert "esphome" in config
|
||||||
|
assert config["esphome"]["name"] == "test-download-secrets"
|
||||||
|
|||||||
@@ -570,6 +570,13 @@ class TestEsphomeCore:
|
|||||||
|
|
||||||
assert target.address == "4.3.2.1"
|
assert target.address == "4.3.2.1"
|
||||||
|
|
||||||
|
def test_address__openthread(self, target):
|
||||||
|
target.name = "test-device"
|
||||||
|
target.config = {}
|
||||||
|
target.config[const.CONF_OPENTHREAD] = {}
|
||||||
|
|
||||||
|
assert target.address == "test-device.local"
|
||||||
|
|
||||||
def test_is_esp32(self, target):
|
def test_is_esp32(self, target):
|
||||||
target.data[const.KEY_CORE] = {const.KEY_TARGET_PLATFORM: "esp32"}
|
target.data[const.KEY_CORE] = {const.KEY_TARGET_PLATFORM: "esp32"}
|
||||||
|
|
||||||
|
|||||||
@@ -287,7 +287,7 @@ def test_perform_ota_no_auth(mock_socket: Mock, mock_file: io.BytesIO) -> None:
|
|||||||
|
|
||||||
mock_socket.recv.side_effect = recv_responses
|
mock_socket.recv.side_effect = recv_responses
|
||||||
|
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
# Should not send any auth-related data
|
# Should not send any auth-related data
|
||||||
auth_calls = [
|
auth_calls = [
|
||||||
@@ -317,7 +317,7 @@ def test_perform_ota_with_compression(mock_socket: Mock) -> None:
|
|||||||
|
|
||||||
mock_socket.recv.side_effect = recv_responses
|
mock_socket.recv.side_effect = recv_responses
|
||||||
|
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
# Verify compressed content was sent
|
# Verify compressed content was sent
|
||||||
# Get the binary size that was sent (4 bytes after features)
|
# Get the binary size that was sent (4 bytes after features)
|
||||||
@@ -347,7 +347,7 @@ def test_perform_ota_auth_without_password(mock_socket: Mock) -> None:
|
|||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
espota2.OTAError, match="ESP requests password, but no password given"
|
espota2.OTAError, match="ESP requests password, but no password given"
|
||||||
):
|
):
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("mock_time")
|
@pytest.mark.usefixtures("mock_time")
|
||||||
@@ -413,7 +413,7 @@ def test_perform_ota_sha256_auth_without_password(mock_socket: Mock) -> None:
|
|||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
espota2.OTAError, match="ESP requests password, but no password given"
|
espota2.OTAError, match="ESP requests password, but no password given"
|
||||||
):
|
):
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
|
|
||||||
def test_perform_ota_unexpected_auth_response(mock_socket: Mock) -> None:
|
def test_perform_ota_unexpected_auth_response(mock_socket: Mock) -> None:
|
||||||
@@ -450,7 +450,7 @@ def test_perform_ota_unsupported_version(mock_socket: Mock) -> None:
|
|||||||
mock_socket.recv.side_effect = responses
|
mock_socket.recv.side_effect = responses
|
||||||
|
|
||||||
with pytest.raises(espota2.OTAError, match="Device uses unsupported OTA version"):
|
with pytest.raises(espota2.OTAError, match="Device uses unsupported OTA version"):
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("mock_time")
|
@pytest.mark.usefixtures("mock_time")
|
||||||
@@ -471,7 +471,7 @@ def test_perform_ota_upload_error(mock_socket: Mock, mock_file: io.BytesIO) -> N
|
|||||||
mock_socket.recv.side_effect = recv_responses
|
mock_socket.recv.side_effect = recv_responses
|
||||||
|
|
||||||
with pytest.raises(espota2.OTAError, match="Error receiving acknowledge chunk OK"):
|
with pytest.raises(espota2.OTAError, match="Error receiving acknowledge chunk OK"):
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("mock_socket_constructor", "mock_resolve_ip")
|
@pytest.mark.usefixtures("mock_socket_constructor", "mock_resolve_ip")
|
||||||
@@ -706,7 +706,7 @@ def test_perform_ota_version_differences(
|
|||||||
]
|
]
|
||||||
|
|
||||||
mock_socket.recv.side_effect = recv_responses
|
mock_socket.recv.side_effect = recv_responses
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
# For v1.0, verify that we only get the expected number of recv calls
|
# For v1.0, verify that we only get the expected number of recv calls
|
||||||
# v1.0 doesn't have chunk acknowledgments, so fewer recv calls
|
# v1.0 doesn't have chunk acknowledgments, so fewer recv calls
|
||||||
@@ -732,7 +732,7 @@ def test_perform_ota_version_differences(
|
|||||||
]
|
]
|
||||||
|
|
||||||
mock_socket.recv.side_effect = recv_responses_v2
|
mock_socket.recv.side_effect = recv_responses_v2
|
||||||
espota2.perform_ota(mock_socket, "", mock_file, "test.bin")
|
espota2.perform_ota(mock_socket, None, mock_file, "test.bin")
|
||||||
|
|
||||||
# For v2.0, verify more recv calls due to chunk acknowledgments
|
# For v2.0, verify more recv calls due to chunk acknowledgments
|
||||||
assert mock_socket.recv.call_count == 9 # v2.0 has 9 recv calls (includes chunk OK)
|
assert mock_socket.recv.call_count == 9 # v2.0 has 9 recv calls (includes chunk OK)
|
||||||
|
|||||||
@@ -454,9 +454,27 @@ def test_resolve_ip_address_mixed_list() -> None:
|
|||||||
# Mix of IP and hostname - should use async resolver
|
# Mix of IP and hostname - should use async resolver
|
||||||
result = helpers.resolve_ip_address(["192.168.1.100", "test.local"], 6053)
|
result = helpers.resolve_ip_address(["192.168.1.100", "test.local"], 6053)
|
||||||
|
|
||||||
|
assert len(result) == 2
|
||||||
|
assert result[0][4][0] == "192.168.1.100"
|
||||||
|
assert result[1][4][0] == "192.168.1.200"
|
||||||
|
MockResolver.assert_called_once_with(["test.local"], 6053)
|
||||||
|
mock_resolver.resolve.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
def test_resolve_ip_address_mixed_list_fail() -> None:
|
||||||
|
"""Test resolving a mix of IPs and hostnames with resolve failed."""
|
||||||
|
with patch("esphome.resolver.AsyncResolver") as MockResolver:
|
||||||
|
mock_resolver = MockResolver.return_value
|
||||||
|
mock_resolver.resolve.side_effect = EsphomeError(
|
||||||
|
"Error resolving IP address: [test.local]"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mix of IP and hostname - should use async resolver
|
||||||
|
result = helpers.resolve_ip_address(["192.168.1.100", "test.local"], 6053)
|
||||||
|
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0][4][0] == "192.168.1.200"
|
assert result[0][4][0] == "192.168.1.100"
|
||||||
MockResolver.assert_called_once_with(["192.168.1.100", "test.local"], 6053)
|
MockResolver.assert_called_once_with(["test.local"], 6053)
|
||||||
mock_resolver.resolve.assert_called_once()
|
mock_resolver.resolve.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -321,12 +321,14 @@ def test_choose_upload_log_host_with_serial_device_no_ports(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Test SERIAL device when no serial ports are found."""
|
"""Test SERIAL device when no serial ports are found."""
|
||||||
setup_core()
|
setup_core()
|
||||||
result = choose_upload_log_host(
|
with pytest.raises(
|
||||||
default="SERIAL",
|
EsphomeError, match="All specified devices .* could not be resolved"
|
||||||
check_default=None,
|
):
|
||||||
purpose=Purpose.UPLOADING,
|
choose_upload_log_host(
|
||||||
)
|
default="SERIAL",
|
||||||
assert result == []
|
check_default=None,
|
||||||
|
purpose=Purpose.UPLOADING,
|
||||||
|
)
|
||||||
assert "No serial ports found, skipping SERIAL device" in caplog.text
|
assert "No serial ports found, skipping SERIAL device" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
@@ -367,12 +369,14 @@ def test_choose_upload_log_host_with_ota_device_with_api_config() -> None:
|
|||||||
"""Test OTA device when API is configured (no upload without OTA in config)."""
|
"""Test OTA device when API is configured (no upload without OTA in config)."""
|
||||||
setup_core(config={CONF_API: {}}, address="192.168.1.100")
|
setup_core(config={CONF_API: {}}, address="192.168.1.100")
|
||||||
|
|
||||||
result = choose_upload_log_host(
|
with pytest.raises(
|
||||||
default="OTA",
|
EsphomeError, match="All specified devices .* could not be resolved"
|
||||||
check_default=None,
|
):
|
||||||
purpose=Purpose.UPLOADING,
|
choose_upload_log_host(
|
||||||
)
|
default="OTA",
|
||||||
assert result == []
|
check_default=None,
|
||||||
|
purpose=Purpose.UPLOADING,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_choose_upload_log_host_with_ota_device_with_api_config_logging() -> None:
|
def test_choose_upload_log_host_with_ota_device_with_api_config_logging() -> None:
|
||||||
@@ -405,12 +409,14 @@ def test_choose_upload_log_host_with_ota_device_no_fallback() -> None:
|
|||||||
"""Test OTA device with no valid fallback options."""
|
"""Test OTA device with no valid fallback options."""
|
||||||
setup_core()
|
setup_core()
|
||||||
|
|
||||||
result = choose_upload_log_host(
|
with pytest.raises(
|
||||||
default="OTA",
|
EsphomeError, match="All specified devices .* could not be resolved"
|
||||||
check_default=None,
|
):
|
||||||
purpose=Purpose.UPLOADING,
|
choose_upload_log_host(
|
||||||
)
|
default="OTA",
|
||||||
assert result == []
|
check_default=None,
|
||||||
|
purpose=Purpose.UPLOADING,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("mock_choose_prompt")
|
@pytest.mark.usefixtures("mock_choose_prompt")
|
||||||
@@ -615,21 +621,19 @@ def test_choose_upload_log_host_empty_defaults_list() -> None:
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("mock_no_serial_ports", "mock_no_mqtt_logging")
|
@pytest.mark.usefixtures("mock_no_serial_ports", "mock_no_mqtt_logging")
|
||||||
def test_choose_upload_log_host_all_devices_unresolved(
|
def test_choose_upload_log_host_all_devices_unresolved() -> None:
|
||||||
caplog: pytest.LogCaptureFixture,
|
|
||||||
) -> None:
|
|
||||||
"""Test when all specified devices cannot be resolved."""
|
"""Test when all specified devices cannot be resolved."""
|
||||||
setup_core()
|
setup_core()
|
||||||
|
|
||||||
result = choose_upload_log_host(
|
with pytest.raises(
|
||||||
default=["SERIAL", "OTA"],
|
EsphomeError,
|
||||||
check_default=None,
|
match=r"All specified devices \['SERIAL', 'OTA'\] could not be resolved",
|
||||||
purpose=Purpose.UPLOADING,
|
):
|
||||||
)
|
choose_upload_log_host(
|
||||||
assert result == []
|
default=["SERIAL", "OTA"],
|
||||||
assert (
|
check_default=None,
|
||||||
"All specified devices: ['SERIAL', 'OTA'] could not be resolved." in caplog.text
|
purpose=Purpose.UPLOADING,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("mock_no_serial_ports", "mock_no_mqtt_logging")
|
@pytest.mark.usefixtures("mock_no_serial_ports", "mock_no_mqtt_logging")
|
||||||
@@ -762,12 +766,14 @@ def test_choose_upload_log_host_no_address_with_ota_config() -> None:
|
|||||||
"""Test OTA device when OTA is configured but no address is set."""
|
"""Test OTA device when OTA is configured but no address is set."""
|
||||||
setup_core(config={CONF_OTA: {}})
|
setup_core(config={CONF_OTA: {}})
|
||||||
|
|
||||||
result = choose_upload_log_host(
|
with pytest.raises(
|
||||||
default="OTA",
|
EsphomeError, match="All specified devices .* could not be resolved"
|
||||||
check_default=None,
|
):
|
||||||
purpose=Purpose.UPLOADING,
|
choose_upload_log_host(
|
||||||
)
|
default="OTA",
|
||||||
assert result == []
|
check_default=None,
|
||||||
|
purpose=Purpose.UPLOADING,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -1062,7 +1068,7 @@ def test_upload_program_ota_with_file_arg(
|
|||||||
assert exit_code == 0
|
assert exit_code == 0
|
||||||
assert host == "192.168.1.100"
|
assert host == "192.168.1.100"
|
||||||
mock_run_ota.assert_called_once_with(
|
mock_run_ota.assert_called_once_with(
|
||||||
["192.168.1.100"], 3232, "", Path("custom.bin")
|
["192.168.1.100"], 3232, None, Path("custom.bin")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -1119,7 +1125,9 @@ def test_upload_program_ota_with_mqtt_resolution(
|
|||||||
expected_firmware = (
|
expected_firmware = (
|
||||||
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
|
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
|
||||||
)
|
)
|
||||||
mock_run_ota.assert_called_once_with(["192.168.1.100"], 3232, "", expected_firmware)
|
mock_run_ota.assert_called_once_with(
|
||||||
|
["192.168.1.100"], 3232, None, expected_firmware
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@patch("esphome.__main__.importlib.import_module")
|
@patch("esphome.__main__.importlib.import_module")
|
||||||
@@ -1976,3 +1984,292 @@ def test_command_clean_all_args_used() -> None:
|
|||||||
# Verify the correct configuration paths were passed
|
# Verify the correct configuration paths were passed
|
||||||
mock_clean_all.assert_any_call(["/path/to/config1"])
|
mock_clean_all.assert_any_call(["/path/to/config1"])
|
||||||
mock_clean_all.assert_any_call(["/path/to/config2", "/path/to/config3"])
|
mock_clean_all.assert_any_call(["/path/to/config2", "/path/to/config3"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_upload_program_ota_static_ip_with_mqttip(
|
||||||
|
mock_mqtt_get_ip: Mock,
|
||||||
|
mock_run_ota: Mock,
|
||||||
|
tmp_path: Path,
|
||||||
|
) -> None:
|
||||||
|
"""Test upload_program with static IP and MQTTIP (issue #11260).
|
||||||
|
|
||||||
|
This tests the scenario where a device has manual_ip (static IP) configured
|
||||||
|
and MQTT is also configured. The devices list contains both the static IP
|
||||||
|
and "MQTTIP" magic string. This previously failed because only the first
|
||||||
|
device was checked for MQTT resolution.
|
||||||
|
"""
|
||||||
|
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
|
||||||
|
|
||||||
|
mock_mqtt_get_ip.return_value = ["192.168.2.50"] # Different subnet
|
||||||
|
mock_run_ota.return_value = (0, "192.168.1.100")
|
||||||
|
|
||||||
|
config = {
|
||||||
|
CONF_OTA: [
|
||||||
|
{
|
||||||
|
CONF_PLATFORM: CONF_ESPHOME,
|
||||||
|
CONF_PORT: 3232,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
CONF_MQTT: {
|
||||||
|
CONF_BROKER: "mqtt.local",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
args = MockArgs(username="user", password="pass", client_id="client")
|
||||||
|
# Simulates choose_upload_log_host returning static IP + MQTTIP
|
||||||
|
devices = ["192.168.1.100", "MQTTIP"]
|
||||||
|
|
||||||
|
exit_code, host = upload_program(config, args, devices)
|
||||||
|
|
||||||
|
assert exit_code == 0
|
||||||
|
assert host == "192.168.1.100"
|
||||||
|
|
||||||
|
# Verify MQTT was resolved
|
||||||
|
mock_mqtt_get_ip.assert_called_once_with(config, "user", "pass", "client")
|
||||||
|
|
||||||
|
# Verify espota2.run_ota was called with both IPs
|
||||||
|
expected_firmware = (
|
||||||
|
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
|
||||||
|
)
|
||||||
|
mock_run_ota.assert_called_once_with(
|
||||||
|
["192.168.1.100", "192.168.2.50"], 3232, None, expected_firmware
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_upload_program_ota_multiple_mqttip_resolves_once(
|
||||||
|
mock_mqtt_get_ip: Mock,
|
||||||
|
mock_run_ota: Mock,
|
||||||
|
tmp_path: Path,
|
||||||
|
) -> None:
|
||||||
|
"""Test that MQTT resolution only happens once even with multiple MQTT magic strings."""
|
||||||
|
setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)
|
||||||
|
|
||||||
|
mock_mqtt_get_ip.return_value = ["192.168.2.50", "192.168.2.51"]
|
||||||
|
mock_run_ota.return_value = (0, "192.168.2.50")
|
||||||
|
|
||||||
|
config = {
|
||||||
|
CONF_OTA: [
|
||||||
|
{
|
||||||
|
CONF_PLATFORM: CONF_ESPHOME,
|
||||||
|
CONF_PORT: 3232,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
CONF_MQTT: {
|
||||||
|
CONF_BROKER: "mqtt.local",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
args = MockArgs(username="user", password="pass", client_id="client")
|
||||||
|
# Multiple MQTT magic strings in the list
|
||||||
|
devices = ["MQTTIP", "MQTT", "192.168.1.100"]
|
||||||
|
|
||||||
|
exit_code, host = upload_program(config, args, devices)
|
||||||
|
|
||||||
|
assert exit_code == 0
|
||||||
|
assert host == "192.168.2.50"
|
||||||
|
|
||||||
|
# Verify MQTT was only resolved once despite multiple MQTT magic strings
|
||||||
|
mock_mqtt_get_ip.assert_called_once_with(config, "user", "pass", "client")
|
||||||
|
|
||||||
|
# Verify espota2.run_ota was called with all unique IPs
|
||||||
|
expected_firmware = (
|
||||||
|
tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
|
||||||
|
)
|
||||||
|
mock_run_ota.assert_called_once_with(
|
||||||
|
["192.168.2.50", "192.168.2.51", "192.168.1.100"], 3232, None, expected_firmware
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_upload_program_ota_mqttip_deduplication(
    mock_mqtt_get_ip: Mock,
    mock_run_ota: Mock,
    tmp_path: Path,
) -> None:
    """MQTT resolving to an already-listed static IP must not break the upload."""
    setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)

    # The broker reports the very same address as the static entry.
    mock_mqtt_get_ip.return_value = ["192.168.1.100"]
    mock_run_ota.return_value = (0, "192.168.1.100")

    cfg = {
        CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME, CONF_PORT: 3232}],
        CONF_MQTT: {CONF_BROKER: "mqtt.local"},
    }
    mock_args = MockArgs(username="user", password="pass", client_id="client")
    device_list = ["192.168.1.100", "MQTTIP"]

    exit_code, host = upload_program(cfg, mock_args, device_list)

    assert exit_code == 0
    assert host == "192.168.1.100"

    # The MQTT lookup still ran exactly once.
    mock_mqtt_get_ip.assert_called_once_with(cfg, "user", "pass", "client")

    # Documents current behavior: no deduplication is performed, so the address
    # may appear twice; we only require that it is present in the OTA targets.
    # Deduplication could be a future enhancement.
    mock_run_ota.assert_called_once()
    ota_call = mock_run_ota.call_args[0]
    assert "192.168.1.100" in ota_call[0]
@patch("esphome.components.api.client.run_logs")
def test_show_logs_api_static_ip_with_mqttip(
    mock_run_logs: Mock,
    mock_mqtt_get_ip: Mock,
) -> None:
    """Regression test for issue #11260: static IP plus MQTTIP in show_logs.

    Covers a device that has manual_ip (static IP) configured together with
    MQTT, where choose_upload_log_host yields both the static address and the
    "MQTTIP" magic string.
    """
    setup_core(
        config={
            "logger": {},
            CONF_API: {},
            CONF_MQTT: {CONF_BROKER: "mqtt.local"},
        },
        platform=PLATFORM_ESP32,
    )
    mock_run_logs.return_value = 0
    mock_mqtt_get_ip.return_value = ["192.168.2.50"]

    mock_args = MockArgs(username="user", password="pass", client_id="client")
    # Mirrors what choose_upload_log_host returns in this scenario.
    device_list = ["192.168.1.100", "MQTTIP"]

    result = show_logs(CORE.config, mock_args, device_list)

    assert result == 0

    # MQTT resolution happened exactly once.
    mock_mqtt_get_ip.assert_called_once_with(CORE.config, "user", "pass", "client")

    # run_logs receives the static address followed by the MQTT-resolved one.
    mock_run_logs.assert_called_once_with(
        CORE.config, ["192.168.1.100", "192.168.2.50"]
    )
@patch("esphome.components.api.client.run_logs")
def test_show_logs_api_multiple_mqttip_resolves_once(
    mock_run_logs: Mock,
    mock_mqtt_get_ip: Mock,
) -> None:
    """show_logs performs one MQTT lookup no matter how many magic strings appear."""
    setup_core(
        config={
            "logger": {},
            CONF_API: {},
            CONF_MQTT: {CONF_BROKER: "mqtt.local"},
        },
        platform=PLATFORM_ESP32,
    )
    mock_run_logs.return_value = 0
    mock_mqtt_get_ip.return_value = ["192.168.2.50", "192.168.2.51"]

    mock_args = MockArgs(username="user", password="pass", client_id="client")
    # Both the "MQTTIP" and "MQTT" magic strings are present.
    device_list = ["MQTTIP", "192.168.1.100", "MQTT"]

    result = show_logs(CORE.config, mock_args, device_list)

    assert result == 0

    # A single broker round-trip covers every magic string.
    mock_mqtt_get_ip.assert_called_once_with(CORE.config, "user", "pass", "client")

    # "MQTT" and "MQTTIP" are distinct magic strings, but both trigger MQTT
    # resolution; the _resolve_network_devices helper strips both after the
    # first lookup, so run_logs only sees concrete addresses.
    mock_run_logs.assert_called_once_with(
        CORE.config, ["192.168.2.50", "192.168.2.51", "192.168.1.100"]
    )
def test_upload_program_ota_mqtt_timeout_fallback(
    mock_mqtt_get_ip: Mock,
    mock_run_ota: Mock,
    tmp_path: Path,
) -> None:
    """A failed MQTT lookup must not prevent uploading via the remaining devices."""
    setup_core(platform=PLATFORM_ESP32, tmp_path=tmp_path)

    # Simulate the broker lookup timing out.
    mock_mqtt_get_ip.side_effect = EsphomeError("Failed to find IP via MQTT")
    mock_run_ota.return_value = (0, "192.168.1.100")

    cfg = {
        CONF_OTA: [{CONF_PLATFORM: CONF_ESPHOME, CONF_PORT: 3232}],
        CONF_MQTT: {CONF_BROKER: "mqtt.local"},
    }
    mock_args = MockArgs(username="user", password="pass", client_id="client")
    # Static IP listed ahead of the MQTTIP magic string.
    device_list = ["192.168.1.100", "MQTTIP"]

    exit_code, host = upload_program(cfg, mock_args, device_list)

    # The upload succeeds via the static address despite the MQTT failure.
    assert exit_code == 0
    assert host == "192.168.1.100"

    # The MQTT lookup was still attempted once.
    mock_mqtt_get_ip.assert_called_once_with(cfg, "user", "pass", "client")

    # Only the static IP reaches espota2.run_ota, since MQTT yielded nothing.
    firmware_path = (
        tmp_path / ".esphome" / "build" / "test" / ".pioenvs" / "test" / "firmware.bin"
    )
    mock_run_ota.assert_called_once_with(
        ["192.168.1.100"], 3232, None, firmware_path
    )
@patch("esphome.components.api.client.run_logs")
def test_show_logs_api_mqtt_timeout_fallback(
    mock_run_logs: Mock,
    mock_mqtt_get_ip: Mock,
) -> None:
    """show_logs keeps working with the remaining devices when MQTT times out."""
    setup_core(
        config={
            "logger": {},
            CONF_API: {},
            CONF_MQTT: {CONF_BROKER: "mqtt.local"},
        },
        platform=PLATFORM_ESP32,
    )
    mock_run_logs.return_value = 0
    # Simulate the broker lookup timing out.
    mock_mqtt_get_ip.side_effect = EsphomeError("Failed to find IP via MQTT")

    mock_args = MockArgs(username="user", password="pass", client_id="client")
    # Static IP listed ahead of the MQTTIP magic string.
    device_list = ["192.168.1.100", "MQTTIP"]

    result = show_logs(CORE.config, mock_args, device_list)

    # Logs stream from the static address despite the MQTT failure.
    assert result == 0

    # The MQTT lookup was still attempted once.
    mock_mqtt_get_ip.assert_called_once_with(CORE.config, "user", "pass", "client")

    # Only the static IP is handed to run_logs.
    mock_run_logs.assert_called_once_with(CORE.config, ["192.168.1.100"])
|||||||
@@ -2,9 +2,12 @@ import glob
|
|||||||
import logging
|
import logging
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from esphome import yaml_util
|
from esphome import config as config_module, yaml_util
|
||||||
from esphome.components import substitutions
|
from esphome.components import substitutions
|
||||||
from esphome.const import CONF_PACKAGES
|
from esphome.config_helpers import merge_config
|
||||||
|
from esphome.const import CONF_PACKAGES, CONF_SUBSTITUTIONS
|
||||||
|
from esphome.core import CORE
|
||||||
|
from esphome.util import OrderedDict
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -118,3 +121,200 @@ def test_substitutions_fixtures(fixture_path):
|
|||||||
if DEV_MODE:
|
if DEV_MODE:
|
||||||
_LOGGER.error("Tests passed, but Dev mode is enabled.")
|
_LOGGER.error("Tests passed, but Dev mode is enabled.")
|
||||||
assert not DEV_MODE # make sure DEV_MODE is disabled after you are finished.
|
assert not DEV_MODE # make sure DEV_MODE is disabled after you are finished.
|
||||||
|
|
||||||
|
|
||||||
|
def test_substitutions_with_command_line_maintains_ordered_dict() -> None:
    """Command-line substitutions must not downgrade the config to a plain dict.

    Regression test for https://github.com/esphome/esphome/issues/11182 where
    the config lost its OrderedDict type and move_to_end() raised.
    """
    cfg = OrderedDict()
    cfg["esphome"] = {"name": "test"}
    cfg[CONF_SUBSTITUTIONS] = {"var1": "value1", "var2": "value2"}
    cfg["other_key"] = "other_value"

    # These values are expected to win over the in-file ones.
    cli_subs = {"var2": "override", "var3": "new_value"}

    substitutions.do_substitution_pass(cfg, cli_subs)

    # The mapping type survives the pass.
    assert isinstance(cfg, OrderedDict), "Config should remain an OrderedDict"

    # move_to_end(last=False) must have put substitutions at the front.
    assert next(iter(cfg)) == CONF_SUBSTITUTIONS, "Substitutions should be first key"

    # File values survive, CLI values win on conflict, new keys are added.
    assert cfg[CONF_SUBSTITUTIONS]["var1"] == "value1"
    assert cfg[CONF_SUBSTITUTIONS]["var2"] == "override"
    assert cfg[CONF_SUBSTITUTIONS]["var3"] == "new_value"

    # The substitutions mapping itself is ordered as well.
    assert isinstance(cfg[CONF_SUBSTITUTIONS], OrderedDict), (
        "Substitutions should be an OrderedDict"
    )
def test_substitutions_without_command_line_maintains_ordered_dict() -> None:
    """The substitution pass alone keeps the config an OrderedDict."""
    cfg = OrderedDict()
    cfg["esphome"] = {"name": "test"}
    cfg[CONF_SUBSTITUTIONS] = {"var1": "value1"}
    cfg["other_key"] = "other_value"

    # No command-line overrides this time.
    substitutions.do_substitution_pass(cfg, None)

    # The mapping type survives the pass.
    assert isinstance(cfg, OrderedDict), "Config should remain an OrderedDict"

    # Substitutions were moved to the front of the mapping.
    assert next(iter(cfg)) == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_substitutions_after_merge_config_maintains_ordered_dict() -> None:
    """The substitution pass still works on a config produced by merge_config.

    Regression test for https://github.com/esphome/esphome/issues/11182 where
    using packages turned the config into a plain dict and move_to_end() broke.
    """
    # Two OrderedDict configs, as produced when a package gets merged in.
    base_cfg = OrderedDict()
    base_cfg["esphome"] = {"name": "base"}
    base_cfg[CONF_SUBSTITUTIONS] = {"var1": "value1"}

    pkg_cfg = OrderedDict()
    pkg_cfg["sensor"] = [{"platform": "template"}]
    pkg_cfg[CONF_SUBSTITUTIONS] = {"var2": "value2"}

    merged = merge_config(base_cfg, pkg_cfg)

    # The merge itself must not drop the ordered type.
    assert isinstance(merged, OrderedDict), (
        "Merged config should be an OrderedDict"
    )

    # Must not raise AttributeError on move_to_end().
    substitutions.do_substitution_pass(merged, None)

    assert isinstance(merged, OrderedDict), (
        "Config should still be OrderedDict after substitution pass"
    )
    assert next(iter(merged)) == CONF_SUBSTITUTIONS, "Substitutions should be first key"
def test_validate_config_with_command_line_substitutions_maintains_ordered_dict(
    tmp_path,
) -> None:
    """validate_config keeps result[CONF_SUBSTITUTIONS] ordered with CLI overrides.

    Exercises the config.py path where result[CONF_SUBSTITUTIONS] is built via
    merge_dicts_ordered() and command-line substitutions are supplied.
    """
    # Minimal valid configuration.
    cfg = OrderedDict()
    cfg["esphome"] = {"name": "test_device", "platform": "ESP32"}
    cfg[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
    cfg["esp32"] = {"board": "esp32dev"}

    # Overrides expected to win over the in-file values.
    cli_subs = {"var2": "override", "var3": "new_value"}

    # CORE needs a real Path pointing at an existing YAML file.
    yaml_file = tmp_path / "test.yaml"
    yaml_file.write_text("# test config")
    CORE.config_path = yaml_file

    result = config_module.validate_config(cfg, cli_subs)

    assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
        "Result substitutions should be an OrderedDict"
    )

    # File values survive, CLI values win on conflict, new keys are added.
    assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
    assert result[CONF_SUBSTITUTIONS]["var2"] == "override"
    assert result[CONF_SUBSTITUTIONS]["var3"] == "new_value"
def test_validate_config_without_command_line_substitutions_maintains_ordered_dict(
    tmp_path,
) -> None:
    """validate_config keeps result[CONF_SUBSTITUTIONS] ordered without CLI overrides.

    Exercises the config.py path where result[CONF_SUBSTITUTIONS] is built via
    merge_dicts_ordered() and command_line_substitutions is None.
    """
    # Minimal valid configuration.
    cfg = OrderedDict()
    cfg["esphome"] = {"name": "test_device", "platform": "ESP32"}
    cfg[CONF_SUBSTITUTIONS] = OrderedDict({"var1": "value1", "var2": "value2"})
    cfg["esp32"] = {"board": "esp32dev"}

    # CORE needs a real Path pointing at an existing YAML file.
    yaml_file = tmp_path / "test.yaml"
    yaml_file.write_text("# test config")
    CORE.config_path = yaml_file

    result = config_module.validate_config(cfg, None)

    assert isinstance(result.get(CONF_SUBSTITUTIONS), OrderedDict), (
        "Result substitutions should be an OrderedDict"
    )

    # Without overrides the values pass through untouched.
    assert result[CONF_SUBSTITUTIONS]["var1"] == "value1"
    assert result[CONF_SUBSTITUTIONS]["var2"] == "value2"
def test_merge_config_preserves_ordered_dict() -> None:
    """merge_config must yield an OrderedDict whenever either input is one.

    Guards against merge_config silently dropping the OrderedDict type, which
    later breaks move_to_end() with an AttributeError.
    """
    # OrderedDict + dict -> OrderedDict
    merged = merge_config(OrderedDict([("a", 1), ("b", 2)]), {"b": 20, "c": 3})
    assert isinstance(merged, OrderedDict), (
        "OrderedDict + dict should return OrderedDict"
    )

    # dict + OrderedDict -> OrderedDict
    merged = merge_config({"a": 1, "b": 2}, OrderedDict([("b", 20), ("c", 3)]))
    assert isinstance(merged, OrderedDict), (
        "dict + OrderedDict should return OrderedDict"
    )

    # OrderedDict + OrderedDict -> OrderedDict
    merged = merge_config(
        OrderedDict([("a", 1), ("b", 2)]), OrderedDict([("b", 20), ("c", 3)])
    )
    assert isinstance(merged, OrderedDict), (
        "OrderedDict + OrderedDict should return OrderedDict"
    )

    # dict + dict stays a plain dict — no gratuitous conversion.
    merged = merge_config({"a": 1, "b": 2}, {"b": 20, "c": 3})
    assert isinstance(merged, dict), "dict + dict should return dict"
    assert not isinstance(merged, OrderedDict), (
        "dict + dict should not return OrderedDict"
    )
|||||||
Reference in New Issue
Block a user