1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-01 23:51:47 +00:00

Compare commits

..

22 Commits

Author SHA1 Message Date
Jesse Hills
2a4ab6a811 Merge pull request #10725 from esphome/bump-2025.9.0b2
2025.9.0b2
2025-09-15 18:28:51 +12:00
Jesse Hills
971de64494 Bump version to 2025.9.0b2 2025-09-15 12:34:56 +12:00
J. Nick Koston
926fdcbecd [esp32_ble] Optimize BLE hex formatting to eliminate sprintf dependency (#10714)
Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
2025-09-15 12:34:56 +12:00
J. Nick Koston
6b147312cd [wifi] Optimize WiFi MAC formatting to eliminate sprintf dependency (#10715)
Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
2025-09-15 12:34:56 +12:00
J. Nick Koston
2d9152d9b9 [md5] Optimize MD5::get_hex() to eliminate sprintf dependency (#10710) 2025-09-15 12:34:56 +12:00
dependabot[bot]
24f9550ce5 Bump aioesphomeapi from 40.2.0 to 40.2.1 (#10721)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 12:34:56 +12:00
Big Mike
3427aaab8c ina2xx should be total increasing for energy sensor (#10711) 2025-09-15 12:34:56 +12:00
J. Nick Koston
4e17d14acc [scheduler] Fix timing accumulation in scheduler causing incorrect execution measurements (#10719) 2025-09-15 12:34:56 +12:00
J. Nick Koston
1750f02ef3 [api] Optimize HelloResponse server_info to reduce memory usage (#10701) 2025-09-15 12:34:56 +12:00
J. Nick Koston
ae158179bd [api] Revert unneeded GetTime bidirectional support added in #9790 (#10702) 2025-09-15 12:34:55 +12:00
J. Nick Koston
c601494779 [core] Optimize MAC address formatting to eliminate sprintf dependency (#10713) 2025-09-15 12:34:55 +12:00
J. Nick Koston
646f4e66be [ethernet] Fix permanent component failure from undocumented ESP_FAIL in IPv6 setup (#10708) 2025-09-15 12:34:55 +12:00
dependabot[bot]
5b5e5c213c Bump aioesphomeapi from 40.1.0 to 40.2.0 (#10703)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-15 12:34:55 +12:00
Markus
46235684b1 [core] fix upload to device via MQTT IP lookup (e.g. when mDNS is disable) (#10632)
Co-authored-by: J. Nick Koston <nick@koston.org>
Co-authored-by: J. Nick Koston <nick@home-assistant.io>
Co-authored-by: J. Nick Koston <nick+github@koston.org>
2025-09-15 12:34:55 +12:00
J. Nick Koston
5b702a1efa Add additional dashboard and main tests (#10688)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-09-15 12:34:55 +12:00
J. Nick Koston
56e9fd2e38 [tests] Add upload_program and show_logs test coverage to prevent regressions (#10684) 2025-09-15 12:34:55 +12:00
J. Nick Koston
65f15a706f Add some more coverage for dashboard web_server (#10682) 2025-09-15 12:34:55 +12:00
J. Nick Koston
eee64cc3a6 Add comprehensive tests for choose_upload_log_host to prevent regressions (#10679)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
2025-09-15 12:34:55 +12:00
J. Nick Koston
f43fb3c3a3 [core] Add millisecond precision to logging timestamps (#10677) 2025-09-15 12:34:55 +12:00
rwrozelle
79b0025fe6 Openthread Fix Factory Reset (#9281)
Co-authored-by: Jesse Hills <3060199+jesserockz@users.noreply.github.com>
2025-09-15 12:34:55 +12:00
Jesse Hills
c6a039a72f [adc] Fix FILTER_SOURCE_FILES location (#10673) 2025-09-15 12:34:54 +12:00
esphomebot
6f1fa094c2 Update webserver local assets to 20250910-110003 (#10668) 2025-09-15 12:34:54 +12:00
63 changed files with 533 additions and 2611 deletions

View File

@@ -11,7 +11,7 @@ ci:
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.13.0
rev: v0.12.12
hooks:
# Run the linter.
- id: ruff

View File

@@ -139,7 +139,6 @@ esphome/components/ens160_base/* @latonita @vincentscode
esphome/components/ens160_i2c/* @latonita
esphome/components/ens160_spi/* @latonita
esphome/components/ens210/* @itn3rd77
esphome/components/epdiy/* @jesserockz
esphome/components/es7210/* @kahrendt
esphome/components/es7243e/* @kbx81
esphome/components/es8156/* @kbx81

View File

@@ -48,7 +48,7 @@ PROJECT_NAME = ESPHome
# could be handy for archiving the generated documentation or if some version
# control system is used.
PROJECT_NUMBER = 2025.10.0-dev
PROJECT_NUMBER = 2025.9.0b2
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a

View File

@@ -27,9 +27,6 @@ service APIConnection {
rpc subscribe_logs (SubscribeLogsRequest) returns (void) {}
rpc subscribe_homeassistant_services (SubscribeHomeassistantServicesRequest) returns (void) {}
rpc subscribe_home_assistant_states (SubscribeHomeAssistantStatesRequest) returns (void) {}
rpc get_time (GetTimeRequest) returns (GetTimeResponse) {
option (needs_authentication) = false;
}
rpc execute_service (ExecuteServiceRequest) returns (void) {}
rpc noise_encryption_set_key (NoiseEncryptionSetKeyRequest) returns (NoiseEncryptionSetKeyResponse) {}
@@ -809,12 +806,12 @@ message HomeAssistantStateResponse {
// ==================== IMPORT TIME ====================
message GetTimeRequest {
option (id) = 36;
option (source) = SOURCE_BOTH;
option (source) = SOURCE_SERVER;
}
message GetTimeResponse {
option (id) = 37;
option (source) = SOURCE_BOTH;
option (source) = SOURCE_CLIENT;
option (no_delay) = true;
fixed32 epoch_seconds = 1;

View File

@@ -42,6 +42,8 @@ static constexpr uint8_t MAX_PING_RETRIES = 60;
static constexpr uint16_t PING_RETRY_INTERVAL = 1000;
static constexpr uint32_t KEEPALIVE_DISCONNECT_TIMEOUT = (KEEPALIVE_TIMEOUT_MS * 5) / 2;
static constexpr auto ESPHOME_VERSION_REF = StringRef::from_lit(ESPHOME_VERSION);
static const char *const TAG = "api.connection";
#ifdef USE_CAMERA
static const int CAMERA_STOP_STREAM = 5000;
@@ -1081,12 +1083,6 @@ void APIConnection::on_get_time_response(const GetTimeResponse &value) {
}
#endif
bool APIConnection::send_get_time_response(const GetTimeRequest &msg) {
GetTimeResponse resp;
resp.epoch_seconds = ::time(nullptr);
return this->send_message(resp, GetTimeResponse::MESSAGE_TYPE);
}
#ifdef USE_BLUETOOTH_PROXY
void APIConnection::subscribe_bluetooth_le_advertisements(const SubscribeBluetoothLEAdvertisementsRequest &msg) {
bluetooth_proxy::global_bluetooth_proxy->subscribe_api_connection(this, msg.flags);
@@ -1376,9 +1372,8 @@ bool APIConnection::send_hello_response(const HelloRequest &msg) {
HelloResponse resp;
resp.api_version_major = 1;
resp.api_version_minor = 12;
// Temporary string for concatenation - will be valid during send_message call
std::string server_info = App.get_name() + " (esphome v" ESPHOME_VERSION ")";
resp.set_server_info(StringRef(server_info));
// Send only the version string - the client only logs this for debugging and doesn't use it otherwise
resp.set_server_info(ESPHOME_VERSION_REF);
resp.set_name(StringRef(App.get_name()));
#ifdef USE_API_PASSWORD
@@ -1425,8 +1420,6 @@ bool APIConnection::send_device_info_response(const DeviceInfoRequest &msg) {
std::string mac_address = get_mac_address_pretty();
resp.set_mac_address(StringRef(mac_address));
// Compile-time StringRef constants
static constexpr auto ESPHOME_VERSION_REF = StringRef::from_lit(ESPHOME_VERSION);
resp.set_esphome_version(ESPHOME_VERSION_REF);
resp.set_compilation_time(App.get_compilation_time_ref());

View File

@@ -219,7 +219,6 @@ class APIConnection final : public APIServerConnection {
#ifdef USE_API_HOMEASSISTANT_STATES
void subscribe_home_assistant_states(const SubscribeHomeAssistantStatesRequest &msg) override;
#endif
bool send_get_time_response(const GetTimeRequest &msg) override;
#ifdef USE_API_SERVICES
void execute_service(const ExecuteServiceRequest &msg) override;
#endif

View File

@@ -921,14 +921,6 @@ bool GetTimeResponse::decode_32bit(uint32_t field_id, Proto32Bit value) {
}
return true;
}
void GetTimeResponse::encode(ProtoWriteBuffer buffer) const {
buffer.encode_fixed32(1, this->epoch_seconds);
buffer.encode_string(2, this->timezone_ref_);
}
void GetTimeResponse::calculate_size(ProtoSize &size) const {
size.add_fixed32(1, this->epoch_seconds);
size.add_length(1, this->timezone_ref_.size());
}
#ifdef USE_API_SERVICES
void ListEntitiesServicesArgument::encode(ProtoWriteBuffer buffer) const {
buffer.encode_string(1, this->name_ref_);

View File

@@ -1180,10 +1180,6 @@ class GetTimeResponse final : public ProtoDecodableMessage {
#endif
uint32_t epoch_seconds{0};
std::string timezone{};
StringRef timezone_ref_{};
void set_timezone(const StringRef &ref) { this->timezone_ref_ = ref; }
void encode(ProtoWriteBuffer buffer) const override;
void calculate_size(ProtoSize &size) const override;
#ifdef HAS_PROTO_MESSAGE_DUMP
void dump_to(std::string &out) const override;
#endif

View File

@@ -1113,13 +1113,7 @@ void GetTimeRequest::dump_to(std::string &out) const { out.append("GetTimeReques
void GetTimeResponse::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "GetTimeResponse");
dump_field(out, "epoch_seconds", this->epoch_seconds);
out.append(" timezone: ");
if (!this->timezone_ref_.empty()) {
out.append("'").append(this->timezone_ref_.c_str()).append("'");
} else {
out.append("'").append(this->timezone).append("'");
}
out.append("\n");
dump_field(out, "timezone", this->timezone);
}
#ifdef USE_API_SERVICES
void ListEntitiesServicesArgument::dump_to(std::string &out) const {

View File

@@ -160,15 +160,6 @@ void APIServerConnectionBase::read_message(uint32_t msg_size, uint32_t msg_type,
break;
}
#endif
case GetTimeRequest::MESSAGE_TYPE: {
GetTimeRequest msg;
// Empty message: no decode needed
#ifdef HAS_PROTO_MESSAGE_DUMP
ESP_LOGVV(TAG, "on_get_time_request: %s", msg.dump().c_str());
#endif
this->on_get_time_request(msg);
break;
}
case GetTimeResponse::MESSAGE_TYPE: {
GetTimeResponse msg;
msg.decode(msg_data, msg_size);
@@ -656,11 +647,6 @@ void APIServerConnection::on_subscribe_home_assistant_states_request(const Subsc
}
}
#endif
void APIServerConnection::on_get_time_request(const GetTimeRequest &msg) {
if (this->check_connection_setup_() && !this->send_get_time_response(msg)) {
this->on_fatal_error();
}
}
#ifdef USE_API_SERVICES
void APIServerConnection::on_execute_service_request(const ExecuteServiceRequest &msg) {
if (this->check_authenticated_()) {

View File

@@ -71,7 +71,7 @@ class APIServerConnectionBase : public ProtoService {
#ifdef USE_API_HOMEASSISTANT_STATES
virtual void on_home_assistant_state_response(const HomeAssistantStateResponse &value){};
#endif
virtual void on_get_time_request(const GetTimeRequest &value){};
virtual void on_get_time_response(const GetTimeResponse &value){};
#ifdef USE_API_SERVICES
@@ -226,7 +226,6 @@ class APIServerConnection : public APIServerConnectionBase {
#ifdef USE_API_HOMEASSISTANT_STATES
virtual void subscribe_home_assistant_states(const SubscribeHomeAssistantStatesRequest &msg) = 0;
#endif
virtual bool send_get_time_response(const GetTimeRequest &msg) = 0;
#ifdef USE_API_SERVICES
virtual void execute_service(const ExecuteServiceRequest &msg) = 0;
#endif
@@ -348,7 +347,6 @@ class APIServerConnection : public APIServerConnectionBase {
#ifdef USE_API_HOMEASSISTANT_STATES
void on_subscribe_home_assistant_states_request(const SubscribeHomeAssistantStatesRequest &msg) override;
#endif
void on_get_time_request(const GetTimeRequest &msg) override;
#ifdef USE_API_SERVICES
void on_execute_service_request(const ExecuteServiceRequest &msg) override;
#endif

View File

@@ -130,7 +130,9 @@ class BluetoothProxy final : public esp32_ble_tracker::ESPBTDeviceListener, publ
std::string get_bluetooth_mac_address_pretty() {
const uint8_t *mac = esp_bt_dev_get_address();
return str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[18];
format_mac_addr_upper(mac, buf);
return std::string(buf);
}
protected:

View File

@@ -1,107 +0,0 @@
import esphome.codegen as cg
from esphome.components import display, esp32
import esphome.config_validation as cv
from esphome.const import (
CONF_FULL_UPDATE_EVERY,
CONF_ID,
CONF_LAMBDA,
CONF_MODEL,
CONF_PAGES,
)
from esphome.cpp_generator import MockObj
CODEOWNERS = ["@jesserockz"]
DEPENDENCIES = ["esp32", "psram"]
CONF_POWER_OFF_DELAY_ENABLED = "power_off_delay_enabled"
epdiy_ns = cg.esphome_ns.namespace("epdiy")
EPDiyDisplay = epdiy_ns.class_("EPDiyDisplay", display.Display)
class EpdBoardDefinition(MockObj):
def __str__(self):
return f"&{self.base}"
class EpdDisplay_t(MockObj):
def __str__(self):
return f"&{self.base}"
EpdInitOptions = cg.global_ns.enum("EpdInitOptions")
class Model:
def __init__(
self,
*,
board_definition: MockObj,
display_t: MockObj,
init_options: MockObj,
width: int,
height: int,
vcom_mv: int = 0,
):
self.board_definition = board_definition
self.display_t = display_t
self.init_options = init_options
self.width = width
self.height = height
self.vcom_mv = vcom_mv
MODELS: dict[str, Model] = {
"lilygo_t5_4.7": Model(
board_definition=EpdBoardDefinition("epd_board_lilygo_t5_47"),
display_t=EpdDisplay_t("ED047TC2"),
init_options=(EpdInitOptions.EPD_LUT_64K, EpdInitOptions.EPD_FEED_QUEUE_8),
width=960,
height=540,
),
}
CONFIG_SCHEMA = cv.All(
display.FULL_DISPLAY_SCHEMA.extend(
{
cv.GenerateID(): cv.declare_id(EPDiyDisplay),
cv.Required(CONF_MODEL): cv.one_of(*MODELS.keys()),
cv.Optional(CONF_FULL_UPDATE_EVERY, default=10): cv.uint32_t,
cv.Optional(CONF_POWER_OFF_DELAY_ENABLED, default=False): cv.boolean,
}
).extend(cv.polling_component_schema("60s")),
cv.has_at_most_one_key(CONF_PAGES, CONF_LAMBDA),
cv.only_with_esp_idf, # When trying to add library via platformio it breaks, using as an idf component works fine
)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await display.register_display(var, config)
model = MODELS[config[CONF_MODEL]]
cg.add(
var.set_model_details(
model.board_definition,
model.display_t,
cg.RawExpression(
f"static_cast<EpdInitOptions>({'|'.join(str(o) for o in model.init_options)})"
),
model.vcom_mv,
)
)
if CONF_LAMBDA in config:
lambda_ = await cg.process_lambda(
config[CONF_LAMBDA], [(display.DisplayRef, "it")], return_type=cg.void
)
cg.add(var.set_writer(lambda_))
cg.add(var.set_power_off_delay_enabled(config[CONF_POWER_OFF_DELAY_ENABLED]))
esp32.add_idf_component(
name="vroland/epdiy",
repo="https://github.com/vroland/epdiy",
ref="c61e9e923ce2418150d54f88cea5d196cdc40c54",
)

View File

@@ -1,76 +0,0 @@
#include "epdiy_display.h"
#include "esphome/core/application.h"
#ifdef USE_ESP_IDF
#include "esphome/core/log.h"
namespace esphome::epdiy {
static const char *const TAG = "epdiy";
static constexpr uint8_t TEMPERATURE = 23; // default temperature for e-paper displays
float EPDiyDisplay::get_setup_priority() const { return esphome::setup_priority::LATE; }
void EPDiyDisplay::setup() {
epd_init(this->board_definition_, this->display_t_, this->init_options_);
if (this->vcom_mv_ != 0) {
epd_set_vcom(this->vcom_mv_);
}
this->state_ = epd_hl_init(nullptr);
this->framebuffer_ = epd_hl_get_framebuffer(&this->state_);
}
void EPDiyDisplay::update() {
this->do_update_();
this->defer([this]() { this->flush_screen_changes_(); });
}
void EPDiyDisplay::fill(Color color) {
if (color == display::COLOR_OFF) {
memset(this->framebuffer_, 0xFF, this->get_buffer_length());
epd_poweron();
epd_hl_update_screen(&this->state_, MODE_GC16, TEMPERATURE);
epd_clear();
epd_poweroff();
App.feed_wdt();
} else {
Display::fill(color);
}
}
void EPDiyDisplay::flush_screen_changes_() {
epd_poweron();
epd_hl_update_screen(&this->state_, MODE_GC16, TEMPERATURE);
memset(this->state_.back_fb, 0xFF, this->get_buffer_length());
uint16_t delay = 0;
if (this->power_off_delay_enabled_) {
delay = 700;
}
this->set_timeout("poweroff", delay, []() { epd_poweroff(); });
}
void EPDiyDisplay::on_shutdown() {
epd_poweroff();
epd_deinit();
}
void HOT EPDiyDisplay::draw_pixel_at(int x, int y, Color color) {
if (color.red == 255 && color.green == 255 && color.blue == 255) {
epd_draw_pixel(x, y, 0, this->framebuffer_);
} else {
int col = (0.2126 * color.red) + (0.7152 * color.green) + (0.0722 * color.blue);
int cl = 255 - col;
epd_draw_pixel(x, y, cl, this->framebuffer_);
}
}
} // namespace esphome::epdiy
#endif // USE_ESP_IDF

View File

@@ -1,63 +0,0 @@
#pragma once
#ifdef USE_ESP_IDF
#include "esphome/core/component.h"
#include "esphome/components/display/display_buffer.h"
#include "esphome/components/display/display_color_utils.h"
#include "esphome/core/hal.h"
#include "esphome/core/version.h"
#include "epd_display.h"
#include "epd_highlevel.h"
namespace esphome::epdiy {
class EPDiyDisplay : public display::Display {
public:
float get_setup_priority() const override;
void setup() override;
void update() override;
void on_shutdown() override;
display::DisplayType get_display_type() override { return display::DisplayType::DISPLAY_TYPE_GRAYSCALE; }
int get_width_internal() override { return this->display_t_->width; };
int get_height_internal() override { return this->display_t_->height; };
size_t get_buffer_length() const { return this->display_t_->width / 2 * this->display_t_->height; }
void set_power_off_delay_enabled(bool power_off_delay_enabled) {
this->power_off_delay_enabled_ = power_off_delay_enabled;
}
void set_model_details(const EpdBoardDefinition *board_definition, const EpdDisplay_t *display_t,
enum EpdInitOptions init_options, uint16_t vcom) {
this->board_definition_ = board_definition;
this->display_t_ = display_t;
this->init_options_ = init_options;
this->vcom_mv_ = vcom;
}
void fill(Color color) override;
void draw_pixel_at(int x, int y, Color color) override;
protected:
void flush_screen_changes_();
EpdiyHighlevelState state_;
uint8_t *framebuffer_;
const EpdBoardDefinition *board_definition_;
const EpdDisplay_t *display_t_;
enum EpdInitOptions init_options_;
uint16_t vcom_mv_;
bool power_off_delay_enabled_;
};
} // namespace esphome::epdiy
#endif // USE_ESP_IDF

View File

@@ -353,7 +353,6 @@ SUPPORTED_PLATFORMIO_ESP_IDF_5X = [
# pioarduino versions that don't require a release number
# List based on https://github.com/pioarduino/esp-idf/releases
SUPPORTED_PIOARDUINO_ESP_IDF_5X = [
cv.Version(5, 5, 1),
cv.Version(5, 5, 0),
cv.Version(5, 4, 2),
cv.Version(5, 4, 1),

View File

@@ -7,6 +7,7 @@
#include <cstdio>
#include <cinttypes>
#include "esphome/core/log.h"
#include "esphome/core/helpers.h"
namespace esphome::esp32_ble {
@@ -169,22 +170,42 @@ bool ESPBTUUID::operator==(const ESPBTUUID &uuid) const {
}
esp_bt_uuid_t ESPBTUUID::get_uuid() const { return this->uuid_; }
std::string ESPBTUUID::to_string() const {
char buf[40]; // Enough for 128-bit UUID with dashes
char *pos = buf;
switch (this->uuid_.len) {
case ESP_UUID_LEN_16:
return str_snprintf("0x%02X%02X", 6, this->uuid_.uuid.uuid16 >> 8, this->uuid_.uuid.uuid16 & 0xff);
*pos++ = '0';
*pos++ = 'x';
*pos++ = format_hex_pretty_char(this->uuid_.uuid.uuid16 >> 12);
*pos++ = format_hex_pretty_char((this->uuid_.uuid.uuid16 >> 8) & 0x0F);
*pos++ = format_hex_pretty_char((this->uuid_.uuid.uuid16 >> 4) & 0x0F);
*pos++ = format_hex_pretty_char(this->uuid_.uuid.uuid16 & 0x0F);
*pos = '\0';
return std::string(buf);
case ESP_UUID_LEN_32:
return str_snprintf("0x%02" PRIX32 "%02" PRIX32 "%02" PRIX32 "%02" PRIX32, 10, (this->uuid_.uuid.uuid32 >> 24),
(this->uuid_.uuid.uuid32 >> 16 & 0xff), (this->uuid_.uuid.uuid32 >> 8 & 0xff),
this->uuid_.uuid.uuid32 & 0xff);
*pos++ = '0';
*pos++ = 'x';
for (int shift = 28; shift >= 0; shift -= 4) {
*pos++ = format_hex_pretty_char((this->uuid_.uuid.uuid32 >> shift) & 0x0F);
}
*pos = '\0';
return std::string(buf);
default:
case ESP_UUID_LEN_128:
std::string buf;
// Format: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
for (int8_t i = 15; i >= 0; i--) {
buf += str_snprintf("%02X", 2, this->uuid_.uuid.uuid128[i]);
if (i == 6 || i == 8 || i == 10 || i == 12)
buf += "-";
uint8_t byte = this->uuid_.uuid.uuid128[i];
*pos++ = format_hex_pretty_char(byte >> 4);
*pos++ = format_hex_pretty_char(byte & 0x0F);
if (i == 12 || i == 10 || i == 8 || i == 6) {
*pos++ = '-';
}
}
return buf;
*pos = '\0';
return std::string(buf);
}
return "";
}

View File

@@ -31,12 +31,13 @@ void ESP32BLEBeacon::dump_config() {
char uuid[37];
char *bpos = uuid;
for (int8_t ii = 0; ii < 16; ++ii) {
bpos += sprintf(bpos, "%02X", this->uuid_[ii]);
*bpos++ = format_hex_pretty_char(this->uuid_[ii] >> 4);
*bpos++ = format_hex_pretty_char(this->uuid_[ii] & 0x0F);
if (ii == 3 || ii == 5 || ii == 7 || ii == 9) {
bpos += sprintf(bpos, "-");
*bpos++ = '-';
}
}
uuid[36] = '\0';
*bpos = '\0';
ESP_LOGCONFIG(TAG,
" UUID: %s, Major: %u, Minor: %u, Min Interval: %ums, Max Interval: %ums, Measured Power: %d"
", TX Power: %ddBm",

View File

@@ -43,6 +43,13 @@ void BLEClientBase::setup() {
void BLEClientBase::set_state(espbt::ClientState st) {
ESP_LOGV(TAG, "[%d] [%s] Set state %d", this->connection_index_, this->address_str_.c_str(), (int) st);
ESPBTClient::set_state(st);
if (st == espbt::ClientState::READY_TO_CONNECT) {
// Enable loop for state processing
this->enable_loop();
// Connect immediately instead of waiting for next loop
this->connect();
}
}
void BLEClientBase::loop() {
@@ -58,8 +65,8 @@ void BLEClientBase::loop() {
}
this->set_state(espbt::ClientState::IDLE);
}
// If idle, we can disable the loop as connect()
// will enable it again when a connection is needed.
// If its idle, we can disable the loop as set_state
// will enable it again when we need to connect.
else if (this->state_ == espbt::ClientState::IDLE) {
this->disable_loop();
}
@@ -101,20 +108,9 @@ bool BLEClientBase::parse_device(const espbt::ESPBTDevice &device) {
#endif
void BLEClientBase::connect() {
// Prevent duplicate connection attempts
if (this->state_ == espbt::ClientState::CONNECTING || this->state_ == espbt::ClientState::CONNECTED ||
this->state_ == espbt::ClientState::ESTABLISHED) {
ESP_LOGW(TAG, "[%d] [%s] Connection already in progress, state=%s", this->connection_index_,
this->address_str_.c_str(), espbt::client_state_to_string(this->state_));
return;
}
ESP_LOGI(TAG, "[%d] [%s] 0x%02x Connecting", this->connection_index_, this->address_str_.c_str(),
this->remote_addr_type_);
this->paired_ = false;
// Enable loop for state processing
this->enable_loop();
// Immediately transition to CONNECTING to prevent duplicate connection attempts
this->set_state(espbt::ClientState::CONNECTING);
// Determine connection parameters based on connection type
if (this->connection_type_ == espbt::ConnectionType::V3_WITHOUT_CACHE) {
@@ -172,7 +168,7 @@ void BLEClientBase::unconditional_disconnect() {
this->log_gattc_warning_("esp_ble_gattc_close", err);
}
if (this->state_ == espbt::ClientState::DISCOVERED) {
if (this->state_ == espbt::ClientState::READY_TO_CONNECT || this->state_ == espbt::ClientState::DISCOVERED) {
this->set_address(0);
this->set_state(espbt::ClientState::IDLE);
} else {
@@ -216,6 +212,8 @@ void BLEClientBase::handle_connection_result_(esp_err_t ret) {
if (ret) {
this->log_gattc_warning_("esp_ble_gattc_open", ret);
this->set_state(espbt::ClientState::IDLE);
} else {
this->set_state(espbt::ClientState::CONNECTING);
}
}

View File

@@ -60,11 +60,14 @@ class BLEClientBase : public espbt::ESPBTClient, public Component {
if (address == 0) {
this->address_str_ = "";
} else {
this->address_str_ =
str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, (uint8_t) (this->address_ >> 40) & 0xff,
(uint8_t) (this->address_ >> 32) & 0xff, (uint8_t) (this->address_ >> 24) & 0xff,
(uint8_t) (this->address_ >> 16) & 0xff, (uint8_t) (this->address_ >> 8) & 0xff,
(uint8_t) (this->address_ >> 0) & 0xff);
char buf[18];
uint8_t mac[6] = {
(uint8_t) ((this->address_ >> 40) & 0xff), (uint8_t) ((this->address_ >> 32) & 0xff),
(uint8_t) ((this->address_ >> 24) & 0xff), (uint8_t) ((this->address_ >> 16) & 0xff),
(uint8_t) ((this->address_ >> 8) & 0xff), (uint8_t) ((this->address_ >> 0) & 0xff),
};
format_mac_addr_upper(mac, buf);
this->address_str_ = buf;
}
}
const std::string &address_str() const { return this->address_str_; }

View File

@@ -51,6 +51,8 @@ const char *client_state_to_string(ClientState state) {
return "IDLE";
case ClientState::DISCOVERED:
return "DISCOVERED";
case ClientState::READY_TO_CONNECT:
return "READY_TO_CONNECT";
case ClientState::CONNECTING:
return "CONNECTING";
case ClientState::CONNECTED:
@@ -605,9 +607,8 @@ void ESPBTDevice::parse_adv_(const uint8_t *payload, uint8_t len) {
}
std::string ESPBTDevice::address_str() const {
char mac[24];
snprintf(mac, sizeof(mac), "%02X:%02X:%02X:%02X:%02X:%02X", this->address_[0], this->address_[1], this->address_[2],
this->address_[3], this->address_[4], this->address_[5]);
char mac[18];
format_mac_addr_upper(this->address_, mac);
return mac;
}
@@ -793,7 +794,7 @@ void ESP32BLETracker::try_promote_discovered_clients_() {
#ifdef USE_ESP32_BLE_SOFTWARE_COEXISTENCE
this->update_coex_preference_(true);
#endif
client->connect();
client->set_state(ClientState::READY_TO_CONNECT);
break;
}
}

View File

@@ -159,6 +159,8 @@ enum class ClientState : uint8_t {
IDLE,
// Device advertisement found.
DISCOVERED,
// Device is discovered and the scanner is stopped
READY_TO_CONNECT,
// Connection in progress.
CONNECTING,
// Initial connection established.
@@ -311,6 +313,7 @@ class ESP32BLETracker : public Component,
counts.discovered++;
break;
case ClientState::CONNECTING:
case ClientState::READY_TO_CONNECT:
counts.connecting++;
break;
default:

View File

@@ -300,6 +300,7 @@ void EthernetComponent::loop() {
this->state_ = EthernetComponentState::CONNECTING;
this->start_connect_();
} else {
this->finish_connect_();
// When connected and stable, disable the loop to save CPU cycles
this->disable_loop();
}
@@ -486,10 +487,35 @@ void EthernetComponent::got_ip6_event_handler(void *arg, esp_event_base_t event_
}
#endif /* USE_NETWORK_IPV6 */
void EthernetComponent::finish_connect_() {
#if USE_NETWORK_IPV6
// Retry IPv6 link-local setup if it failed during initial connect
// This handles the case where min_ipv6_addr_count is NOT set (or is 0),
// allowing us to reach CONNECTED state with just IPv4.
// If IPv6 setup failed in start_connect_() because the interface wasn't ready:
// - Bootup timing issues (#10281)
// - Cable unplugged/network interruption (#10705)
// We can now retry since we're in CONNECTED state and the interface is definitely up.
if (!this->ipv6_setup_done_) {
esp_err_t err = esp_netif_create_ip6_linklocal(this->eth_netif_);
if (err == ESP_OK) {
ESP_LOGD(TAG, "IPv6 link-local address created (retry succeeded)");
}
// Always set the flag to prevent continuous retries
// If IPv6 setup fails here with the interface up and stable, it's
// likely a persistent issue (IPv6 disabled at router, hardware
// limitation, etc.) that won't be resolved by further retries.
// The device continues to work with IPv4.
this->ipv6_setup_done_ = true;
}
#endif /* USE_NETWORK_IPV6 */
}
void EthernetComponent::start_connect_() {
global_eth_component->got_ipv4_address_ = false;
#if USE_NETWORK_IPV6
global_eth_component->ipv6_count_ = 0;
this->ipv6_setup_done_ = false;
#endif /* USE_NETWORK_IPV6 */
this->connect_begin_ = millis();
this->status_set_warning(LOG_STR("waiting for IP configuration"));
@@ -545,9 +571,27 @@ void EthernetComponent::start_connect_() {
}
}
#if USE_NETWORK_IPV6
// Attempt to create IPv6 link-local address
// We MUST attempt this here, not just in finish_connect_(), because with
// min_ipv6_addr_count set, the component won't reach CONNECTED state without IPv6.
// However, this may fail with ESP_FAIL if the interface is not up yet:
// - At bootup when link isn't ready (#10281)
// - After disconnection/cable unplugged (#10705)
// We'll retry in finish_connect_() if it fails here.
err = esp_netif_create_ip6_linklocal(this->eth_netif_);
if (err != ESP_OK) {
ESPHL_ERROR_CHECK(err, "Enable IPv6 link local failed");
if (err == ESP_ERR_ESP_NETIF_INVALID_PARAMS) {
// This is a programming error, not a transient failure
ESPHL_ERROR_CHECK(err, "esp_netif_create_ip6_linklocal invalid parameters");
} else {
// ESP_FAIL means the interface isn't up yet
// This is expected and non-fatal, happens in multiple scenarios:
// - During reconnection after network interruptions (#10705)
// - At bootup when the link isn't ready yet (#10281)
// We'll retry once we reach CONNECTED state and the interface is up
ESP_LOGW(TAG, "esp_netif_create_ip6_linklocal failed: %s", esp_err_to_name(err));
// Don't mark component as failed - this is a transient error
}
}
#endif /* USE_NETWORK_IPV6 */
@@ -638,7 +682,9 @@ void EthernetComponent::get_eth_mac_address_raw(uint8_t *mac) {
std::string EthernetComponent::get_eth_mac_address_pretty() {
uint8_t mac[6];
get_eth_mac_address_raw(mac);
return str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[18];
format_mac_addr_upper(mac, buf);
return std::string(buf);
}
eth_duplex_t EthernetComponent::get_duplex_mode() {

View File

@@ -102,6 +102,7 @@ class EthernetComponent : public Component {
#endif /* LWIP_IPV6 */
void start_connect_();
void finish_connect_();
void dump_connect_params_();
/// @brief Set `RMII Reference Clock Select` bit for KSZ8081.
void ksz8081_set_clock_reference_(esp_eth_mac_t *mac);
@@ -144,6 +145,7 @@ class EthernetComponent : public Component {
bool got_ipv4_address_{false};
#if LWIP_IPV6
uint8_t ipv6_count_{0};
bool ipv6_setup_done_{false};
#endif /* LWIP_IPV6 */
// Pointers at the end (naturally aligned)

View File

@@ -18,6 +18,7 @@ from esphome.const import (
DEVICE_CLASS_TEMPERATURE,
DEVICE_CLASS_VOLTAGE,
STATE_CLASS_MEASUREMENT,
STATE_CLASS_TOTAL_INCREASING,
UNIT_AMPERE,
UNIT_CELSIUS,
UNIT_VOLT,
@@ -162,7 +163,7 @@ INA2XX_SCHEMA = cv.Schema(
unit_of_measurement=UNIT_WATT_HOURS,
accuracy_decimals=8,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_MEASUREMENT,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
key=CONF_NAME,
),
@@ -170,7 +171,8 @@ INA2XX_SCHEMA = cv.Schema(
sensor.sensor_schema(
unit_of_measurement=UNIT_JOULE,
accuracy_decimals=8,
state_class=STATE_CLASS_MEASUREMENT,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
key=CONF_NAME,
),

View File

@@ -1,4 +1,3 @@
#include <cstdio>
#include <cstring>
#include "md5.h"
#ifdef USE_MD5
@@ -44,7 +43,9 @@ void MD5Digest::get_bytes(uint8_t *output) { memcpy(output, this->digest_, 16);
void MD5Digest::get_hex(char *output) {
for (size_t i = 0; i < 16; i++) {
sprintf(output + i * 2, "%02x", this->digest_[i]);
uint8_t byte = this->digest_[i];
output[i * 2] = format_hex_char(byte >> 4);
output[i * 2 + 1] = format_hex_char(byte & 0x0F);
}
}

View File

@@ -270,7 +270,6 @@ void PacketTransport::add_binary_data_(uint8_t key, const char *id, bool data) {
auto len = 1 + 1 + 1 + strlen(id);
if (len + this->header_.size() + this->data_.size() > this->get_max_packet_size()) {
this->flush_();
this->init_data_();
}
add(this->data_, key);
add(this->data_, (uint8_t) data);
@@ -285,7 +284,6 @@ void PacketTransport::add_data_(uint8_t key, const char *id, uint32_t data) {
auto len = 4 + 1 + 1 + strlen(id);
if (len + this->header_.size() + this->data_.size() > this->get_max_packet_size()) {
this->flush_();
this->init_data_();
}
add(this->data_, key);
add(this->data_, data);

View File

@@ -196,8 +196,8 @@ FILTER_SOURCE_FILES = filter_source_files_from_platform(
PlatformFramework.ESP32_ARDUINO,
PlatformFramework.ESP32_IDF,
},
"remote_receiver.cpp": {
PlatformFramework.ESP8266_ARDUINO,
"remote_receiver_esp8266.cpp": {PlatformFramework.ESP8266_ARDUINO},
"remote_receiver_libretiny.cpp": {
PlatformFramework.BK72XX_ARDUINO,
PlatformFramework.RTL87XX_ARDUINO,
PlatformFramework.LN882X_ARDUINO,

View File

@@ -3,12 +3,12 @@
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"
#if defined(USE_LIBRETINY) || defined(USE_ESP8266)
#ifdef USE_ESP8266
namespace esphome {
namespace remote_receiver {
static const char *const TAG = "remote_receiver";
static const char *const TAG = "remote_receiver.esp8266";
void IRAM_ATTR HOT RemoteReceiverComponentStore::gpio_intr(RemoteReceiverComponentStore *arg) {
const uint32_t now = micros();

View File

@@ -0,0 +1,125 @@
#include "remote_receiver.h"
#include "esphome/core/hal.h"
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"
#ifdef USE_LIBRETINY
namespace esphome {
namespace remote_receiver {
static const char *const TAG = "remote_receiver.libretiny";
void IRAM_ATTR HOT RemoteReceiverComponentStore::gpio_intr(RemoteReceiverComponentStore *arg) {
const uint32_t now = micros();
// If the lhs is 1 (rising edge) we should write to an uneven index and vice versa
const uint32_t next = (arg->buffer_write_at + 1) % arg->buffer_size;
const bool level = arg->pin.digital_read();
if (level != next % 2)
return;
// If next is buffer_read, we have hit an overflow
if (next == arg->buffer_read_at)
return;
const uint32_t last_change = arg->buffer[arg->buffer_write_at];
const uint32_t time_since_change = now - last_change;
if (time_since_change <= arg->filter_us)
return;
arg->buffer[arg->buffer_write_at = next] = now;
}
void RemoteReceiverComponent::setup() {
this->pin_->setup();
auto &s = this->store_;
s.filter_us = this->filter_us_;
s.pin = this->pin_->to_isr();
s.buffer_size = this->buffer_size_;
this->high_freq_.start();
if (s.buffer_size % 2 != 0) {
// Make sure divisible by two. This way, we know that every 0bxxx0 index is a space and every 0bxxx1 index is a mark
s.buffer_size++;
}
s.buffer = new uint32_t[s.buffer_size];
void *buf = (void *) s.buffer;
memset(buf, 0, s.buffer_size * sizeof(uint32_t));
// First index is a space.
if (this->pin_->digital_read()) {
s.buffer_write_at = s.buffer_read_at = 1;
} else {
s.buffer_write_at = s.buffer_read_at = 0;
}
this->pin_->attach_interrupt(RemoteReceiverComponentStore::gpio_intr, &this->store_, gpio::INTERRUPT_ANY_EDGE);
}
void RemoteReceiverComponent::dump_config() {
ESP_LOGCONFIG(TAG, "Remote Receiver:");
LOG_PIN(" Pin: ", this->pin_);
if (this->pin_->digital_read()) {
ESP_LOGW(TAG, "Remote Receiver Signal starts with a HIGH value. Usually this means you have to "
"invert the signal using 'inverted: True' in the pin schema!");
}
ESP_LOGCONFIG(TAG,
" Buffer Size: %u\n"
" Tolerance: %u%s\n"
" Filter out pulses shorter than: %u us\n"
" Signal is done after %u us of no changes",
this->buffer_size_, this->tolerance_,
(this->tolerance_mode_ == remote_base::TOLERANCE_MODE_TIME) ? " us" : "%", this->filter_us_,
this->idle_us_);
}
void RemoteReceiverComponent::loop() {
auto &s = this->store_;
// copy write at to local variables, as it's volatile
const uint32_t write_at = s.buffer_write_at;
const uint32_t dist = (s.buffer_size + write_at - s.buffer_read_at) % s.buffer_size;
// signals must at least one rising and one leading edge
if (dist <= 1)
return;
const uint32_t now = micros();
if (now - s.buffer[write_at] < this->idle_us_) {
// The last change was fewer than the configured idle time ago.
return;
}
ESP_LOGVV(TAG, "read_at=%u write_at=%u dist=%u now=%u end=%u", s.buffer_read_at, write_at, dist, now,
s.buffer[write_at]);
// Skip first value, it's from the previous idle level
s.buffer_read_at = (s.buffer_read_at + 1) % s.buffer_size;
uint32_t prev = s.buffer_read_at;
s.buffer_read_at = (s.buffer_read_at + 1) % s.buffer_size;
const uint32_t reserve_size = 1 + (s.buffer_size + write_at - s.buffer_read_at) % s.buffer_size;
this->temp_.clear();
this->temp_.reserve(reserve_size);
int32_t multiplier = s.buffer_read_at % 2 == 0 ? 1 : -1;
for (uint32_t i = 0; prev != write_at; i++) {
int32_t delta = s.buffer[s.buffer_read_at] - s.buffer[prev];
if (uint32_t(delta) >= this->idle_us_) {
// already found a space longer than idle. There must have been two pulses
break;
}
ESP_LOGVV(TAG, " i=%u buffer[%u]=%u - buffer[%u]=%u -> %d", i, s.buffer_read_at, s.buffer[s.buffer_read_at], prev,
s.buffer[prev], multiplier * delta);
this->temp_.push_back(multiplier * delta);
prev = s.buffer_read_at;
s.buffer_read_at = (s.buffer_read_at + 1) % s.buffer_size;
multiplier *= -1;
}
s.buffer_read_at = (s.buffer_size + s.buffer_read_at - 1) % s.buffer_size;
this->temp_.push_back(this->idle_us_ * multiplier);
this->call_listeners_dumpers_();
}
} // namespace remote_receiver
} // namespace esphome
#endif

View File

@@ -131,8 +131,8 @@ FILTER_SOURCE_FILES = filter_source_files_from_platform(
PlatformFramework.ESP32_ARDUINO,
PlatformFramework.ESP32_IDF,
},
"remote_transmitter.cpp": {
PlatformFramework.ESP8266_ARDUINO,
"remote_transmitter_esp8266.cpp": {PlatformFramework.ESP8266_ARDUINO},
"remote_transmitter_libretiny.cpp": {
PlatformFramework.BK72XX_ARDUINO,
PlatformFramework.RTL87XX_ARDUINO,
PlatformFramework.LN882X_ARDUINO,

View File

@@ -2,107 +2,10 @@
#include "esphome/core/log.h"
#include "esphome/core/application.h"
#if defined(USE_LIBRETINY) || defined(USE_ESP8266)
namespace esphome {
namespace remote_transmitter {
static const char *const TAG = "remote_transmitter";
void RemoteTransmitterComponent::setup() {
this->pin_->setup();
this->pin_->digital_write(false);
}
void RemoteTransmitterComponent::dump_config() {
ESP_LOGCONFIG(TAG,
"Remote Transmitter:\n"
" Carrier Duty: %u%%",
this->carrier_duty_percent_);
LOG_PIN(" Pin: ", this->pin_);
}
void RemoteTransmitterComponent::calculate_on_off_time_(uint32_t carrier_frequency, uint32_t *on_time_period,
uint32_t *off_time_period) {
if (carrier_frequency == 0) {
*on_time_period = 0;
*off_time_period = 0;
return;
}
uint32_t period = (1000000UL + carrier_frequency / 2) / carrier_frequency; // round(1000000/freq)
period = std::max(uint32_t(1), period);
*on_time_period = (period * this->carrier_duty_percent_) / 100;
*off_time_period = period - *on_time_period;
}
void RemoteTransmitterComponent::await_target_time_() {
const uint32_t current_time = micros();
if (this->target_time_ == 0) {
this->target_time_ = current_time;
} else if ((int32_t) (this->target_time_ - current_time) > 0) {
delayMicroseconds(this->target_time_ - current_time);
}
}
void RemoteTransmitterComponent::mark_(uint32_t on_time, uint32_t off_time, uint32_t usec) {
this->await_target_time_();
this->pin_->digital_write(true);
const uint32_t target = this->target_time_ + usec;
if (this->carrier_duty_percent_ < 100 && (on_time > 0 || off_time > 0)) {
while (true) { // Modulate with carrier frequency
this->target_time_ += on_time;
if ((int32_t) (this->target_time_ - target) >= 0)
break;
this->await_target_time_();
this->pin_->digital_write(false);
this->target_time_ += off_time;
if ((int32_t) (this->target_time_ - target) >= 0)
break;
this->await_target_time_();
this->pin_->digital_write(true);
}
}
this->target_time_ = target;
}
void RemoteTransmitterComponent::space_(uint32_t usec) {
this->await_target_time_();
this->pin_->digital_write(false);
this->target_time_ += usec;
}
void RemoteTransmitterComponent::digital_write(bool value) { this->pin_->digital_write(value); }
void RemoteTransmitterComponent::send_internal(uint32_t send_times, uint32_t send_wait) {
ESP_LOGD(TAG, "Sending remote code");
uint32_t on_time, off_time;
this->calculate_on_off_time_(this->temp_.get_carrier_frequency(), &on_time, &off_time);
this->target_time_ = 0;
this->transmit_trigger_->trigger();
for (uint32_t i = 0; i < send_times; i++) {
InterruptLock lock;
for (int32_t item : this->temp_.get_data()) {
if (item > 0) {
const auto length = uint32_t(item);
this->mark_(on_time, off_time, length);
} else {
const auto length = uint32_t(-item);
this->space_(length);
}
App.feed_wdt();
}
this->await_target_time_(); // wait for duration of last pulse
this->pin_->digital_write(false);
if (i + 1 < send_times)
this->target_time_ += send_wait;
}
this->complete_trigger_->trigger();
}
} // namespace remote_transmitter
} // namespace esphome
#endif

View File

@@ -0,0 +1,107 @@
#include "remote_transmitter.h"
#include "esphome/core/log.h"
#include "esphome/core/application.h"
#ifdef USE_ESP8266
namespace esphome {
namespace remote_transmitter {
static const char *const TAG = "remote_transmitter";
void RemoteTransmitterComponent::setup() {
this->pin_->setup();
this->pin_->digital_write(false);
}
void RemoteTransmitterComponent::dump_config() {
ESP_LOGCONFIG(TAG,
"Remote Transmitter:\n"
" Carrier Duty: %u%%",
this->carrier_duty_percent_);
LOG_PIN(" Pin: ", this->pin_);
}
void RemoteTransmitterComponent::calculate_on_off_time_(uint32_t carrier_frequency, uint32_t *on_time_period,
uint32_t *off_time_period) {
if (carrier_frequency == 0) {
*on_time_period = 0;
*off_time_period = 0;
return;
}
uint32_t period = (1000000UL + carrier_frequency / 2) / carrier_frequency; // round(1000000/freq)
period = std::max(uint32_t(1), period);
*on_time_period = (period * this->carrier_duty_percent_) / 100;
*off_time_period = period - *on_time_period;
}
void RemoteTransmitterComponent::await_target_time_() {
const uint32_t current_time = micros();
if (this->target_time_ == 0) {
this->target_time_ = current_time;
} else if ((int32_t) (this->target_time_ - current_time) > 0) {
delayMicroseconds(this->target_time_ - current_time);
}
}
void RemoteTransmitterComponent::mark_(uint32_t on_time, uint32_t off_time, uint32_t usec) {
this->await_target_time_();
this->pin_->digital_write(true);
const uint32_t target = this->target_time_ + usec;
if (this->carrier_duty_percent_ < 100 && (on_time > 0 || off_time > 0)) {
while (true) { // Modulate with carrier frequency
this->target_time_ += on_time;
if ((int32_t) (this->target_time_ - target) >= 0)
break;
this->await_target_time_();
this->pin_->digital_write(false);
this->target_time_ += off_time;
if ((int32_t) (this->target_time_ - target) >= 0)
break;
this->await_target_time_();
this->pin_->digital_write(true);
}
}
this->target_time_ = target;
}
void RemoteTransmitterComponent::space_(uint32_t usec) {
this->await_target_time_();
this->pin_->digital_write(false);
this->target_time_ += usec;
}
void RemoteTransmitterComponent::digital_write(bool value) { this->pin_->digital_write(value); }
void RemoteTransmitterComponent::send_internal(uint32_t send_times, uint32_t send_wait) {
ESP_LOGD(TAG, "Sending remote code");
uint32_t on_time, off_time;
this->calculate_on_off_time_(this->temp_.get_carrier_frequency(), &on_time, &off_time);
this->target_time_ = 0;
this->transmit_trigger_->trigger();
for (uint32_t i = 0; i < send_times; i++) {
for (int32_t item : this->temp_.get_data()) {
if (item > 0) {
const auto length = uint32_t(item);
this->mark_(on_time, off_time, length);
} else {
const auto length = uint32_t(-item);
this->space_(length);
}
App.feed_wdt();
}
this->await_target_time_(); // wait for duration of last pulse
this->pin_->digital_write(false);
if (i + 1 < send_times)
this->target_time_ += send_wait;
}
this->complete_trigger_->trigger();
}
} // namespace remote_transmitter
} // namespace esphome
#endif

View File

@@ -0,0 +1,110 @@
#include "remote_transmitter.h"
#include "esphome/core/log.h"
#include "esphome/core/application.h"
#ifdef USE_LIBRETINY
namespace esphome {
namespace remote_transmitter {
static const char *const TAG = "remote_transmitter";
void RemoteTransmitterComponent::setup() {
this->pin_->setup();
this->pin_->digital_write(false);
}
void RemoteTransmitterComponent::dump_config() {
ESP_LOGCONFIG(TAG,
"Remote Transmitter:\n"
" Carrier Duty: %u%%",
this->carrier_duty_percent_);
LOG_PIN(" Pin: ", this->pin_);
}
void RemoteTransmitterComponent::calculate_on_off_time_(uint32_t carrier_frequency, uint32_t *on_time_period,
uint32_t *off_time_period) {
if (carrier_frequency == 0) {
*on_time_period = 0;
*off_time_period = 0;
return;
}
uint32_t period = (1000000UL + carrier_frequency / 2) / carrier_frequency; // round(1000000/freq)
period = std::max(uint32_t(1), period);
*on_time_period = (period * this->carrier_duty_percent_) / 100;
*off_time_period = period - *on_time_period;
}
void RemoteTransmitterComponent::await_target_time_() {
const uint32_t current_time = micros();
if (this->target_time_ == 0) {
this->target_time_ = current_time;
} else {
while ((int32_t) (this->target_time_ - micros()) > 0) {
// busy loop that ensures micros is constantly called
}
}
}
void RemoteTransmitterComponent::mark_(uint32_t on_time, uint32_t off_time, uint32_t usec) {
this->await_target_time_();
this->pin_->digital_write(true);
const uint32_t target = this->target_time_ + usec;
if (this->carrier_duty_percent_ < 100 && (on_time > 0 || off_time > 0)) {
while (true) { // Modulate with carrier frequency
this->target_time_ += on_time;
if ((int32_t) (this->target_time_ - target) >= 0)
break;
this->await_target_time_();
this->pin_->digital_write(false);
this->target_time_ += off_time;
if ((int32_t) (this->target_time_ - target) >= 0)
break;
this->await_target_time_();
this->pin_->digital_write(true);
}
}
this->target_time_ = target;
}
void RemoteTransmitterComponent::space_(uint32_t usec) {
this->await_target_time_();
this->pin_->digital_write(false);
this->target_time_ += usec;
}
void RemoteTransmitterComponent::digital_write(bool value) { this->pin_->digital_write(value); }
void RemoteTransmitterComponent::send_internal(uint32_t send_times, uint32_t send_wait) {
ESP_LOGD(TAG, "Sending remote code");
uint32_t on_time, off_time;
this->calculate_on_off_time_(this->temp_.get_carrier_frequency(), &on_time, &off_time);
this->target_time_ = 0;
this->transmit_trigger_->trigger();
for (uint32_t i = 0; i < send_times; i++) {
InterruptLock lock;
for (int32_t item : this->temp_.get_data()) {
if (item > 0) {
const auto length = uint32_t(item);
this->mark_(on_time, off_time, length);
} else {
const auto length = uint32_t(-item);
this->space_(length);
}
App.feed_wdt();
}
this->await_target_time_(); // wait for duration of last pulse
this->pin_->digital_write(false);
if (i + 1 < send_times)
this->target_time_ += send_wait;
}
this->complete_trigger_->trigger();
}
} // namespace remote_transmitter
} // namespace esphome
#endif

View File

@@ -593,7 +593,7 @@ void WiFiComponent::check_scanning_finished() {
for (auto &res : this->scan_result_) {
char bssid_s[18];
auto bssid = res.get_bssid();
sprintf(bssid_s, "%02X:%02X:%02X:%02X:%02X:%02X", bssid[0], bssid[1], bssid[2], bssid[3], bssid[4], bssid[5]);
format_mac_addr_upper(bssid.data(), bssid_s);
if (res.get_matches()) {
ESP_LOGI(TAG, "- '%s' %s" LOG_SECRET("(%s) ") "%s", res.get_ssid().c_str(),

View File

@@ -1,6 +1,7 @@
#pragma once
#include "esphome/core/component.h"
#include "esphome/core/helpers.h"
#include "esphome/components/text_sensor/text_sensor.h"
#include "esphome/components/wifi/wifi_component.h"
#ifdef USE_WIFI
@@ -106,8 +107,8 @@ class BSSIDWiFiInfo : public PollingComponent, public text_sensor::TextSensor {
wifi::bssid_t bssid = wifi::global_wifi_component->wifi_bssid();
if (memcmp(bssid.data(), last_bssid_.data(), 6) != 0) {
std::copy(bssid.begin(), bssid.end(), last_bssid_.begin());
char buf[30];
sprintf(buf, "%02X:%02X:%02X:%02X:%02X:%02X", bssid[0], bssid[1], bssid[2], bssid[3], bssid[4], bssid[5]);
char buf[18];
format_mac_addr_upper(bssid.data(), buf);
this->publish_state(buf);
}
}

View File

@@ -4,7 +4,7 @@ from enum import Enum
from esphome.enum import StrEnum
__version__ = "2025.10.0-dev"
__version__ = "2025.9.0b2"
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
VALID_SUBSTITUTIONS_CHARACTERS = (

View File

@@ -255,23 +255,22 @@ size_t parse_hex(const char *str, size_t length, uint8_t *data, size_t count) {
}
std::string format_mac_address_pretty(const uint8_t *mac) {
return str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[18];
format_mac_addr_upper(mac, buf);
return std::string(buf);
}
static char format_hex_char(uint8_t v) { return v >= 10 ? 'a' + (v - 10) : '0' + v; }
std::string format_hex(const uint8_t *data, size_t length) {
std::string ret;
ret.resize(length * 2);
for (size_t i = 0; i < length; i++) {
ret[2 * i] = format_hex_char((data[i] & 0xF0) >> 4);
ret[2 * i] = format_hex_char(data[i] >> 4);
ret[2 * i + 1] = format_hex_char(data[i] & 0x0F);
}
return ret;
}
std::string format_hex(const std::vector<uint8_t> &data) { return format_hex(data.data(), data.size()); }
static char format_hex_pretty_char(uint8_t v) { return v >= 10 ? 'A' + (v - 10) : '0' + v; }
// Shared implementation for uint8_t and string hex formatting
static std::string format_hex_pretty_uint8(const uint8_t *data, size_t length, char separator, bool show_length) {
if (data == nullptr || length == 0)
@@ -280,7 +279,7 @@ static std::string format_hex_pretty_uint8(const uint8_t *data, size_t length, c
uint8_t multiple = separator ? 3 : 2; // 3 if separator is not \0, 2 otherwise
ret.resize(multiple * length - (separator ? 1 : 0));
for (size_t i = 0; i < length; i++) {
ret[multiple * i] = format_hex_pretty_char((data[i] & 0xF0) >> 4);
ret[multiple * i] = format_hex_pretty_char(data[i] >> 4);
ret[multiple * i + 1] = format_hex_pretty_char(data[i] & 0x0F);
if (separator && i != length - 1)
ret[multiple * i + 2] = separator;
@@ -591,7 +590,9 @@ bool HighFrequencyLoopRequester::is_high_frequency() { return num_requests > 0;
std::string get_mac_address() {
uint8_t mac[6];
get_mac_address_raw(mac);
return str_snprintf("%02x%02x%02x%02x%02x%02x", 12, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[13];
format_mac_addr_lower_no_sep(mac, buf);
return std::string(buf);
}
std::string get_mac_address_pretty() {

View File

@@ -380,6 +380,35 @@ template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0> optional<
return parse_hex<T>(str.c_str(), str.length());
}
/// Convert a nibble (0-15) to lowercase hex char
inline char format_hex_char(uint8_t v) { return v >= 10 ? 'a' + (v - 10) : '0' + v; }
/// Convert a nibble (0-15) to uppercase hex char (used for pretty printing)
/// This always uses uppercase (A-F) for pretty/human-readable output
inline char format_hex_pretty_char(uint8_t v) { return v >= 10 ? 'A' + (v - 10) : '0' + v; }
/// Format MAC address as XX:XX:XX:XX:XX:XX (uppercase)
inline void format_mac_addr_upper(const uint8_t *mac, char *output) {
for (size_t i = 0; i < 6; i++) {
uint8_t byte = mac[i];
output[i * 3] = format_hex_pretty_char(byte >> 4);
output[i * 3 + 1] = format_hex_pretty_char(byte & 0x0F);
if (i < 5)
output[i * 3 + 2] = ':';
}
output[17] = '\0';
}
/// Format MAC address as xxxxxxxxxxxxxx (lowercase, no separators)
inline void format_mac_addr_lower_no_sep(const uint8_t *mac, char *output) {
for (size_t i = 0; i < 6; i++) {
uint8_t byte = mac[i];
output[i * 2] = format_hex_char(byte >> 4);
output[i * 2 + 1] = format_hex_char(byte & 0x0F);
}
output[12] = '\0';
}
/// Format the six-byte array \p mac into a MAC address.
std::string format_mac_address_pretty(const uint8_t mac[6]);
/// Format the byte array \p data of length \p len in lowercased hex.

View File

@@ -345,7 +345,7 @@ void HOT Scheduler::call(uint32_t now) {
// Execute callback without holding lock to prevent deadlocks
// if the callback tries to call defer() again
if (!this->should_skip_item_(item.get())) {
this->execute_item_(item.get(), now);
now = this->execute_item_(item.get(), now);
}
// Recycle the defer item after execution
this->recycle_item_(std::move(item));
@@ -483,7 +483,7 @@ void HOT Scheduler::call(uint32_t now) {
// Warning: During callback(), a lot of stuff can happen, including:
// - timeouts/intervals get added, potentially invalidating vector pointers
// - timeouts/intervals get cancelled
this->execute_item_(item.get(), now);
now = this->execute_item_(item.get(), now);
LockGuard guard{this->lock_};
@@ -568,11 +568,11 @@ void HOT Scheduler::pop_raw_() {
}
// Helper to execute a scheduler item
void HOT Scheduler::execute_item_(SchedulerItem *item, uint32_t now) {
uint32_t HOT Scheduler::execute_item_(SchedulerItem *item, uint32_t now) {
App.set_current_component(item->component);
WarnIfComponentBlockingGuard guard{item->component, now};
item->callback();
guard.finish();
return guard.finish();
}
// Common implementation for cancel operations

View File

@@ -254,7 +254,7 @@ class Scheduler {
}
// Helper to execute a scheduler item
void execute_item_(SchedulerItem *item, uint32_t now);
uint32_t execute_item_(SchedulerItem *item, uint32_t now);
// Helper to check if item should be skipped
bool should_skip_item_(SchedulerItem *item) const {

View File

@@ -19,6 +19,3 @@ dependencies:
- if: "target in [esp32h2, esp32p4]"
zorxx/multipart-parser:
version: 1.0.1
vroland/epdiy:
git: https://github.com/vroland/epdiy.git
version: c61e9e923ce2418150d54f88cea5d196cdc40c54

View File

@@ -12,7 +12,7 @@ platformio==6.1.18 # When updating platformio, also update /docker/Dockerfile
esptool==5.0.2
click==8.1.7
esphome-dashboard==20250904.0
aioesphomeapi==40.1.0
aioesphomeapi==40.2.1
zeroconf==0.147.2
puremagic==1.30
ruamel.yaml==0.18.15 # dashboard_import

View File

@@ -1,6 +1,6 @@
pylint==3.3.8
flake8==7.3.0 # also change in .pre-commit-config.yaml when updating
ruff==0.13.0 # also change in .pre-commit-config.yaml when updating
ruff==0.12.12 # also change in .pre-commit-config.yaml when updating
pyupgrade==3.20.0 # also change in .pre-commit-config.yaml when updating
pre-commit
@@ -8,7 +8,7 @@ pre-commit
pytest==8.4.2
pytest-cov==7.0.0
pytest-mock==3.15.0
pytest-asyncio==1.2.0
pytest-asyncio==1.1.0
pytest-xdist==3.8.0
asyncmock==0.4.2
hypothesis==6.92.1

View File

@@ -1,203 +0,0 @@
"""Tests for dashboard entries Path-related functionality."""
from __future__ import annotations
from pathlib import Path
import tempfile
from unittest.mock import MagicMock
import pytest
import pytest_asyncio
from esphome.core import CORE
from esphome.dashboard.entries import DashboardEntries, DashboardEntry
def create_cache_key() -> tuple[int, int, float, int]:
"""Helper to create a valid DashboardCacheKeyType."""
return (0, 0, 0.0, 0)
@pytest.fixture(autouse=True)
def setup_core():
"""Set up CORE for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
CORE.config_path = str(Path(tmpdir) / "test.yaml")
yield
CORE.reset()
@pytest.fixture
def mock_settings() -> MagicMock:
"""Create mock dashboard settings."""
settings = MagicMock()
settings.config_dir = "/test/config"
settings.absolute_config_dir = Path("/test/config")
return settings
@pytest_asyncio.fixture
async def dashboard_entries(mock_settings: MagicMock) -> DashboardEntries:
"""Create a DashboardEntries instance for testing."""
return DashboardEntries(mock_settings)
def test_dashboard_entry_path_initialization() -> None:
"""Test DashboardEntry initializes with path correctly."""
test_path = "/test/config/device.yaml"
cache_key = create_cache_key()
entry = DashboardEntry(test_path, cache_key)
assert entry.path == test_path
assert entry.cache_key == cache_key
def test_dashboard_entry_path_with_absolute_path() -> None:
"""Test DashboardEntry handles absolute paths."""
# Use a truly absolute path for the platform
test_path = Path.cwd() / "absolute" / "path" / "to" / "config.yaml"
cache_key = create_cache_key()
entry = DashboardEntry(str(test_path), cache_key)
assert entry.path == str(test_path)
assert Path(entry.path).is_absolute()
def test_dashboard_entry_path_with_relative_path() -> None:
"""Test DashboardEntry handles relative paths."""
test_path = "configs/device.yaml"
cache_key = create_cache_key()
entry = DashboardEntry(test_path, cache_key)
assert entry.path == test_path
assert not Path(entry.path).is_absolute()
@pytest.mark.asyncio
async def test_dashboard_entries_get_by_path(
dashboard_entries: DashboardEntries,
) -> None:
"""Test getting entry by path."""
test_path = "/test/config/device.yaml"
entry = DashboardEntry(test_path, create_cache_key())
dashboard_entries._entries[test_path] = entry
result = dashboard_entries.get(test_path)
assert result == entry
@pytest.mark.asyncio
async def test_dashboard_entries_get_nonexistent_path(
dashboard_entries: DashboardEntries,
) -> None:
"""Test getting non-existent entry returns None."""
result = dashboard_entries.get("/nonexistent/path.yaml")
assert result is None
@pytest.mark.asyncio
async def test_dashboard_entries_path_normalization(
dashboard_entries: DashboardEntries,
) -> None:
"""Test that paths are handled consistently."""
path1 = "/test/config/device.yaml"
entry = DashboardEntry(path1, create_cache_key())
dashboard_entries._entries[path1] = entry
result = dashboard_entries.get(path1)
assert result == entry
@pytest.mark.asyncio
async def test_dashboard_entries_path_with_spaces(
dashboard_entries: DashboardEntries,
) -> None:
"""Test handling paths with spaces."""
test_path = "/test/config/my device.yaml"
entry = DashboardEntry(test_path, create_cache_key())
dashboard_entries._entries[test_path] = entry
result = dashboard_entries.get(test_path)
assert result == entry
assert result.path == test_path
@pytest.mark.asyncio
async def test_dashboard_entries_path_with_special_chars(
dashboard_entries: DashboardEntries,
) -> None:
"""Test handling paths with special characters."""
test_path = "/test/config/device-01_test.yaml"
entry = DashboardEntry(test_path, create_cache_key())
dashboard_entries._entries[test_path] = entry
result = dashboard_entries.get(test_path)
assert result == entry
def test_dashboard_entries_windows_path() -> None:
"""Test handling Windows-style paths."""
test_path = r"C:\Users\test\esphome\device.yaml"
cache_key = create_cache_key()
entry = DashboardEntry(test_path, cache_key)
assert entry.path == test_path
@pytest.mark.asyncio
async def test_dashboard_entries_path_to_cache_key_mapping(
dashboard_entries: DashboardEntries,
) -> None:
"""Test internal entries storage with paths and cache keys."""
path1 = "/test/config/device1.yaml"
path2 = "/test/config/device2.yaml"
entry1 = DashboardEntry(path1, create_cache_key())
entry2 = DashboardEntry(path2, (1, 1, 1.0, 1))
dashboard_entries._entries[path1] = entry1
dashboard_entries._entries[path2] = entry2
assert path1 in dashboard_entries._entries
assert path2 in dashboard_entries._entries
assert dashboard_entries._entries[path1].cache_key == create_cache_key()
assert dashboard_entries._entries[path2].cache_key == (1, 1, 1.0, 1)
def test_dashboard_entry_path_property() -> None:
"""Test that path property returns expected value."""
test_path = "/test/config/device.yaml"
entry = DashboardEntry(test_path, create_cache_key())
assert entry.path == test_path
assert isinstance(entry.path, str)
@pytest.mark.asyncio
async def test_dashboard_entries_all_returns_entries_with_paths(
dashboard_entries: DashboardEntries,
) -> None:
"""Test that all() returns entries with their paths intact."""
paths = [
"/test/config/device1.yaml",
"/test/config/device2.yaml",
"/test/config/subfolder/device3.yaml",
]
for path in paths:
entry = DashboardEntry(path, create_cache_key())
dashboard_entries._entries[path] = entry
all_entries = dashboard_entries.async_all()
assert len(all_entries) == len(paths)
retrieved_paths = [entry.path for entry in all_entries]
assert set(retrieved_paths) == set(paths)

View File

@@ -1,168 +0,0 @@
"""Tests for dashboard settings Path-related functionality."""
from __future__ import annotations
import os
from pathlib import Path
import tempfile
import pytest
from esphome.dashboard.settings import DashboardSettings
@pytest.fixture
def dashboard_settings(tmp_path: Path) -> DashboardSettings:
"""Create DashboardSettings instance with temp directory."""
settings = DashboardSettings()
# Resolve symlinks to ensure paths match
resolved_dir = tmp_path.resolve()
settings.config_dir = str(resolved_dir)
settings.absolute_config_dir = resolved_dir
return settings
def test_rel_path_simple(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with simple relative path."""
result = dashboard_settings.rel_path("config.yaml")
expected = str(Path(dashboard_settings.config_dir) / "config.yaml")
assert result == expected
def test_rel_path_multiple_components(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with multiple path components."""
result = dashboard_settings.rel_path("subfolder", "device", "config.yaml")
expected = str(
Path(dashboard_settings.config_dir) / "subfolder" / "device" / "config.yaml"
)
assert result == expected
def test_rel_path_with_dots(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path prevents directory traversal."""
# This should raise ValueError as it tries to go outside config_dir
with pytest.raises(ValueError):
dashboard_settings.rel_path("..", "outside.yaml")
def test_rel_path_absolute_path_within_config(
dashboard_settings: DashboardSettings,
) -> None:
"""Test rel_path with absolute path that's within config dir."""
internal_path = dashboard_settings.absolute_config_dir / "internal.yaml"
internal_path.touch()
result = dashboard_settings.rel_path("internal.yaml")
expected = str(Path(dashboard_settings.config_dir) / "internal.yaml")
assert result == expected
def test_rel_path_absolute_path_outside_config(
dashboard_settings: DashboardSettings,
) -> None:
"""Test rel_path with absolute path outside config dir raises error."""
outside_path = "/tmp/outside/config.yaml"
with pytest.raises(ValueError):
dashboard_settings.rel_path(outside_path)
def test_rel_path_empty_args(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with no arguments returns config_dir."""
result = dashboard_settings.rel_path()
assert result == dashboard_settings.config_dir
def test_rel_path_with_pathlib_path(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path works with Path objects as arguments."""
path_obj = Path("subfolder") / "config.yaml"
result = dashboard_settings.rel_path(path_obj)
expected = str(Path(dashboard_settings.config_dir) / "subfolder" / "config.yaml")
assert result == expected
def test_rel_path_normalizes_slashes(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path normalizes path separators."""
# os.path.join normalizes slashes on Windows but preserves them on Unix
# Test that providing components separately gives same result
result1 = dashboard_settings.rel_path("folder", "subfolder", "file.yaml")
result2 = dashboard_settings.rel_path("folder", "subfolder", "file.yaml")
assert result1 == result2
# Also test that the result is as expected
expected = os.path.join(
dashboard_settings.config_dir, "folder", "subfolder", "file.yaml"
)
assert result1 == expected
def test_rel_path_handles_spaces(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles paths with spaces."""
result = dashboard_settings.rel_path("my folder", "my config.yaml")
expected = str(Path(dashboard_settings.config_dir) / "my folder" / "my config.yaml")
assert result == expected
def test_rel_path_handles_special_chars(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles paths with special characters."""
result = dashboard_settings.rel_path("device-01_test", "config.yaml")
expected = str(
Path(dashboard_settings.config_dir) / "device-01_test" / "config.yaml"
)
assert result == expected
def test_config_dir_as_path_property(dashboard_settings: DashboardSettings) -> None:
"""Test that config_dir can be accessed and used with Path operations."""
config_path = Path(dashboard_settings.config_dir)
assert config_path.exists()
assert config_path.is_dir()
assert config_path.is_absolute()
def test_absolute_config_dir_property(dashboard_settings: DashboardSettings) -> None:
"""Test absolute_config_dir is a Path object."""
assert isinstance(dashboard_settings.absolute_config_dir, Path)
assert dashboard_settings.absolute_config_dir.exists()
assert dashboard_settings.absolute_config_dir.is_dir()
assert dashboard_settings.absolute_config_dir.is_absolute()
def test_rel_path_symlink_inside_config(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with symlink that points inside config dir."""
target = dashboard_settings.absolute_config_dir / "target.yaml"
target.touch()
symlink = dashboard_settings.absolute_config_dir / "link.yaml"
symlink.symlink_to(target)
result = dashboard_settings.rel_path("link.yaml")
expected = str(Path(dashboard_settings.config_dir) / "link.yaml")
assert result == expected
def test_rel_path_symlink_outside_config(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path with symlink that points outside config dir."""
with tempfile.NamedTemporaryFile(suffix=".yaml") as tmp:
symlink = dashboard_settings.absolute_config_dir / "external_link.yaml"
symlink.symlink_to(tmp.name)
with pytest.raises(ValueError):
dashboard_settings.rel_path("external_link.yaml")
def test_rel_path_with_none_arg(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles None arguments gracefully."""
result = dashboard_settings.rel_path("None")
expected = str(Path(dashboard_settings.config_dir) / "None")
assert result == expected
def test_rel_path_with_numeric_args(dashboard_settings: DashboardSettings) -> None:
"""Test rel_path handles numeric arguments."""
result = dashboard_settings.rel_path("123", "456.789")
expected = str(Path(dashboard_settings.config_dir) / "123" / "456.789")
assert result == expected

View File

@@ -1,230 +0,0 @@
"""Tests for dashboard web_server Path-related functionality."""
from __future__ import annotations
import gzip
import os
from pathlib import Path
from unittest.mock import MagicMock, patch
from esphome.dashboard import web_server
def test_get_base_frontend_path_production() -> None:
"""Test get_base_frontend_path in production mode."""
mock_module = MagicMock()
mock_module.where.return_value = "/usr/local/lib/esphome_dashboard"
with (
patch.dict(os.environ, {}, clear=True),
patch.dict("sys.modules", {"esphome_dashboard": mock_module}),
):
result = web_server.get_base_frontend_path()
assert result == "/usr/local/lib/esphome_dashboard"
mock_module.where.assert_called_once()
def test_get_base_frontend_path_dev_mode() -> None:
"""Test get_base_frontend_path in development mode."""
test_path = "/home/user/esphome/dashboard"
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": test_path}):
result = web_server.get_base_frontend_path()
# The function uses os.path.abspath which doesn't resolve symlinks
# We need to match that behavior
# The actual function adds "/" to the path, so we simulate that
test_path_with_slash = test_path if test_path.endswith("/") else test_path + "/"
expected = os.path.abspath(
os.path.join(os.getcwd(), test_path_with_slash, "esphome_dashboard")
)
assert result == expected
def test_get_base_frontend_path_dev_mode_with_trailing_slash() -> None:
"""Test get_base_frontend_path in dev mode with trailing slash."""
test_path = "/home/user/esphome/dashboard/"
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": test_path}):
result = web_server.get_base_frontend_path()
# The function uses os.path.abspath which doesn't resolve symlinks
expected = os.path.abspath(str(Path.cwd() / test_path / "esphome_dashboard"))
assert result == expected
def test_get_base_frontend_path_dev_mode_relative_path() -> None:
"""Test get_base_frontend_path with relative dev path."""
test_path = "./dashboard"
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": test_path}):
result = web_server.get_base_frontend_path()
# The function uses os.path.abspath which doesn't resolve symlinks
# We need to match that behavior
# The actual function adds "/" to the path, so we simulate that
test_path_with_slash = test_path if test_path.endswith("/") else test_path + "/"
expected = os.path.abspath(
os.path.join(os.getcwd(), test_path_with_slash, "esphome_dashboard")
)
assert result == expected
assert Path(result).is_absolute()
def test_get_static_path_single_component() -> None:
"""Test get_static_path with single path component."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = "/base/frontend"
result = web_server.get_static_path("file.js")
assert result == os.path.join("/base/frontend", "static", "file.js")
def test_get_static_path_multiple_components() -> None:
"""Test get_static_path with multiple path components."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = "/base/frontend"
result = web_server.get_static_path("js", "esphome", "index.js")
assert result == os.path.join(
"/base/frontend", "static", "js", "esphome", "index.js"
)
def test_get_static_path_empty_args() -> None:
"""Test get_static_path with no arguments."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = "/base/frontend"
result = web_server.get_static_path()
assert result == os.path.join("/base/frontend", "static")
def test_get_static_path_with_pathlib_path() -> None:
"""Test get_static_path with Path objects."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = "/base/frontend"
path_obj = Path("js") / "app.js"
result = web_server.get_static_path(str(path_obj))
assert result == os.path.join("/base/frontend", "static", "js", "app.js")
def test_get_static_file_url_production() -> None:
"""Test get_static_file_url in production mode."""
web_server.get_static_file_url.cache_clear()
mock_module = MagicMock()
mock_file = MagicMock()
mock_file.read.return_value = b"test content"
mock_file.__enter__ = MagicMock(return_value=mock_file)
mock_file.__exit__ = MagicMock(return_value=None)
with (
patch.dict(os.environ, {}, clear=True),
patch.dict("sys.modules", {"esphome_dashboard": mock_module}),
patch("esphome.dashboard.web_server.get_static_path") as mock_get_path,
patch("esphome.dashboard.web_server.open", create=True, return_value=mock_file),
):
mock_get_path.return_value = "/fake/path/js/app.js"
result = web_server.get_static_file_url("js/app.js")
assert result.startswith("./static/js/app.js?hash=")
def test_get_static_file_url_dev_mode() -> None:
"""Test get_static_file_url in development mode."""
with patch.dict(os.environ, {"ESPHOME_DASHBOARD_DEV": "/dev/path"}):
web_server.get_static_file_url.cache_clear()
result = web_server.get_static_file_url("js/app.js")
assert result == "./static/js/app.js"
def test_get_static_file_url_index_js_special_case() -> None:
"""Test get_static_file_url replaces index.js with entrypoint."""
web_server.get_static_file_url.cache_clear()
mock_module = MagicMock()
mock_module.entrypoint.return_value = "main.js"
with (
patch.dict(os.environ, {}, clear=True),
patch.dict("sys.modules", {"esphome_dashboard": mock_module}),
):
result = web_server.get_static_file_url("js/esphome/index.js")
assert result == "./static/js/esphome/main.js"
def test_load_file_path(tmp_path: Path) -> None:
"""Test loading a file."""
test_file = tmp_path / "test.txt"
test_file.write_bytes(b"test content")
with open(test_file, "rb") as f:
content = f.read()
assert content == b"test content"
def test_load_file_compressed_path(tmp_path: Path) -> None:
"""Test loading a compressed file."""
test_file = tmp_path / "test.txt.gz"
with gzip.open(test_file, "wb") as gz:
gz.write(b"compressed content")
with gzip.open(test_file, "rb") as gz:
content = gz.read()
assert content == b"compressed content"
def test_path_normalization_in_static_path() -> None:
"""Test that paths are normalized correctly."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = "/base/frontend"
# Test with separate components
result1 = web_server.get_static_path("js", "app.js")
result2 = web_server.get_static_path("js", "app.js")
assert result1 == result2
assert result1 == os.path.join("/base/frontend", "static", "js", "app.js")
def test_windows_path_handling() -> None:
"""Test handling of Windows-style paths."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = r"C:\Program Files\esphome\frontend"
result = web_server.get_static_path("js", "app.js")
# os.path.join should handle this correctly on the platform
expected = os.path.join(
r"C:\Program Files\esphome\frontend", "static", "js", "app.js"
)
assert result == expected
def test_path_with_special_characters() -> None:
"""Test paths with special characters."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = "/base/frontend"
result = web_server.get_static_path("js-modules", "app_v1.0.js")
assert result == os.path.join(
"/base/frontend", "static", "js-modules", "app_v1.0.js"
)
def test_path_with_spaces() -> None:
"""Test paths with spaces."""
with patch("esphome.dashboard.web_server.get_base_frontend_path") as mock_base:
mock_base.return_value = "/base/my frontend"
result = web_server.get_static_path("my js", "my app.js")
assert result == os.path.join(
"/base/my frontend", "static", "my js", "my app.js"
)

View File

@@ -36,10 +36,3 @@ def fixture_path() -> Path:
Location of all fixture files.
"""
return here / "fixtures"
@pytest.fixture
def setup_core(tmp_path: Path) -> Path:
"""Set up CORE with test paths."""
CORE.config_path = str(tmp_path / "test.yaml")
return tmp_path

View File

@@ -1,3 +0,0 @@
# This file should be ignored
platform: template
name: "Hidden Sensor"

View File

@@ -1 +0,0 @@
This is not a YAML file and should be ignored

View File

@@ -1,4 +0,0 @@
platform: template
name: "Sensor 1"
lambda: |-
return 42.0;

View File

@@ -1,4 +0,0 @@
platform: template
name: "Sensor 2"
lambda: |-
return 100.0;

View File

@@ -1,4 +0,0 @@
platform: template
name: "Sensor 3 in subdir"
lambda: |-
return 200.0;

View File

@@ -1,4 +0,0 @@
test_secret: "my_secret_value"
another_secret: "another_value"
wifi_password: "super_secret_wifi"
api_key: "0123456789abcdef"

View File

@@ -1,17 +0,0 @@
esphome:
name: test_device
platform: ESP32
board: esp32dev
wifi:
ssid: "TestNetwork"
password: !secret wifi_password
api:
encryption:
key: !secret api_key
sensor:
- platform: template
name: "Test Sensor"
id: !secret test_secret

View File

@@ -1,187 +0,0 @@
"""Tests for config_validation.py path-related functions."""
from pathlib import Path
import pytest
import voluptuous as vol
from esphome import config_validation as cv
def test_directory_valid_path(setup_core: Path) -> None:
"""Test directory validator with valid directory."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
result = cv.directory("test_directory")
assert result == "test_directory"
def test_directory_absolute_path(setup_core: Path) -> None:
"""Test directory validator with absolute path."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
result = cv.directory(str(test_dir))
assert result == str(test_dir)
def test_directory_nonexistent_path(setup_core: Path) -> None:
"""Test directory validator raises error for non-existent directory."""
with pytest.raises(
vol.Invalid, match="Could not find directory.*nonexistent_directory"
):
cv.directory("nonexistent_directory")
def test_directory_file_instead_of_directory(setup_core: Path) -> None:
"""Test directory validator raises error when path is a file."""
test_file = setup_core / "test_file.txt"
test_file.write_text("content")
with pytest.raises(vol.Invalid, match="is not a directory"):
cv.directory("test_file.txt")
def test_directory_with_parent_directory(setup_core: Path) -> None:
"""Test directory validator with nested directory structure."""
nested_dir = setup_core / "parent" / "child" / "grandchild"
nested_dir.mkdir(parents=True)
result = cv.directory("parent/child/grandchild")
assert result == "parent/child/grandchild"
def test_file_valid_path(setup_core: Path) -> None:
"""Test file_ validator with valid file."""
test_file = setup_core / "test_file.yaml"
test_file.write_text("test content")
result = cv.file_("test_file.yaml")
assert result == "test_file.yaml"
def test_file_absolute_path(setup_core: Path) -> None:
"""Test file_ validator with absolute path."""
test_file = setup_core / "test_file.yaml"
test_file.write_text("test content")
result = cv.file_(str(test_file))
assert result == str(test_file)
def test_file_nonexistent_path(setup_core: Path) -> None:
"""Test file_ validator raises error for non-existent file."""
with pytest.raises(vol.Invalid, match="Could not find file.*nonexistent_file.yaml"):
cv.file_("nonexistent_file.yaml")
def test_file_directory_instead_of_file(setup_core: Path) -> None:
"""Test file_ validator raises error when path is a directory."""
test_dir = setup_core / "test_directory"
test_dir.mkdir()
with pytest.raises(vol.Invalid, match="is not a file"):
cv.file_("test_directory")
def test_file_with_parent_directory(setup_core: Path) -> None:
"""Test file_ validator with file in nested directory."""
nested_dir = setup_core / "configs" / "sensors"
nested_dir.mkdir(parents=True)
test_file = nested_dir / "temperature.yaml"
test_file.write_text("sensor config")
result = cv.file_("configs/sensors/temperature.yaml")
assert result == "configs/sensors/temperature.yaml"
def test_directory_handles_trailing_slash(setup_core: Path) -> None:
"""Test directory validator handles trailing slashes correctly."""
test_dir = setup_core / "test_dir"
test_dir.mkdir()
result = cv.directory("test_dir/")
assert result == "test_dir/"
result = cv.directory("test_dir")
assert result == "test_dir"
def test_file_handles_various_extensions(setup_core: Path) -> None:
"""Test file_ validator works with different file extensions."""
yaml_file = setup_core / "config.yaml"
yaml_file.write_text("yaml content")
assert cv.file_("config.yaml") == "config.yaml"
yml_file = setup_core / "config.yml"
yml_file.write_text("yml content")
assert cv.file_("config.yml") == "config.yml"
txt_file = setup_core / "readme.txt"
txt_file.write_text("text content")
assert cv.file_("readme.txt") == "readme.txt"
no_ext_file = setup_core / "LICENSE"
no_ext_file.write_text("license content")
assert cv.file_("LICENSE") == "LICENSE"
def test_directory_with_symlink(setup_core: Path) -> None:
"""Test directory validator follows symlinks."""
actual_dir = setup_core / "actual_directory"
actual_dir.mkdir()
symlink_dir = setup_core / "symlink_directory"
symlink_dir.symlink_to(actual_dir)
result = cv.directory("symlink_directory")
assert result == "symlink_directory"
def test_file_with_symlink(setup_core: Path) -> None:
"""Test file_ validator follows symlinks."""
actual_file = setup_core / "actual_file.txt"
actual_file.write_text("content")
symlink_file = setup_core / "symlink_file.txt"
symlink_file.symlink_to(actual_file)
result = cv.file_("symlink_file.txt")
assert result == "symlink_file.txt"
def test_directory_error_shows_full_path(setup_core: Path) -> None:
"""Test directory validator error message includes full path."""
with pytest.raises(vol.Invalid, match=".*missing_dir.*full path:.*"):
cv.directory("missing_dir")
def test_file_error_shows_full_path(setup_core: Path) -> None:
"""Test file_ validator error message includes full path."""
with pytest.raises(vol.Invalid, match=".*missing_file.yaml.*full path:.*"):
cv.file_("missing_file.yaml")
def test_directory_with_spaces_in_name(setup_core: Path) -> None:
"""Test directory validator handles spaces in directory names."""
dir_with_spaces = setup_core / "my test directory"
dir_with_spaces.mkdir()
result = cv.directory("my test directory")
assert result == "my test directory"
def test_file_with_spaces_in_name(setup_core: Path) -> None:
"""Test file_ validator handles spaces in file names."""
file_with_spaces = setup_core / "my test file.yaml"
file_with_spaces.write_text("content")
result = cv.file_("my test file.yaml")
assert result == "my test file.yaml"

View File

@@ -1,196 +0,0 @@
"""Tests for external_files.py functions."""
from pathlib import Path
import time
from unittest.mock import MagicMock, patch
import pytest
import requests
from esphome import external_files
from esphome.config_validation import Invalid
from esphome.core import CORE, TimePeriod
def test_compute_local_file_dir(setup_core: Path) -> None:
"""Test compute_local_file_dir creates and returns correct path."""
domain = "font"
result = external_files.compute_local_file_dir(domain)
assert isinstance(result, Path)
assert result == Path(CORE.data_dir) / domain
assert result.exists()
assert result.is_dir()
def test_compute_local_file_dir_nested(setup_core: Path) -> None:
"""Test compute_local_file_dir works with nested domains."""
domain = "images/icons"
result = external_files.compute_local_file_dir(domain)
assert result == Path(CORE.data_dir) / "images" / "icons"
assert result.exists()
assert result.is_dir()
def test_is_file_recent_with_recent_file(setup_core: Path) -> None:
"""Test is_file_recent returns True for recently created file."""
test_file = setup_core / "recent.txt"
test_file.write_text("content")
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is True
def test_is_file_recent_with_old_file(setup_core: Path) -> None:
"""Test is_file_recent returns False for old file."""
test_file = setup_core / "old.txt"
test_file.write_text("content")
old_time = time.time() - 7200
with patch("os.path.getctime", return_value=old_time):
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
def test_is_file_recent_nonexistent_file(setup_core: Path) -> None:
"""Test is_file_recent returns False for non-existent file."""
test_file = setup_core / "nonexistent.txt"
refresh = TimePeriod(seconds=3600)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
def test_is_file_recent_with_zero_refresh(setup_core: Path) -> None:
"""Test is_file_recent with zero refresh period returns False."""
test_file = setup_core / "test.txt"
test_file.write_text("content")
# Mock getctime to return a time 10 seconds ago
with patch("os.path.getctime", return_value=time.time() - 10):
refresh = TimePeriod(seconds=0)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is False
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_not_modified(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed returns False when file not modified."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 304
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is False
mock_head.assert_called_once()
call_args = mock_head.call_args
headers = call_args[1]["headers"]
assert external_files.IF_MODIFIED_SINCE in headers
assert external_files.CACHE_CONTROL in headers
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_modified(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed returns True when file modified."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 200
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is True
def test_has_remote_file_changed_no_local_file(setup_core: Path) -> None:
"""Test has_remote_file_changed returns True when local file doesn't exist."""
test_file = setup_core / "nonexistent.txt"
url = "https://example.com/file.txt"
result = external_files.has_remote_file_changed(url, str(test_file))
assert result is True
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_network_error(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed handles network errors gracefully."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_head.side_effect = requests.exceptions.RequestException("Network error")
url = "https://example.com/file.txt"
with pytest.raises(Invalid, match="Could not check if.*Network error"):
external_files.has_remote_file_changed(url, str(test_file))
@patch("esphome.external_files.requests.head")
def test_has_remote_file_changed_timeout(
mock_head: MagicMock, setup_core: Path
) -> None:
"""Test has_remote_file_changed respects timeout."""
test_file = setup_core / "cached.txt"
test_file.write_text("cached content")
mock_response = MagicMock()
mock_response.status_code = 304
mock_head.return_value = mock_response
url = "https://example.com/file.txt"
external_files.has_remote_file_changed(url, str(test_file))
call_args = mock_head.call_args
assert call_args[1]["timeout"] == external_files.NETWORK_TIMEOUT
def test_compute_local_file_dir_creates_parent_dirs(setup_core: Path) -> None:
"""Test compute_local_file_dir creates parent directories."""
domain = "level1/level2/level3/level4"
result = external_files.compute_local_file_dir(domain)
assert result.exists()
assert result.is_dir()
assert result.parent.name == "level3"
assert result.parent.parent.name == "level2"
assert result.parent.parent.parent.name == "level1"
def test_is_file_recent_handles_float_seconds(setup_core: Path) -> None:
"""Test is_file_recent works with float seconds in TimePeriod."""
test_file = setup_core / "test.txt"
test_file.write_text("content")
refresh = TimePeriod(seconds=3600.5)
result = external_files.is_file_recent(str(test_file), refresh)
assert result is True

View File

@@ -1,129 +0,0 @@
"""Tests for platformio_api.py path functions."""
from pathlib import Path
from unittest.mock import patch
from esphome import platformio_api
from esphome.core import CORE
def test_idedata_firmware_elf_path(setup_core: Path) -> None:
"""Test IDEData.firmware_elf_path returns correct path."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {"prog_path": "/path/to/firmware.elf"}
idedata = platformio_api.IDEData(raw_data)
assert idedata.firmware_elf_path == "/path/to/firmware.elf"
def test_idedata_firmware_bin_path(setup_core: Path) -> None:
"""Test IDEData.firmware_bin_path returns Path with .bin extension."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
prog_path = str(Path("/path/to/firmware.elf"))
raw_data = {"prog_path": prog_path}
idedata = platformio_api.IDEData(raw_data)
result = idedata.firmware_bin_path
assert isinstance(result, str)
expected = str(Path("/path/to/firmware.bin"))
assert result == expected
assert result.endswith(".bin")
def test_idedata_firmware_bin_path_preserves_directory(setup_core: Path) -> None:
"""Test firmware_bin_path preserves the directory structure."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
prog_path = str(Path("/complex/path/to/build/firmware.elf"))
raw_data = {"prog_path": prog_path}
idedata = platformio_api.IDEData(raw_data)
result = idedata.firmware_bin_path
expected = str(Path("/complex/path/to/build/firmware.bin"))
assert result == expected
def test_idedata_extra_flash_images(setup_core: Path) -> None:
"""Test IDEData.extra_flash_images returns list of FlashImage objects."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {
"prog_path": "/path/to/firmware.elf",
"extra": {
"flash_images": [
{"path": "/path/to/bootloader.bin", "offset": "0x1000"},
{"path": "/path/to/partition.bin", "offset": "0x8000"},
]
},
}
idedata = platformio_api.IDEData(raw_data)
images = idedata.extra_flash_images
assert len(images) == 2
assert all(isinstance(img, platformio_api.FlashImage) for img in images)
assert images[0].path == "/path/to/bootloader.bin"
assert images[0].offset == "0x1000"
assert images[1].path == "/path/to/partition.bin"
assert images[1].offset == "0x8000"
def test_idedata_extra_flash_images_empty(setup_core: Path) -> None:
"""Test extra_flash_images returns empty list when no extra images."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {"prog_path": "/path/to/firmware.elf", "extra": {"flash_images": []}}
idedata = platformio_api.IDEData(raw_data)
images = idedata.extra_flash_images
assert images == []
def test_idedata_cc_path(setup_core: Path) -> None:
"""Test IDEData.cc_path returns compiler path."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
raw_data = {
"prog_path": "/path/to/firmware.elf",
"cc_path": "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc",
}
idedata = platformio_api.IDEData(raw_data)
assert (
idedata.cc_path
== "/Users/test/.platformio/packages/toolchain-xtensa32/bin/xtensa-esp32-elf-gcc"
)
def test_flash_image_dataclass() -> None:
"""Test FlashImage dataclass stores path and offset correctly."""
image = platformio_api.FlashImage(path="/path/to/image.bin", offset="0x10000")
assert image.path == "/path/to/image.bin"
assert image.offset == "0x10000"
def test_load_idedata_returns_dict(setup_core: Path) -> None:
"""Test _load_idedata returns parsed idedata dict when successful."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create required files
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.touch()
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/test/firmware.elf"}')
with patch("esphome.platformio_api.run_platformio_cli_run") as mock_run:
mock_run.return_value = '{"prog_path": "/test/firmware.elf"}'
config = {"name": "test"}
result = platformio_api._load_idedata(config)
assert result is not None
assert isinstance(result, dict)
assert result["prog_path"] == "/test/firmware.elf"

View File

@@ -1,182 +0,0 @@
"""Tests for storage_json.py path functions."""
from pathlib import Path
import sys
from unittest.mock import patch
import pytest
from esphome import storage_json
from esphome.core import CORE
def test_storage_path(setup_core: Path) -> None:
"""Test storage_path returns correct path for current config."""
CORE.config_path = str(setup_core / "my_device.yaml")
result = storage_json.storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "my_device.yaml.json")
assert result == expected
def test_ext_storage_path(setup_core: Path) -> None:
"""Test ext_storage_path returns correct path for given filename."""
result = storage_json.ext_storage_path("other_device.yaml")
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "other_device.yaml.json")
assert result == expected
def test_ext_storage_path_handles_various_extensions(setup_core: Path) -> None:
"""Test ext_storage_path works with different file extensions."""
result_yml = storage_json.ext_storage_path("device.yml")
assert result_yml.endswith("device.yml.json")
result_no_ext = storage_json.ext_storage_path("device")
assert result_no_ext.endswith("device.json")
result_path = storage_json.ext_storage_path("my/device.yaml")
assert result_path.endswith("device.yaml.json")
def test_esphome_storage_path(setup_core: Path) -> None:
"""Test esphome_storage_path returns correct path."""
result = storage_json.esphome_storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "esphome.json")
assert result == expected
def test_ignored_devices_storage_path(setup_core: Path) -> None:
"""Test ignored_devices_storage_path returns correct path."""
result = storage_json.ignored_devices_storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "ignored-devices.json")
assert result == expected
def test_trash_storage_path(setup_core: Path) -> None:
"""Test trash_storage_path returns correct path."""
CORE.config_path = str(setup_core / "configs" / "device.yaml")
result = storage_json.trash_storage_path()
expected = str(setup_core / "configs" / "trash")
assert result == expected
def test_archive_storage_path(setup_core: Path) -> None:
"""Test archive_storage_path returns correct path."""
CORE.config_path = str(setup_core / "configs" / "device.yaml")
result = storage_json.archive_storage_path()
expected = str(setup_core / "configs" / "archive")
assert result == expected
def test_storage_path_with_subdirectory(setup_core: Path) -> None:
"""Test storage paths work correctly when config is in subdirectory."""
subdir = setup_core / "configs" / "basement"
subdir.mkdir(parents=True, exist_ok=True)
CORE.config_path = str(subdir / "sensor.yaml")
result = storage_json.storage_path()
data_dir = Path(CORE.data_dir)
expected = str(data_dir / "storage" / "sensor.yaml.json")
assert result == expected
def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None:
"""Test StorageJSON firmware_bin_path property."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test_device",
friendly_name="Test Device",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="192.168.1.100",
web_port=80,
target_platform="ESP32",
build_path="build/test_device",
firmware_bin_path="/path/to/firmware.bin",
loaded_integrations={"wifi", "api"},
loaded_platforms=set(),
no_mdns=False,
)
assert storage.firmware_bin_path == "/path/to/firmware.bin"
def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) -> None:
"""Test StorageJSON.save creates storage directory if it doesn't exist."""
storage_dir = tmp_path / "new_data" / "storage"
storage_file = storage_dir / "test.json"
assert not storage_dir.exists()
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP8266",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(storage_file))
mock_write.assert_called_once()
call_args = mock_write.call_args[0]
assert call_args[0] == str(storage_file)
def test_storage_json_from_wizard(setup_core: Path) -> None:
"""Test StorageJSON.from_wizard creates correct storage object."""
storage = storage_json.StorageJSON.from_wizard(
name="my_device",
friendly_name="My Device",
address="my_device.local",
platform="ESP32",
)
assert storage.name == "my_device"
assert storage.friendly_name == "My Device"
assert storage.address == "my_device.local"
assert storage.target_platform == "ESP32"
assert storage.build_path is None
assert storage.firmware_bin_path is None
@pytest.mark.skipif(sys.platform == "win32", reason="HA addons don't run on Windows")
@patch("esphome.core.is_ha_addon")
def test_storage_paths_with_ha_addon(mock_is_ha_addon: bool, tmp_path: Path) -> None:
"""Test storage paths when running as Home Assistant addon."""
mock_is_ha_addon.return_value = True
CORE.config_path = str(tmp_path / "test.yaml")
result = storage_json.storage_path()
# When is_ha_addon is True, CORE.data_dir returns "/data"
# This is the standard mount point for HA addon containers
expected = str(Path("/data") / "storage" / "test.yaml.json")
assert result == expected
result = storage_json.esphome_storage_path()
expected = str(Path("/data") / "esphome.json")
assert result == expected

View File

@@ -141,170 +141,3 @@ def test_list_yaml_files_mixed_extensions(tmp_path: Path) -> None:
str(yaml_file),
str(yml_file),
}
def test_list_yaml_files_does_not_recurse_into_subdirectories(tmp_path: Path) -> None:
"""Test that list_yaml_files only finds files in specified directory, not subdirectories."""
# Create directory structure with YAML files at different depths
root = tmp_path / "configs"
root.mkdir()
# Create YAML files in the root directory
(root / "config1.yaml").write_text("test: 1")
(root / "config2.yml").write_text("test: 2")
(root / "device.yaml").write_text("test: device")
# Create subdirectory with YAML files (should NOT be found)
subdir = root / "subdir"
subdir.mkdir()
(subdir / "nested1.yaml").write_text("test: nested1")
(subdir / "nested2.yml").write_text("test: nested2")
# Create deeper subdirectory (should NOT be found)
deep_subdir = subdir / "deeper"
deep_subdir.mkdir()
(deep_subdir / "very_nested.yaml").write_text("test: very_nested")
# Test listing files from the root directory
result = util.list_yaml_files([str(root)])
# Should only find the 3 files in root, not the 3 in subdirectories
assert len(result) == 3
# Check that only root-level files are found
assert str(root / "config1.yaml") in result
assert str(root / "config2.yml") in result
assert str(root / "device.yaml") in result
# Ensure nested files are NOT found
for r in result:
assert "subdir" not in r
assert "deeper" not in r
assert "nested1.yaml" not in r
assert "nested2.yml" not in r
assert "very_nested.yaml" not in r
def test_list_yaml_files_excludes_secrets(tmp_path: Path) -> None:
"""Test that secrets.yaml and secrets.yml are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create various YAML files including secrets
(root / "config.yaml").write_text("test: config")
(root / "secrets.yaml").write_text("wifi_password: secret123")
(root / "secrets.yml").write_text("api_key: secret456")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find 2 files (config.yaml and device.yaml), not secrets
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / "secrets.yaml") not in result
assert str(root / "secrets.yml") not in result
def test_list_yaml_files_excludes_hidden_files(tmp_path: Path) -> None:
"""Test that hidden files (starting with .) are excluded."""
root = tmp_path / "configs"
root.mkdir()
# Create regular and hidden YAML files
(root / "config.yaml").write_text("test: config")
(root / ".hidden.yaml").write_text("test: hidden")
(root / ".backup.yml").write_text("test: backup")
(root / "device.yaml").write_text("test: device")
result = util.list_yaml_files([str(root)])
# Should find only non-hidden files
assert len(result) == 2
assert str(root / "config.yaml") in result
assert str(root / "device.yaml") in result
assert str(root / ".hidden.yaml") not in result
assert str(root / ".backup.yml") not in result
def test_filter_yaml_files_basic() -> None:
"""Test filter_yaml_files function."""
files = [
"/path/to/config.yaml",
"/path/to/device.yml",
"/path/to/readme.txt",
"/path/to/script.py",
"/path/to/data.json",
"/path/to/another.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 3
assert "/path/to/config.yaml" in result
assert "/path/to/device.yml" in result
assert "/path/to/another.yaml" in result
assert "/path/to/readme.txt" not in result
assert "/path/to/script.py" not in result
assert "/path/to/data.json" not in result
def test_filter_yaml_files_excludes_secrets() -> None:
"""Test that filter_yaml_files excludes secrets files."""
files = [
"/path/to/config.yaml",
"/path/to/secrets.yaml",
"/path/to/secrets.yml",
"/path/to/device.yaml",
"/some/dir/secrets.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/secrets.yaml" not in result
assert "/path/to/secrets.yml" not in result
assert "/some/dir/secrets.yaml" not in result
def test_filter_yaml_files_excludes_hidden() -> None:
"""Test that filter_yaml_files excludes hidden files."""
files = [
"/path/to/config.yaml",
"/path/to/.hidden.yaml",
"/path/to/.backup.yml",
"/path/to/device.yaml",
"/some/dir/.config.yaml",
]
result = util.filter_yaml_files(files)
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/device.yaml" in result
assert "/path/to/.hidden.yaml" not in result
assert "/path/to/.backup.yml" not in result
assert "/some/dir/.config.yaml" not in result
def test_filter_yaml_files_case_sensitive() -> None:
"""Test that filter_yaml_files is case-sensitive for extensions."""
files = [
"/path/to/config.yaml",
"/path/to/config.YAML",
"/path/to/config.YML",
"/path/to/config.Yaml",
"/path/to/config.yml",
]
result = util.filter_yaml_files(files)
# Should only match lowercase .yaml and .yml
assert len(result) == 2
assert "/path/to/config.yaml" in result
assert "/path/to/config.yml" in result
assert "/path/to/config.YAML" not in result
assert "/path/to/config.YML" not in result
assert "/path/to/config.Yaml" not in result

View File

@@ -1,34 +1,13 @@
"""Test writer module functionality."""
from collections.abc import Callable
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from esphome.core import EsphomeError
from esphome.storage_json import StorageJSON
from esphome.writer import (
CPP_AUTO_GENERATE_BEGIN,
CPP_AUTO_GENERATE_END,
CPP_INCLUDE_BEGIN,
CPP_INCLUDE_END,
GITIGNORE_CONTENT,
clean_build,
clean_cmake_cache,
storage_should_clean,
update_storage_json,
write_cpp,
write_gitignore,
)
@pytest.fixture
def mock_copy_src_tree():
"""Mock copy_src_tree to avoid side effects during tests."""
with patch("esphome.writer.copy_src_tree"):
yield
from esphome.writer import storage_should_clean, update_storage_json
@pytest.fixture
@@ -239,396 +218,3 @@ def test_update_storage_json_logging_components_removed(
# Verify save was called
new_storage.save.assert_called_once_with("/test/path")
@patch("esphome.writer.CORE")
def test_clean_cmake_cache(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_cmake_cache removes CMakeCache.txt file."""
# Create directory structure
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
cmake_cache_file.write_text("# CMake cache file")
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file exists before
assert cmake_cache_file.exists()
# Call the function
with caplog.at_level("INFO"):
clean_cmake_cache()
# Verify file was removed
assert not cmake_cache_file.exists()
# Verify logging
assert "Deleting" in caplog.text
assert "CMakeCache.txt" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_pioenvs_dir(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when pioenvs directory doesn't exist."""
# Setup non-existent directory path
pioenvs_dir = tmp_path / ".pioenvs"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
# Verify directory doesn't exist
assert not pioenvs_dir.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify directory still doesn't exist
assert not pioenvs_dir.exists()
@patch("esphome.writer.CORE")
def test_clean_cmake_cache_no_cmake_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_cmake_cache when CMakeCache.txt doesn't exist."""
# Create directory structure without CMakeCache.txt
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
device_dir = pioenvs_dir / "test_device"
device_dir.mkdir()
cmake_cache_file = device_dir / "CMakeCache.txt"
# Setup mocks
mock_core.relative_pioenvs_path.side_effect = [
str(pioenvs_dir), # First call for directory check
str(cmake_cache_file), # Second call for file path
]
mock_core.name = "test_device"
# Verify file doesn't exist
assert not cmake_cache_file.exists()
# Call the function - should not crash
clean_cmake_cache()
# Verify file still doesn't exist
assert not cmake_cache_file.exists()
@patch("esphome.writer.CORE")
def test_clean_build(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build removes all build artifacts."""
# Create directory structure and files
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
piolibdeps_dir.mkdir()
(piolibdeps_dir / "library").mkdir()
dependencies_lock = tmp_path / "dependencies.lock"
dependencies_lock.write_text("lock file")
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify all exist before
assert pioenvs_dir.exists()
assert piolibdeps_dir.exists()
assert dependencies_lock.exists()
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify all were removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify logging
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" in caplog.text
assert "dependencies.lock" in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_partial_exists(
mock_core: MagicMock,
tmp_path: Path,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test clean_build when only some paths exist."""
# Create only pioenvs directory
pioenvs_dir = tmp_path / ".pioenvs"
pioenvs_dir.mkdir()
(pioenvs_dir / "test_file.o").write_text("object file")
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify only pioenvs exists
assert pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function
with caplog.at_level("INFO"):
clean_build()
# Verify only existing path was removed
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Verify logging - only pioenvs should be logged
assert "Deleting" in caplog.text
assert ".pioenvs" in caplog.text
assert ".piolibdeps" not in caplog.text
assert "dependencies.lock" not in caplog.text
@patch("esphome.writer.CORE")
def test_clean_build_nothing_exists(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test clean_build when no build artifacts exist."""
# Setup paths that don't exist
pioenvs_dir = tmp_path / ".pioenvs"
piolibdeps_dir = tmp_path / ".piolibdeps"
dependencies_lock = tmp_path / "dependencies.lock"
# Setup mocks
mock_core.relative_pioenvs_path.return_value = str(pioenvs_dir)
mock_core.relative_piolibdeps_path.return_value = str(piolibdeps_dir)
mock_core.relative_build_path.return_value = str(dependencies_lock)
# Verify nothing exists
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
# Call the function - should not crash
clean_build()
# Verify nothing was created
assert not pioenvs_dir.exists()
assert not piolibdeps_dir.exists()
assert not dependencies_lock.exists()
@patch("esphome.writer.CORE")
def test_write_gitignore_creates_new_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore creates a new .gitignore file when it doesn't exist."""
gitignore_path = tmp_path / ".gitignore"
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file doesn't exist
assert not gitignore_path.exists()
# Call the function
write_gitignore()
# Verify file was created with correct content
assert gitignore_path.exists()
assert gitignore_path.read_text() == GITIGNORE_CONTENT
@patch("esphome.writer.CORE")
def test_write_gitignore_skips_existing_file(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_gitignore doesn't overwrite existing .gitignore file."""
gitignore_path = tmp_path / ".gitignore"
existing_content = "# Custom gitignore\n/custom_dir/\n"
gitignore_path.write_text(existing_content)
# Setup mocks
mock_core.relative_config_path.return_value = str(gitignore_path)
# Verify file exists with custom content
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
# Call the function
write_gitignore()
# Verify file was not modified
assert gitignore_path.exists()
assert gitignore_path.read_text() == existing_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_with_existing_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp already exists."""
# Create a real file with markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_INCLUDE_BEGIN}
// Old includes
{CPP_INCLUDE_END}
void setup() {{
{CPP_AUTO_GENERATE_BEGIN}
// Old code
{CPP_AUTO_GENERATE_END}
}}
void loop() {{}}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Call the function
test_code = " // New generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
# Check that markers are preserved and content is updated
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "// Global section" in written_content
@patch("esphome.writer.write_file_if_changed") # Mock to capture output
@patch("esphome.writer.copy_src_tree") # Keep this mock as it's complex
@patch("esphome.writer.CORE")
def test_write_cpp_creates_new_file(
mock_core: MagicMock,
mock_copy_src_tree: MagicMock,
mock_write_file: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp when main.cpp doesn't exist."""
# Setup path for new file
main_cpp = tmp_path / "main.cpp"
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
mock_core.cpp_global_section = "// Global section"
# Verify file doesn't exist
assert not main_cpp.exists()
# Call the function
test_code = " // Generated code"
write_cpp(test_code)
# Verify copy_src_tree was called
mock_copy_src_tree.assert_called_once()
# Get the content that would be written
mock_write_file.assert_called_once()
written_path, written_content = mock_write_file.call_args[0]
assert written_path == str(main_cpp)
# Check that all necessary parts are in the new file
assert '#include "esphome.h"' in written_content
assert CPP_INCLUDE_BEGIN in written_content
assert CPP_INCLUDE_END in written_content
assert CPP_AUTO_GENERATE_BEGIN in written_content
assert CPP_AUTO_GENERATE_END in written_content
assert test_code in written_content
assert "void setup()" in written_content
assert "void loop()" in written_content
assert "App.setup();" in written_content
assert "App.loop();" in written_content
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_missing_end_marker(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when end marker is missing."""
# Create a file with begin marker but no end marker
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// Code without end marker"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Could not find auto generated code end"):
write_cpp("// New code")
@pytest.mark.usefixtures("mock_copy_src_tree")
@patch("esphome.writer.CORE")
def test_write_cpp_with_duplicate_markers(
mock_core: MagicMock,
tmp_path: Path,
) -> None:
"""Test write_cpp raises error when duplicate markers exist."""
# Create a file with duplicate begin markers
main_cpp = tmp_path / "main.cpp"
existing_content = f"""#include "esphome.h"
{CPP_AUTO_GENERATE_BEGIN}
// First section
{CPP_AUTO_GENERATE_END}
{CPP_AUTO_GENERATE_BEGIN}
// Duplicate section
{CPP_AUTO_GENERATE_END}"""
main_cpp.write_text(existing_content)
# Setup mocks
mock_core.relative_src_path.return_value = str(main_cpp)
# Call should raise an error
with pytest.raises(EsphomeError, match="Found multiple auto generate code begins"):
write_cpp("// New code")

View File

@@ -1,26 +1,9 @@
from pathlib import Path
import shutil
from unittest.mock import patch
import pytest
from esphome import core, yaml_util
from esphome import yaml_util
from esphome.components import substitutions
from esphome.core import EsphomeError
from esphome.util import OrderedDict
@pytest.fixture(autouse=True)
def clear_secrets_cache() -> None:
"""Clear the secrets cache before each test."""
yaml_util._SECRET_VALUES.clear()
yaml_util._SECRET_CACHE.clear()
yield
yaml_util._SECRET_VALUES.clear()
yaml_util._SECRET_CACHE.clear()
def test_include_with_vars(fixture_path: Path) -> None:
def test_include_with_vars(fixture_path):
yaml_file = fixture_path / "yaml_util" / "includetest.yaml"
actual = yaml_util.load_yaml(yaml_file)
@@ -79,202 +62,3 @@ def test_parsing_with_custom_loader(fixture_path):
assert loader_calls[0].endswith("includes/included.yaml")
assert loader_calls[1].endswith("includes/list.yaml")
assert loader_calls[2].endswith("includes/scalar.yaml")
def test_construct_secret_simple(fixture_path: Path) -> None:
"""Test loading a YAML file with !secret tags."""
yaml_file = fixture_path / "yaml_util" / "test_secret.yaml"
actual = yaml_util.load_yaml(yaml_file)
# Check that secrets were properly loaded
assert actual["wifi"]["password"] == "super_secret_wifi"
assert actual["api"]["encryption"]["key"] == "0123456789abcdef"
assert actual["sensor"][0]["id"] == "my_secret_value"
def test_construct_secret_missing(fixture_path: Path, tmp_path: Path) -> None:
"""Test that missing secrets raise proper errors."""
# Create a YAML file with a secret that doesn't exist
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("""
esphome:
name: test
wifi:
password: !secret nonexistent_secret
""")
# Create an empty secrets file
secrets_yaml = tmp_path / "secrets.yaml"
secrets_yaml.write_text("some_other_secret: value")
with pytest.raises(EsphomeError, match="Secret 'nonexistent_secret' not defined"):
yaml_util.load_yaml(str(test_yaml))
def test_construct_secret_no_secrets_file(tmp_path: Path) -> None:
"""Test that missing secrets.yaml file raises proper error."""
# Create a YAML file with a secret but no secrets.yaml
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("""
wifi:
password: !secret some_secret
""")
# Mock CORE.config_path to avoid NoneType error
with (
patch.object(core.CORE, "config_path", str(tmp_path / "main.yaml")),
pytest.raises(EsphomeError, match="secrets.yaml"),
):
yaml_util.load_yaml(str(test_yaml))
def test_construct_secret_fallback_to_main_config_dir(
fixture_path: Path, tmp_path: Path
) -> None:
"""Test fallback to main config directory for secrets."""
# Create a subdirectory with a YAML file that uses secrets
subdir = tmp_path / "subdir"
subdir.mkdir()
test_yaml = subdir / "test.yaml"
test_yaml.write_text("""
wifi:
password: !secret test_secret
""")
# Create secrets.yaml in the main directory
main_secrets = tmp_path / "secrets.yaml"
main_secrets.write_text("test_secret: main_secret_value")
# Mock CORE.config_path to point to main directory
with patch.object(core.CORE, "config_path", str(tmp_path / "main.yaml")):
actual = yaml_util.load_yaml(str(test_yaml))
assert actual["wifi"]["password"] == "main_secret_value"
def test_construct_include_dir_named(fixture_path: Path, tmp_path: Path) -> None:
"""Test !include_dir_named directive."""
# Copy fixture directory to temporary location
src_dir = fixture_path / "yaml_util"
dst_dir = tmp_path / "yaml_util"
shutil.copytree(src_dir, dst_dir)
# Create test YAML that uses include_dir_named
test_yaml = dst_dir / "test_include_named.yaml"
test_yaml.write_text("""
sensor: !include_dir_named named_dir
""")
actual = yaml_util.load_yaml(str(test_yaml))
actual_sensor = actual["sensor"]
# Check that files were loaded with their names as keys
assert isinstance(actual_sensor, OrderedDict)
assert "sensor1" in actual_sensor
assert "sensor2" in actual_sensor
assert "sensor3" in actual_sensor # Files from subdirs are included with basename
# Check content of loaded files
assert actual_sensor["sensor1"]["platform"] == "template"
assert actual_sensor["sensor1"]["name"] == "Sensor 1"
assert actual_sensor["sensor2"]["platform"] == "template"
assert actual_sensor["sensor2"]["name"] == "Sensor 2"
# Check that subdirectory files are included with their basename
assert actual_sensor["sensor3"]["platform"] == "template"
assert actual_sensor["sensor3"]["name"] == "Sensor 3 in subdir"
# Check that hidden files and non-YAML files are not included
assert ".hidden" not in actual_sensor
assert "not_yaml" not in actual_sensor
def test_construct_include_dir_named_empty_dir(tmp_path: Path) -> None:
"""Test !include_dir_named with empty directory."""
# Create empty directory
empty_dir = tmp_path / "empty_dir"
empty_dir.mkdir()
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("""
sensor: !include_dir_named empty_dir
""")
actual = yaml_util.load_yaml(str(test_yaml))
# Should return empty OrderedDict
assert isinstance(actual["sensor"], OrderedDict)
assert len(actual["sensor"]) == 0
def test_construct_include_dir_named_with_dots(tmp_path: Path) -> None:
"""Test that include_dir_named ignores files starting with dots."""
# Create directory with various files
test_dir = tmp_path / "test_dir"
test_dir.mkdir()
# Create visible file
visible_file = test_dir / "visible.yaml"
visible_file.write_text("key: visible_value")
# Create hidden file
hidden_file = test_dir / ".hidden.yaml"
hidden_file.write_text("key: hidden_value")
# Create hidden directory with files
hidden_dir = test_dir / ".hidden_dir"
hidden_dir.mkdir()
hidden_subfile = hidden_dir / "subfile.yaml"
hidden_subfile.write_text("key: hidden_subfile_value")
test_yaml = tmp_path / "test.yaml"
test_yaml.write_text("""
test: !include_dir_named test_dir
""")
actual = yaml_util.load_yaml(str(test_yaml))
# Should only include visible file
assert "visible" in actual["test"]
assert actual["test"]["visible"]["key"] == "visible_value"
# Should not include hidden files or directories
assert ".hidden" not in actual["test"]
assert ".hidden_dir" not in actual["test"]
def test_find_files_recursive(fixture_path: Path, tmp_path: Path) -> None:
"""Test that _find_files works recursively through include_dir_named."""
# Copy fixture directory to temporary location
src_dir = fixture_path / "yaml_util"
dst_dir = tmp_path / "yaml_util"
shutil.copytree(src_dir, dst_dir)
# This indirectly tests _find_files by using include_dir_named
test_yaml = dst_dir / "test_include_recursive.yaml"
test_yaml.write_text("""
all_sensors: !include_dir_named named_dir
""")
actual = yaml_util.load_yaml(str(test_yaml))
# Should find sensor1.yaml, sensor2.yaml, and subdir/sensor3.yaml (all flattened)
assert len(actual["all_sensors"]) == 3
assert "sensor1" in actual["all_sensors"]
assert "sensor2" in actual["all_sensors"]
assert "sensor3" in actual["all_sensors"]
def test_secret_values_tracking(fixture_path: Path) -> None:
"""Test that secret values are properly tracked for dumping."""
yaml_file = fixture_path / "yaml_util" / "test_secret.yaml"
yaml_util.load_yaml(yaml_file)
# Check that secret values are tracked
assert "super_secret_wifi" in yaml_util._SECRET_VALUES
assert yaml_util._SECRET_VALUES["super_secret_wifi"] == "wifi_password"
assert "0123456789abcdef" in yaml_util._SECRET_VALUES
assert yaml_util._SECRET_VALUES["0123456789abcdef"] == "api_key"