1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-16 18:22:22 +01:00

Merge remote-tracking branch 'upstream/dev' into zwave_proxy

This commit is contained in:
kbx81
2025-09-15 01:26:19 -05:00
34 changed files with 2419 additions and 124 deletions

View File

@@ -27,9 +27,6 @@ service APIConnection {
rpc subscribe_logs (SubscribeLogsRequest) returns (void) {}
rpc subscribe_homeassistant_services (SubscribeHomeassistantServicesRequest) returns (void) {}
rpc subscribe_home_assistant_states (SubscribeHomeAssistantStatesRequest) returns (void) {}
rpc get_time (GetTimeRequest) returns (GetTimeResponse) {
option (needs_authentication) = false;
}
rpc execute_service (ExecuteServiceRequest) returns (void) {}
rpc noise_encryption_set_key (NoiseEncryptionSetKeyRequest) returns (NoiseEncryptionSetKeyResponse) {}
@@ -139,6 +136,7 @@ message ConnectRequest {
option (id) = 3;
option (source) = SOURCE_CLIENT;
option (no_delay) = true;
option (ifdef) = "USE_API_PASSWORD";
// The password to log in with
string password = 1;
@@ -150,6 +148,7 @@ message ConnectResponse {
option (id) = 4;
option (source) = SOURCE_SERVER;
option (no_delay) = true;
option (ifdef) = "USE_API_PASSWORD";
bool invalid_password = 1;
}
@@ -815,12 +814,12 @@ message HomeAssistantStateResponse {
// ==================== IMPORT TIME ====================
message GetTimeRequest {
option (id) = 36;
option (source) = SOURCE_BOTH;
option (source) = SOURCE_SERVER;
}
message GetTimeResponse {
option (id) = 37;
option (source) = SOURCE_BOTH;
option (source) = SOURCE_CLIENT;
option (no_delay) = true;
fixed32 epoch_seconds = 1;

View File

@@ -45,6 +45,8 @@ static constexpr uint8_t MAX_PING_RETRIES = 60;
static constexpr uint16_t PING_RETRY_INTERVAL = 1000;
static constexpr uint32_t KEEPALIVE_DISCONNECT_TIMEOUT = (KEEPALIVE_TIMEOUT_MS * 5) / 2;
static constexpr auto ESPHOME_VERSION_REF = StringRef::from_lit(ESPHOME_VERSION);
static const char *const TAG = "api.connection";
#ifdef USE_CAMERA
static const int CAMERA_STOP_STREAM = 5000;
@@ -1084,12 +1086,6 @@ void APIConnection::on_get_time_response(const GetTimeResponse &value) {
}
#endif
bool APIConnection::send_get_time_response(const GetTimeRequest &msg) {
GetTimeResponse resp;
resp.epoch_seconds = ::time(nullptr);
return this->send_message(resp, GetTimeResponse::MESSAGE_TYPE);
}
#ifdef USE_BLUETOOTH_PROXY
void APIConnection::subscribe_bluetooth_le_advertisements(const SubscribeBluetoothLEAdvertisementsRequest &msg) {
bluetooth_proxy::global_bluetooth_proxy->subscribe_api_connection(this, msg.flags);
@@ -1388,9 +1384,8 @@ bool APIConnection::send_hello_response(const HelloRequest &msg) {
HelloResponse resp;
resp.api_version_major = 1;
resp.api_version_minor = 12;
// Temporary string for concatenation - will be valid during send_message call
std::string server_info = App.get_name() + " (esphome v" ESPHOME_VERSION ")";
resp.set_server_info(StringRef(server_info));
// Send only the version string - the client only logs this for debugging and doesn't use it otherwise
resp.set_server_info(ESPHOME_VERSION_REF);
resp.set_name(StringRef(App.get_name()));
#ifdef USE_API_PASSWORD
@@ -1403,20 +1398,17 @@ bool APIConnection::send_hello_response(const HelloRequest &msg) {
return this->send_message(resp, HelloResponse::MESSAGE_TYPE);
}
bool APIConnection::send_connect_response(const ConnectRequest &msg) {
bool correct = true;
#ifdef USE_API_PASSWORD
correct = this->parent_->check_password(msg.password);
#endif
bool APIConnection::send_connect_response(const ConnectRequest &msg) {
ConnectResponse resp;
// bool invalid_password = 1;
resp.invalid_password = !correct;
if (correct) {
resp.invalid_password = !this->parent_->check_password(msg.password);
if (!resp.invalid_password) {
this->complete_authentication_();
}
return this->send_message(resp, ConnectResponse::MESSAGE_TYPE);
}
#endif // USE_API_PASSWORD
bool APIConnection::send_ping_response(const PingRequest &msg) {
PingResponse resp;
@@ -1437,8 +1429,6 @@ bool APIConnection::send_device_info_response(const DeviceInfoRequest &msg) {
std::string mac_address = get_mac_address_pretty();
resp.set_mac_address(StringRef(mac_address));
// Compile-time StringRef constants
static constexpr auto ESPHOME_VERSION_REF = StringRef::from_lit(ESPHOME_VERSION);
resp.set_esphome_version(ESPHOME_VERSION_REF);
resp.set_compilation_time(App.get_compilation_time_ref());

View File

@@ -202,7 +202,9 @@ class APIConnection final : public APIServerConnection {
void on_get_time_response(const GetTimeResponse &value) override;
#endif
bool send_hello_response(const HelloRequest &msg) override;
#ifdef USE_API_PASSWORD
bool send_connect_response(const ConnectRequest &msg) override;
#endif
bool send_disconnect_response(const DisconnectRequest &msg) override;
bool send_ping_response(const PingRequest &msg) override;
bool send_device_info_response(const DeviceInfoRequest &msg) override;
@@ -224,7 +226,6 @@ class APIConnection final : public APIServerConnection {
#ifdef USE_API_HOMEASSISTANT_STATES
void subscribe_home_assistant_states(const SubscribeHomeAssistantStatesRequest &msg) override;
#endif
bool send_get_time_response(const GetTimeRequest &msg) override;
#ifdef USE_API_SERVICES
void execute_service(const ExecuteServiceRequest &msg) override;
#endif

View File

@@ -42,6 +42,7 @@ void HelloResponse::calculate_size(ProtoSize &size) const {
size.add_length(1, this->server_info_ref_.size());
size.add_length(1, this->name_ref_.size());
}
#ifdef USE_API_PASSWORD
bool ConnectRequest::decode_length(uint32_t field_id, ProtoLengthDelimited value) {
switch (field_id) {
case 1:
@@ -54,6 +55,7 @@ bool ConnectRequest::decode_length(uint32_t field_id, ProtoLengthDelimited value
}
void ConnectResponse::encode(ProtoWriteBuffer buffer) const { buffer.encode_bool(1, this->invalid_password); }
void ConnectResponse::calculate_size(ProtoSize &size) const { size.add_bool(1, this->invalid_password); }
#endif
#ifdef USE_AREAS
void AreaInfo::encode(ProtoWriteBuffer buffer) const {
buffer.encode_uint32(1, this->area_id);
@@ -927,14 +929,6 @@ bool GetTimeResponse::decode_32bit(uint32_t field_id, Proto32Bit value) {
}
return true;
}
void GetTimeResponse::encode(ProtoWriteBuffer buffer) const {
buffer.encode_fixed32(1, this->epoch_seconds);
buffer.encode_string(2, this->timezone_ref_);
}
void GetTimeResponse::calculate_size(ProtoSize &size) const {
size.add_fixed32(1, this->epoch_seconds);
size.add_length(1, this->timezone_ref_.size());
}
#ifdef USE_API_SERVICES
void ListEntitiesServicesArgument::encode(ProtoWriteBuffer buffer) const {
buffer.encode_string(1, this->name_ref_);

View File

@@ -366,6 +366,7 @@ class HelloResponse final : public ProtoMessage {
protected:
};
#ifdef USE_API_PASSWORD
class ConnectRequest final : public ProtoDecodableMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 3;
@@ -397,6 +398,7 @@ class ConnectResponse final : public ProtoMessage {
protected:
};
#endif
class DisconnectRequest final : public ProtoMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 5;
@@ -1189,10 +1191,6 @@ class GetTimeResponse final : public ProtoDecodableMessage {
#endif
uint32_t epoch_seconds{0};
std::string timezone{};
StringRef timezone_ref_{};
void set_timezone(const StringRef &ref) { this->timezone_ref_ = ref; }
void encode(ProtoWriteBuffer buffer) const override;
void calculate_size(ProtoSize &size) const override;
#ifdef HAS_PROTO_MESSAGE_DUMP
void dump_to(std::string &out) const override;
#endif

View File

@@ -681,8 +681,10 @@ void HelloResponse::dump_to(std::string &out) const {
dump_field(out, "server_info", this->server_info_ref_);
dump_field(out, "name", this->name_ref_);
}
#ifdef USE_API_PASSWORD
void ConnectRequest::dump_to(std::string &out) const { dump_field(out, "password", this->password); }
void ConnectResponse::dump_to(std::string &out) const { dump_field(out, "invalid_password", this->invalid_password); }
#endif
void DisconnectRequest::dump_to(std::string &out) const { out.append("DisconnectRequest {}"); }
void DisconnectResponse::dump_to(std::string &out) const { out.append("DisconnectResponse {}"); }
void PingRequest::dump_to(std::string &out) const { out.append("PingRequest {}"); }
@@ -1128,13 +1130,7 @@ void GetTimeRequest::dump_to(std::string &out) const { out.append("GetTimeReques
void GetTimeResponse::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "GetTimeResponse");
dump_field(out, "epoch_seconds", this->epoch_seconds);
out.append(" timezone: ");
if (!this->timezone_ref_.empty()) {
out.append("'").append(this->timezone_ref_.c_str()).append("'");
} else {
out.append("'").append(this->timezone).append("'");
}
out.append("\n");
dump_field(out, "timezone", this->timezone);
}
#ifdef USE_API_SERVICES
void ListEntitiesServicesArgument::dump_to(std::string &out) const {

View File

@@ -24,6 +24,7 @@ void APIServerConnectionBase::read_message(uint32_t msg_size, uint32_t msg_type,
this->on_hello_request(msg);
break;
}
#ifdef USE_API_PASSWORD
case ConnectRequest::MESSAGE_TYPE: {
ConnectRequest msg;
msg.decode(msg_data, msg_size);
@@ -33,6 +34,7 @@ void APIServerConnectionBase::read_message(uint32_t msg_size, uint32_t msg_type,
this->on_connect_request(msg);
break;
}
#endif
case DisconnectRequest::MESSAGE_TYPE: {
DisconnectRequest msg;
// Empty message: no decode needed
@@ -160,15 +162,6 @@ void APIServerConnectionBase::read_message(uint32_t msg_size, uint32_t msg_type,
break;
}
#endif
case GetTimeRequest::MESSAGE_TYPE: {
GetTimeRequest msg;
// Empty message: no decode needed
#ifdef HAS_PROTO_MESSAGE_DUMP
ESP_LOGVV(TAG, "on_get_time_request: %s", msg.dump().c_str());
#endif
this->on_get_time_request(msg);
break;
}
case GetTimeResponse::MESSAGE_TYPE: {
GetTimeResponse msg;
msg.decode(msg_data, msg_size);
@@ -628,11 +621,13 @@ void APIServerConnection::on_hello_request(const HelloRequest &msg) {
this->on_fatal_error();
}
}
#ifdef USE_API_PASSWORD
void APIServerConnection::on_connect_request(const ConnectRequest &msg) {
if (!this->send_connect_response(msg)) {
this->on_fatal_error();
}
}
#endif
void APIServerConnection::on_disconnect_request(const DisconnectRequest &msg) {
if (!this->send_disconnect_response(msg)) {
this->on_fatal_error();
@@ -678,11 +673,6 @@ void APIServerConnection::on_subscribe_home_assistant_states_request(const Subsc
}
}
#endif
void APIServerConnection::on_get_time_request(const GetTimeRequest &msg) {
if (this->check_connection_setup_() && !this->send_get_time_response(msg)) {
this->on_fatal_error();
}
}
#ifdef USE_API_SERVICES
void APIServerConnection::on_execute_service_request(const ExecuteServiceRequest &msg) {
if (this->check_authenticated_()) {

View File

@@ -26,7 +26,9 @@ class APIServerConnectionBase : public ProtoService {
virtual void on_hello_request(const HelloRequest &value){};
#ifdef USE_API_PASSWORD
virtual void on_connect_request(const ConnectRequest &value){};
#endif
virtual void on_disconnect_request(const DisconnectRequest &value){};
virtual void on_disconnect_response(const DisconnectResponse &value){};
@@ -71,7 +73,7 @@ class APIServerConnectionBase : public ProtoService {
#ifdef USE_API_HOMEASSISTANT_STATES
virtual void on_home_assistant_state_response(const HomeAssistantStateResponse &value){};
#endif
virtual void on_get_time_request(const GetTimeRequest &value){};
virtual void on_get_time_response(const GetTimeResponse &value){};
#ifdef USE_API_SERVICES
@@ -219,7 +221,9 @@ class APIServerConnectionBase : public ProtoService {
class APIServerConnection : public APIServerConnectionBase {
public:
virtual bool send_hello_response(const HelloRequest &msg) = 0;
#ifdef USE_API_PASSWORD
virtual bool send_connect_response(const ConnectRequest &msg) = 0;
#endif
virtual bool send_disconnect_response(const DisconnectRequest &msg) = 0;
virtual bool send_ping_response(const PingRequest &msg) = 0;
virtual bool send_device_info_response(const DeviceInfoRequest &msg) = 0;
@@ -232,7 +236,6 @@ class APIServerConnection : public APIServerConnectionBase {
#ifdef USE_API_HOMEASSISTANT_STATES
virtual void subscribe_home_assistant_states(const SubscribeHomeAssistantStatesRequest &msg) = 0;
#endif
virtual bool send_get_time_response(const GetTimeRequest &msg) = 0;
#ifdef USE_API_SERVICES
virtual void execute_service(const ExecuteServiceRequest &msg) = 0;
#endif
@@ -347,7 +350,9 @@ class APIServerConnection : public APIServerConnectionBase {
#endif
protected:
void on_hello_request(const HelloRequest &msg) override;
#ifdef USE_API_PASSWORD
void on_connect_request(const ConnectRequest &msg) override;
#endif
void on_disconnect_request(const DisconnectRequest &msg) override;
void on_ping_request(const PingRequest &msg) override;
void on_device_info_request(const DeviceInfoRequest &msg) override;
@@ -360,7 +365,6 @@ class APIServerConnection : public APIServerConnectionBase {
#ifdef USE_API_HOMEASSISTANT_STATES
void on_subscribe_home_assistant_states_request(const SubscribeHomeAssistantStatesRequest &msg) override;
#endif
void on_get_time_request(const GetTimeRequest &msg) override;
#ifdef USE_API_SERVICES
void on_execute_service_request(const ExecuteServiceRequest &msg) override;
#endif

View File

@@ -130,7 +130,9 @@ class BluetoothProxy final : public esp32_ble_tracker::ESPBTDeviceListener, publ
std::string get_bluetooth_mac_address_pretty() {
const uint8_t *mac = esp_bt_dev_get_address();
return str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[18];
format_mac_addr_upper(mac, buf);
return std::string(buf);
}
protected:

View File

@@ -7,6 +7,7 @@
#include <cstdio>
#include <cinttypes>
#include "esphome/core/log.h"
#include "esphome/core/helpers.h"
namespace esphome::esp32_ble {
@@ -169,22 +170,42 @@ bool ESPBTUUID::operator==(const ESPBTUUID &uuid) const {
}
esp_bt_uuid_t ESPBTUUID::get_uuid() const { return this->uuid_; }
std::string ESPBTUUID::to_string() const {
char buf[40]; // Enough for 128-bit UUID with dashes
char *pos = buf;
switch (this->uuid_.len) {
case ESP_UUID_LEN_16:
return str_snprintf("0x%02X%02X", 6, this->uuid_.uuid.uuid16 >> 8, this->uuid_.uuid.uuid16 & 0xff);
*pos++ = '0';
*pos++ = 'x';
*pos++ = format_hex_pretty_char(this->uuid_.uuid.uuid16 >> 12);
*pos++ = format_hex_pretty_char((this->uuid_.uuid.uuid16 >> 8) & 0x0F);
*pos++ = format_hex_pretty_char((this->uuid_.uuid.uuid16 >> 4) & 0x0F);
*pos++ = format_hex_pretty_char(this->uuid_.uuid.uuid16 & 0x0F);
*pos = '\0';
return std::string(buf);
case ESP_UUID_LEN_32:
return str_snprintf("0x%02" PRIX32 "%02" PRIX32 "%02" PRIX32 "%02" PRIX32, 10, (this->uuid_.uuid.uuid32 >> 24),
(this->uuid_.uuid.uuid32 >> 16 & 0xff), (this->uuid_.uuid.uuid32 >> 8 & 0xff),
this->uuid_.uuid.uuid32 & 0xff);
*pos++ = '0';
*pos++ = 'x';
for (int shift = 28; shift >= 0; shift -= 4) {
*pos++ = format_hex_pretty_char((this->uuid_.uuid.uuid32 >> shift) & 0x0F);
}
*pos = '\0';
return std::string(buf);
default:
case ESP_UUID_LEN_128:
std::string buf;
// Format: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
for (int8_t i = 15; i >= 0; i--) {
buf += str_snprintf("%02X", 2, this->uuid_.uuid.uuid128[i]);
if (i == 6 || i == 8 || i == 10 || i == 12)
buf += "-";
uint8_t byte = this->uuid_.uuid.uuid128[i];
*pos++ = format_hex_pretty_char(byte >> 4);
*pos++ = format_hex_pretty_char(byte & 0x0F);
if (i == 12 || i == 10 || i == 8 || i == 6) {
*pos++ = '-';
}
}
return buf;
*pos = '\0';
return std::string(buf);
}
return "";
}

View File

@@ -31,12 +31,13 @@ void ESP32BLEBeacon::dump_config() {
char uuid[37];
char *bpos = uuid;
for (int8_t ii = 0; ii < 16; ++ii) {
bpos += sprintf(bpos, "%02X", this->uuid_[ii]);
*bpos++ = format_hex_pretty_char(this->uuid_[ii] >> 4);
*bpos++ = format_hex_pretty_char(this->uuid_[ii] & 0x0F);
if (ii == 3 || ii == 5 || ii == 7 || ii == 9) {
bpos += sprintf(bpos, "-");
*bpos++ = '-';
}
}
uuid[36] = '\0';
*bpos = '\0';
ESP_LOGCONFIG(TAG,
" UUID: %s, Major: %u, Minor: %u, Min Interval: %ums, Max Interval: %ums, Measured Power: %d"
", TX Power: %ddBm",

View File

@@ -60,11 +60,14 @@ class BLEClientBase : public espbt::ESPBTClient, public Component {
if (address == 0) {
this->address_str_ = "";
} else {
this->address_str_ =
str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, (uint8_t) (this->address_ >> 40) & 0xff,
(uint8_t) (this->address_ >> 32) & 0xff, (uint8_t) (this->address_ >> 24) & 0xff,
(uint8_t) (this->address_ >> 16) & 0xff, (uint8_t) (this->address_ >> 8) & 0xff,
(uint8_t) (this->address_ >> 0) & 0xff);
char buf[18];
uint8_t mac[6] = {
(uint8_t) ((this->address_ >> 40) & 0xff), (uint8_t) ((this->address_ >> 32) & 0xff),
(uint8_t) ((this->address_ >> 24) & 0xff), (uint8_t) ((this->address_ >> 16) & 0xff),
(uint8_t) ((this->address_ >> 8) & 0xff), (uint8_t) ((this->address_ >> 0) & 0xff),
};
format_mac_addr_upper(mac, buf);
this->address_str_ = buf;
}
}
const std::string &address_str() const { return this->address_str_; }

View File

@@ -605,9 +605,8 @@ void ESPBTDevice::parse_adv_(const uint8_t *payload, uint8_t len) {
}
std::string ESPBTDevice::address_str() const {
char mac[24];
snprintf(mac, sizeof(mac), "%02X:%02X:%02X:%02X:%02X:%02X", this->address_[0], this->address_[1], this->address_[2],
this->address_[3], this->address_[4], this->address_[5]);
char mac[18];
format_mac_addr_upper(this->address_, mac);
return mac;
}

View File

@@ -300,6 +300,7 @@ void EthernetComponent::loop() {
this->state_ = EthernetComponentState::CONNECTING;
this->start_connect_();
} else {
this->finish_connect_();
// When connected and stable, disable the loop to save CPU cycles
this->disable_loop();
}
@@ -486,10 +487,35 @@ void EthernetComponent::got_ip6_event_handler(void *arg, esp_event_base_t event_
}
#endif /* USE_NETWORK_IPV6 */
void EthernetComponent::finish_connect_() {
#if USE_NETWORK_IPV6
// Retry IPv6 link-local setup if it failed during initial connect
// This handles the case where min_ipv6_addr_count is NOT set (or is 0),
// allowing us to reach CONNECTED state with just IPv4.
// If IPv6 setup failed in start_connect_() because the interface wasn't ready:
// - Bootup timing issues (#10281)
// - Cable unplugged/network interruption (#10705)
// We can now retry since we're in CONNECTED state and the interface is definitely up.
if (!this->ipv6_setup_done_) {
esp_err_t err = esp_netif_create_ip6_linklocal(this->eth_netif_);
if (err == ESP_OK) {
ESP_LOGD(TAG, "IPv6 link-local address created (retry succeeded)");
}
// Always set the flag to prevent continuous retries
// If IPv6 setup fails here with the interface up and stable, it's
// likely a persistent issue (IPv6 disabled at router, hardware
// limitation, etc.) that won't be resolved by further retries.
// The device continues to work with IPv4.
this->ipv6_setup_done_ = true;
}
#endif /* USE_NETWORK_IPV6 */
}
void EthernetComponent::start_connect_() {
global_eth_component->got_ipv4_address_ = false;
#if USE_NETWORK_IPV6
global_eth_component->ipv6_count_ = 0;
this->ipv6_setup_done_ = false;
#endif /* USE_NETWORK_IPV6 */
this->connect_begin_ = millis();
this->status_set_warning(LOG_STR("waiting for IP configuration"));
@@ -545,9 +571,27 @@ void EthernetComponent::start_connect_() {
}
}
#if USE_NETWORK_IPV6
// Attempt to create IPv6 link-local address
// We MUST attempt this here, not just in finish_connect_(), because with
// min_ipv6_addr_count set, the component won't reach CONNECTED state without IPv6.
// However, this may fail with ESP_FAIL if the interface is not up yet:
// - At bootup when link isn't ready (#10281)
// - After disconnection/cable unplugged (#10705)
// We'll retry in finish_connect_() if it fails here.
err = esp_netif_create_ip6_linklocal(this->eth_netif_);
if (err != ESP_OK) {
ESPHL_ERROR_CHECK(err, "Enable IPv6 link local failed");
if (err == ESP_ERR_ESP_NETIF_INVALID_PARAMS) {
// This is a programming error, not a transient failure
ESPHL_ERROR_CHECK(err, "esp_netif_create_ip6_linklocal invalid parameters");
} else {
// ESP_FAIL means the interface isn't up yet
// This is expected and non-fatal, happens in multiple scenarios:
// - During reconnection after network interruptions (#10705)
// - At bootup when the link isn't ready yet (#10281)
// We'll retry once we reach CONNECTED state and the interface is up
ESP_LOGW(TAG, "esp_netif_create_ip6_linklocal failed: %s", esp_err_to_name(err));
// Don't mark component as failed - this is a transient error
}
}
#endif /* USE_NETWORK_IPV6 */
@@ -638,7 +682,9 @@ void EthernetComponent::get_eth_mac_address_raw(uint8_t *mac) {
std::string EthernetComponent::get_eth_mac_address_pretty() {
uint8_t mac[6];
get_eth_mac_address_raw(mac);
return str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[18];
format_mac_addr_upper(mac, buf);
return std::string(buf);
}
eth_duplex_t EthernetComponent::get_duplex_mode() {

View File

@@ -102,6 +102,7 @@ class EthernetComponent : public Component {
#endif /* LWIP_IPV6 */
void start_connect_();
void finish_connect_();
void dump_connect_params_();
/// @brief Set `RMII Reference Clock Select` bit for KSZ8081.
void ksz8081_set_clock_reference_(esp_eth_mac_t *mac);
@@ -144,6 +145,7 @@ class EthernetComponent : public Component {
bool got_ipv4_address_{false};
#if LWIP_IPV6
uint8_t ipv6_count_{0};
bool ipv6_setup_done_{false};
#endif /* LWIP_IPV6 */
// Pointers at the end (naturally aligned)

View File

@@ -18,6 +18,7 @@ from esphome.const import (
DEVICE_CLASS_TEMPERATURE,
DEVICE_CLASS_VOLTAGE,
STATE_CLASS_MEASUREMENT,
STATE_CLASS_TOTAL_INCREASING,
UNIT_AMPERE,
UNIT_CELSIUS,
UNIT_VOLT,
@@ -162,7 +163,7 @@ INA2XX_SCHEMA = cv.Schema(
unit_of_measurement=UNIT_WATT_HOURS,
accuracy_decimals=8,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_MEASUREMENT,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
key=CONF_NAME,
),
@@ -170,7 +171,8 @@ INA2XX_SCHEMA = cv.Schema(
sensor.sensor_schema(
unit_of_measurement=UNIT_JOULE,
accuracy_decimals=8,
state_class=STATE_CLASS_MEASUREMENT,
device_class=DEVICE_CLASS_ENERGY,
state_class=STATE_CLASS_TOTAL_INCREASING,
),
key=CONF_NAME,
),

View File

@@ -1,4 +1,3 @@
#include <cstdio>
#include <cstring>
#include "md5.h"
#ifdef USE_MD5
@@ -44,7 +43,9 @@ void MD5Digest::get_bytes(uint8_t *output) { memcpy(output, this->digest_, 16);
void MD5Digest::get_hex(char *output) {
for (size_t i = 0; i < 16; i++) {
sprintf(output + i * 2, "%02x", this->digest_[i]);
uint8_t byte = this->digest_[i];
output[i * 2] = format_hex_char(byte >> 4);
output[i * 2 + 1] = format_hex_char(byte & 0x0F);
}
}

View File

@@ -593,7 +593,7 @@ void WiFiComponent::check_scanning_finished() {
for (auto &res : this->scan_result_) {
char bssid_s[18];
auto bssid = res.get_bssid();
sprintf(bssid_s, "%02X:%02X:%02X:%02X:%02X:%02X", bssid[0], bssid[1], bssid[2], bssid[3], bssid[4], bssid[5]);
format_mac_addr_upper(bssid.data(), bssid_s);
if (res.get_matches()) {
ESP_LOGI(TAG, "- '%s' %s" LOG_SECRET("(%s) ") "%s", res.get_ssid().c_str(),

View File

@@ -1,6 +1,7 @@
#pragma once
#include "esphome/core/component.h"
#include "esphome/core/helpers.h"
#include "esphome/components/text_sensor/text_sensor.h"
#include "esphome/components/wifi/wifi_component.h"
#ifdef USE_WIFI
@@ -106,8 +107,8 @@ class BSSIDWiFiInfo : public PollingComponent, public text_sensor::TextSensor {
wifi::bssid_t bssid = wifi::global_wifi_component->wifi_bssid();
if (memcmp(bssid.data(), last_bssid_.data(), 6) != 0) {
std::copy(bssid.begin(), bssid.end(), last_bssid_.begin());
char buf[30];
sprintf(buf, "%02X:%02X:%02X:%02X:%02X:%02X", bssid[0], bssid[1], bssid[2], bssid[3], bssid[4], bssid[5]);
char buf[18];
format_mac_addr_upper(bssid.data(), buf);
this->publish_state(buf);
}
}

View File

@@ -255,23 +255,22 @@ size_t parse_hex(const char *str, size_t length, uint8_t *data, size_t count) {
}
std::string format_mac_address_pretty(const uint8_t *mac) {
return str_snprintf("%02X:%02X:%02X:%02X:%02X:%02X", 17, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[18];
format_mac_addr_upper(mac, buf);
return std::string(buf);
}
static char format_hex_char(uint8_t v) { return v >= 10 ? 'a' + (v - 10) : '0' + v; }
std::string format_hex(const uint8_t *data, size_t length) {
std::string ret;
ret.resize(length * 2);
for (size_t i = 0; i < length; i++) {
ret[2 * i] = format_hex_char((data[i] & 0xF0) >> 4);
ret[2 * i] = format_hex_char(data[i] >> 4);
ret[2 * i + 1] = format_hex_char(data[i] & 0x0F);
}
return ret;
}
std::string format_hex(const std::vector<uint8_t> &data) { return format_hex(data.data(), data.size()); }
static char format_hex_pretty_char(uint8_t v) { return v >= 10 ? 'A' + (v - 10) : '0' + v; }
// Shared implementation for uint8_t and string hex formatting
static std::string format_hex_pretty_uint8(const uint8_t *data, size_t length, char separator, bool show_length) {
if (data == nullptr || length == 0)
@@ -280,7 +279,7 @@ static std::string format_hex_pretty_uint8(const uint8_t *data, size_t length, c
uint8_t multiple = separator ? 3 : 2; // 3 if separator is not \0, 2 otherwise
ret.resize(multiple * length - (separator ? 1 : 0));
for (size_t i = 0; i < length; i++) {
ret[multiple * i] = format_hex_pretty_char((data[i] & 0xF0) >> 4);
ret[multiple * i] = format_hex_pretty_char(data[i] >> 4);
ret[multiple * i + 1] = format_hex_pretty_char(data[i] & 0x0F);
if (separator && i != length - 1)
ret[multiple * i + 2] = separator;
@@ -591,7 +590,9 @@ bool HighFrequencyLoopRequester::is_high_frequency() { return num_requests > 0;
std::string get_mac_address() {
uint8_t mac[6];
get_mac_address_raw(mac);
return str_snprintf("%02x%02x%02x%02x%02x%02x", 12, mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
char buf[13];
format_mac_addr_lower_no_sep(mac, buf);
return std::string(buf);
}
std::string get_mac_address_pretty() {

View File

@@ -380,6 +380,35 @@ template<typename T, enable_if_t<std::is_unsigned<T>::value, int> = 0> optional<
return parse_hex<T>(str.c_str(), str.length());
}
/// Convert a nibble (0-15) to lowercase hex char
inline char format_hex_char(uint8_t v) { return v >= 10 ? 'a' + (v - 10) : '0' + v; }
/// Convert a nibble (0-15) to uppercase hex char (used for pretty printing)
/// This always uses uppercase (A-F) for pretty/human-readable output
inline char format_hex_pretty_char(uint8_t v) { return v >= 10 ? 'A' + (v - 10) : '0' + v; }
/// Format MAC address as XX:XX:XX:XX:XX:XX (uppercase)
inline void format_mac_addr_upper(const uint8_t *mac, char *output) {
for (size_t i = 0; i < 6; i++) {
uint8_t byte = mac[i];
output[i * 3] = format_hex_pretty_char(byte >> 4);
output[i * 3 + 1] = format_hex_pretty_char(byte & 0x0F);
if (i < 5)
output[i * 3 + 2] = ':';
}
output[17] = '\0';
}
/// Format MAC address as xxxxxxxxxxxxxx (lowercase, no separators)
inline void format_mac_addr_lower_no_sep(const uint8_t *mac, char *output) {
for (size_t i = 0; i < 6; i++) {
uint8_t byte = mac[i];
output[i * 2] = format_hex_char(byte >> 4);
output[i * 2 + 1] = format_hex_char(byte & 0x0F);
}
output[12] = '\0';
}
/// Format the six-byte array \p mac into a MAC address.
std::string format_mac_address_pretty(const uint8_t mac[6]);
/// Format the byte array \p data of length \p len in lowercased hex.

View File

@@ -345,7 +345,7 @@ void HOT Scheduler::call(uint32_t now) {
// Execute callback without holding lock to prevent deadlocks
// if the callback tries to call defer() again
if (!this->should_skip_item_(item.get())) {
this->execute_item_(item.get(), now);
now = this->execute_item_(item.get(), now);
}
// Recycle the defer item after execution
this->recycle_item_(std::move(item));
@@ -483,7 +483,7 @@ void HOT Scheduler::call(uint32_t now) {
// Warning: During callback(), a lot of stuff can happen, including:
// - timeouts/intervals get added, potentially invalidating vector pointers
// - timeouts/intervals get cancelled
this->execute_item_(item.get(), now);
now = this->execute_item_(item.get(), now);
LockGuard guard{this->lock_};
@@ -568,11 +568,11 @@ void HOT Scheduler::pop_raw_() {
}
// Helper to execute a scheduler item
void HOT Scheduler::execute_item_(SchedulerItem *item, uint32_t now) {
uint32_t HOT Scheduler::execute_item_(SchedulerItem *item, uint32_t now) {
App.set_current_component(item->component);
WarnIfComponentBlockingGuard guard{item->component, now};
item->callback();
guard.finish();
return guard.finish();
}
// Common implementation for cancel operations

View File

@@ -254,7 +254,7 @@ class Scheduler {
}
// Helper to execute a scheduler item
void execute_item_(SchedulerItem *item, uint32_t now);
uint32_t execute_item_(SchedulerItem *item, uint32_t now);
// Helper to check if item should be skipped
bool should_skip_item_(SchedulerItem *item) const {

View File

@@ -12,7 +12,7 @@ platformio==6.1.18 # When updating platformio, also update /docker/Dockerfile
esptool==5.0.2
click==8.1.7
esphome-dashboard==20250904.0
aioesphomeapi==40.1.0
aioesphomeapi==40.2.1
zeroconf==0.147.2
puremagic==1.30
ruamel.yaml==0.18.15 # dashboard_import

View File

@@ -1,8 +1,4 @@
substitutions:
network_enable_ipv6: "true"
bk72xx:
framework:
version: 1.7.0
<<: !include common.yaml

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -0,0 +1,188 @@
"""Tests for esphome.build_gen.platformio module."""
from __future__ import annotations
from collections.abc import Generator
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from esphome.build_gen import platformio
from esphome.core import CORE
@pytest.fixture
def mock_update_storage_json() -> Generator[MagicMock]:
"""Mock update_storage_json for all tests."""
with patch("esphome.build_gen.platformio.update_storage_json") as mock:
yield mock
@pytest.fixture
def mock_write_file_if_changed() -> Generator[MagicMock]:
"""Mock write_file_if_changed for tests."""
with patch("esphome.build_gen.platformio.write_file_if_changed") as mock:
yield mock
def test_write_ini_creates_new_file(
tmp_path: Path, mock_update_storage_json: MagicMock
) -> None:
"""Test write_ini creates a new platformio.ini file."""
CORE.build_path = str(tmp_path)
content = """
[env:test]
platform = espressif32
board = esp32dev
framework = arduino
"""
platformio.write_ini(content)
ini_file = tmp_path / "platformio.ini"
assert ini_file.exists()
file_content = ini_file.read_text()
assert content in file_content
assert platformio.INI_AUTO_GENERATE_BEGIN in file_content
assert platformio.INI_AUTO_GENERATE_END in file_content
def test_write_ini_updates_existing_file(
tmp_path: Path, mock_update_storage_json: MagicMock
) -> None:
"""Test write_ini updates existing platformio.ini file."""
CORE.build_path = str(tmp_path)
# Create existing file with custom content
ini_file = tmp_path / "platformio.ini"
existing_content = f"""
; Custom header
[platformio]
default_envs = test
{platformio.INI_AUTO_GENERATE_BEGIN}
; Old auto-generated content
[env:old]
platform = old
{platformio.INI_AUTO_GENERATE_END}
; Custom footer
"""
ini_file.write_text(existing_content)
# New content to write
new_content = """
[env:test]
platform = espressif32
board = esp32dev
framework = arduino
"""
platformio.write_ini(new_content)
file_content = ini_file.read_text()
# Check that custom parts are preserved
assert "; Custom header" in file_content
assert "[platformio]" in file_content
assert "default_envs = test" in file_content
assert "; Custom footer" in file_content
# Check that new content replaced old auto-generated content
assert new_content in file_content
assert "[env:old]" not in file_content
assert "platform = old" not in file_content
def test_write_ini_preserves_custom_sections(
tmp_path: Path, mock_update_storage_json: MagicMock
) -> None:
"""Test write_ini preserves custom sections outside auto-generate markers."""
CORE.build_path = str(tmp_path)
# Create existing file with multiple custom sections
ini_file = tmp_path / "platformio.ini"
existing_content = f"""
[platformio]
src_dir = .
include_dir = .
[common]
lib_deps =
Wire
SPI
{platformio.INI_AUTO_GENERATE_BEGIN}
[env:old]
platform = old
{platformio.INI_AUTO_GENERATE_END}
[env:custom]
upload_speed = 921600
monitor_speed = 115200
"""
ini_file.write_text(existing_content)
new_content = "[env:auto]\nplatform = new"
platformio.write_ini(new_content)
file_content = ini_file.read_text()
# All custom sections should be preserved
assert "[platformio]" in file_content
assert "src_dir = ." in file_content
assert "[common]" in file_content
assert "lib_deps" in file_content
assert "[env:custom]" in file_content
assert "upload_speed = 921600" in file_content
# New auto-generated content should replace old
assert "[env:auto]" in file_content
assert "platform = new" in file_content
assert "[env:old]" not in file_content
def test_write_ini_no_change_when_content_same(
tmp_path: Path,
mock_update_storage_json: MagicMock,
mock_write_file_if_changed: MagicMock,
) -> None:
"""Test write_ini doesn't rewrite file when content is unchanged."""
CORE.build_path = str(tmp_path)
content = "[env:test]\nplatform = esp32"
full_content = (
f"{platformio.INI_BASE_FORMAT[0]}"
f"{platformio.INI_AUTO_GENERATE_BEGIN}\n"
f"{content}"
f"{platformio.INI_AUTO_GENERATE_END}"
f"{platformio.INI_BASE_FORMAT[1]}"
)
ini_file = tmp_path / "platformio.ini"
ini_file.write_text(full_content)
mock_write_file_if_changed.return_value = False # Indicate no change
platformio.write_ini(content)
# write_file_if_changed should be called with the same content
mock_write_file_if_changed.assert_called_once()
call_args = mock_write_file_if_changed.call_args[0]
assert call_args[0] == str(ini_file)
assert content in call_args[1]
def test_write_ini_calls_update_storage_json(
tmp_path: Path, mock_update_storage_json: MagicMock
) -> None:
"""Test write_ini calls update_storage_json."""
CORE.build_path = str(tmp_path)
content = "[env:test]\nplatform = esp32"
platformio.write_ini(content)
mock_update_storage_json.assert_called_once()

View File

@@ -9,8 +9,10 @@ not be part of a unit test suite.
"""
from collections.abc import Generator
from pathlib import Path
import sys
from unittest.mock import Mock, patch
import pytest
@@ -43,3 +45,45 @@ def setup_core(tmp_path: Path) -> Path:
"""Set up CORE with test paths."""
CORE.config_path = str(tmp_path / "test.yaml")
return tmp_path
@pytest.fixture
def mock_write_file_if_changed() -> Generator[Mock, None, None]:
"""Mock write_file_if_changed for storage_json."""
with patch("esphome.storage_json.write_file_if_changed") as mock:
yield mock
@pytest.fixture
def mock_copy_file_if_changed() -> Generator[Mock, None, None]:
"""Mock copy_file_if_changed for core.config."""
with patch("esphome.core.config.copy_file_if_changed") as mock:
yield mock
@pytest.fixture
def mock_run_platformio_cli() -> Generator[Mock, None, None]:
"""Mock run_platformio_cli for platformio_api."""
with patch("esphome.platformio_api.run_platformio_cli") as mock:
yield mock
@pytest.fixture
def mock_run_platformio_cli_run() -> Generator[Mock, None, None]:
"""Mock run_platformio_cli_run for platformio_api."""
with patch("esphome.platformio_api.run_platformio_cli_run") as mock:
yield mock
@pytest.fixture
def mock_decode_pc() -> Generator[Mock, None, None]:
"""Mock _decode_pc for platformio_api."""
with patch("esphome.platformio_api._decode_pc") as mock:
yield mock
@pytest.fixture
def mock_run_external_command() -> Generator[Mock, None, None]:
"""Mock run_external_command for platformio_api."""
with patch("esphome.platformio_api.run_external_command") as mock:
yield mock

View File

@@ -1,21 +1,56 @@
"""Unit tests for core config functionality including areas and devices."""
from collections.abc import Callable
import os
from pathlib import Path
import types
from typing import Any
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import config_validation as cv, core
from esphome.const import CONF_AREA, CONF_AREAS, CONF_DEVICES
from esphome.core import config
from esphome.core.config import Area, validate_area_config
from esphome.const import (
CONF_AREA,
CONF_AREAS,
CONF_BUILD_PATH,
CONF_DEVICES,
CONF_ESPHOME,
CONF_NAME,
CONF_NAME_ADD_MAC_SUFFIX,
KEY_CORE,
)
from esphome.core import CORE, config
from esphome.core.config import (
Area,
preload_core_config,
valid_include,
valid_project_name,
validate_area_config,
validate_hostname,
)
from .common import load_config_from_fixture
FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" / "core" / "config"
@pytest.fixture
def mock_cg_with_include_capture() -> tuple[Mock, list[str]]:
"""Mock code generation with include capture."""
includes_added: list[str] = []
with patch("esphome.core.config.cg") as mock_cg:
mock_raw_statement = MagicMock()
def capture_include(text: str) -> MagicMock:
includes_added.append(text)
return mock_raw_statement
mock_cg.RawStatement.side_effect = capture_include
yield mock_cg, includes_added
def test_validate_area_config_with_string() -> None:
"""Test that string area config is converted to structured format."""
result = validate_area_config("Living Room")
@@ -245,3 +280,566 @@ def test_add_platform_defines_priority() -> None:
f"_add_platform_defines priority ({config._add_platform_defines.priority}) must be lower than "
f"globals priority ({globals_to_code.priority}) to fix issue #10431 (sensor count bug with lambdas)"
)
def test_valid_include_with_angle_brackets() -> None:
"""Test valid_include accepts angle bracket includes."""
assert valid_include("<ArduinoJson.h>") == "<ArduinoJson.h>"
def test_valid_include_with_valid_file(tmp_path: Path) -> None:
"""Test valid_include accepts valid include files."""
CORE.config_path = str(tmp_path / "test.yaml")
include_file = tmp_path / "include.h"
include_file.touch()
assert valid_include(str(include_file)) == str(include_file)
def test_valid_include_with_valid_directory(tmp_path: Path) -> None:
"""Test valid_include accepts valid directories."""
CORE.config_path = str(tmp_path / "test.yaml")
include_dir = tmp_path / "includes"
include_dir.mkdir()
assert valid_include(str(include_dir)) == str(include_dir)
def test_valid_include_invalid_extension(tmp_path: Path) -> None:
"""Test valid_include rejects files with invalid extensions."""
CORE.config_path = str(tmp_path / "test.yaml")
invalid_file = tmp_path / "file.txt"
invalid_file.touch()
with pytest.raises(cv.Invalid, match="Include has invalid file extension"):
valid_include(str(invalid_file))
def test_valid_project_name_valid() -> None:
"""Test valid_project_name accepts valid project names."""
assert valid_project_name("esphome.my_project") == "esphome.my_project"
def test_valid_project_name_no_namespace() -> None:
"""Test valid_project_name rejects names without namespace."""
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
valid_project_name("my_project")
def test_valid_project_name_multiple_dots() -> None:
"""Test valid_project_name rejects names with multiple dots."""
with pytest.raises(cv.Invalid, match="project name needs to have a namespace"):
valid_project_name("esphome.my.project")
def test_validate_hostname_valid() -> None:
"""Test validate_hostname accepts valid hostnames."""
config = {CONF_NAME: "my-device", CONF_NAME_ADD_MAC_SUFFIX: False}
assert validate_hostname(config) == config
def test_validate_hostname_too_long() -> None:
"""Test validate_hostname rejects hostnames that are too long."""
config = {
CONF_NAME: "a" * 32, # 32 chars, max is 31
CONF_NAME_ADD_MAC_SUFFIX: False,
}
with pytest.raises(cv.Invalid, match="Hostnames can only be 31 characters long"):
validate_hostname(config)
def test_validate_hostname_too_long_with_mac_suffix() -> None:
"""Test validate_hostname accounts for MAC suffix length."""
config = {
CONF_NAME: "a" * 25, # 25 chars, max is 24 with MAC suffix
CONF_NAME_ADD_MAC_SUFFIX: True,
}
with pytest.raises(cv.Invalid, match="Hostnames can only be 24 characters long"):
validate_hostname(config)
def test_validate_hostname_with_underscore(caplog) -> None:
"""Test validate_hostname warns about underscores."""
config = {CONF_NAME: "my_device", CONF_NAME_ADD_MAC_SUFFIX: False}
assert validate_hostname(config) == config
assert (
"Using the '_' (underscore) character in the hostname is discouraged"
in caplog.text
)
def test_preload_core_config_basic(setup_core: Path) -> None:
"""Test preload_core_config sets basic CORE attributes."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"esp32": {},
}
result = {}
platform = preload_core_config(config, result)
assert CORE.name == "test_device"
assert platform == "esp32"
assert KEY_CORE in CORE.data
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
def test_preload_core_config_with_build_path(setup_core: Path) -> None:
"""Test preload_core_config uses provided build path."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
CONF_BUILD_PATH: "/custom/build/path",
},
"esp8266": {},
}
result = {}
platform = preload_core_config(config, result)
assert config[CONF_ESPHOME][CONF_BUILD_PATH] == "/custom/build/path"
assert platform == "esp8266"
def test_preload_core_config_env_build_path(setup_core: Path) -> None:
"""Test preload_core_config uses ESPHOME_BUILD_PATH env var."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"rp2040": {},
}
result = {}
with patch.dict(os.environ, {"ESPHOME_BUILD_PATH": "/env/build"}):
platform = preload_core_config(config, result)
assert CONF_BUILD_PATH in config[CONF_ESPHOME]
assert "test_device" in config[CONF_ESPHOME][CONF_BUILD_PATH]
assert platform == "rp2040"
def test_preload_core_config_no_platform(setup_core: Path) -> None:
"""Test preload_core_config raises when no platform is specified."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
}
result = {}
# Mock _is_target_platform to avoid expensive component loading
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
# Return True for known platforms
mock_is_platform.side_effect = lambda name: name in [
"esp32",
"esp8266",
"rp2040",
]
with pytest.raises(cv.Invalid, match="Platform missing"):
preload_core_config(config, result)
def test_preload_core_config_multiple_platforms(setup_core: Path) -> None:
"""Test preload_core_config raises when multiple platforms are specified."""
config = {
CONF_ESPHOME: {
CONF_NAME: "test_device",
},
"esp32": {},
"esp8266": {},
}
result = {}
# Mock _is_target_platform to avoid expensive component loading
with patch("esphome.core.config._is_target_platform") as mock_is_platform:
# Return True for known platforms
mock_is_platform.side_effect = lambda name: name in [
"esp32",
"esp8266",
"rp2040",
]
with pytest.raises(cv.Invalid, match="Found multiple target platform blocks"):
preload_core_config(config, result)
def test_include_file_header(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
"""Test include_file adds include statement for header files."""
src_file = tmp_path / "source.h"
src_file.write_text("// Header content")
CORE.build_path = str(tmp_path / "build")
with patch("esphome.core.config.cg") as mock_cg:
# Mock RawStatement to capture the text
mock_raw_statement = MagicMock()
mock_raw_statement.text = ""
def raw_statement_side_effect(text):
mock_raw_statement.text = text
return mock_raw_statement
mock_cg.RawStatement.side_effect = raw_statement_side_effect
config.include_file(str(src_file), "test.h")
mock_copy_file_if_changed.assert_called_once()
mock_cg.add_global.assert_called_once()
# Check that include statement was added
assert '#include "test.h"' in mock_raw_statement.text
def test_include_file_cpp(tmp_path: Path, mock_copy_file_if_changed: Mock) -> None:
"""Test include_file does not add include for cpp files."""
src_file = tmp_path / "source.cpp"
src_file.write_text("// CPP content")
CORE.build_path = str(tmp_path / "build")
with patch("esphome.core.config.cg") as mock_cg:
config.include_file(str(src_file), "test.cpp")
mock_copy_file_if_changed.assert_called_once()
# Should not add include statement for .cpp files
mock_cg.add_global.assert_not_called()
def test_get_usable_cpu_count() -> None:
"""Test get_usable_cpu_count returns CPU count."""
count = config.get_usable_cpu_count()
assert isinstance(count, int)
assert count > 0
def test_get_usable_cpu_count_with_process_cpu_count() -> None:
"""Test get_usable_cpu_count uses process_cpu_count when available."""
# Test with process_cpu_count (Python 3.13+)
# Create a mock os module with process_cpu_count
mock_os = types.SimpleNamespace(process_cpu_count=lambda: 8, cpu_count=lambda: 4)
with patch("esphome.core.config.os", mock_os):
# When process_cpu_count exists, it should be used
count = config.get_usable_cpu_count()
assert count == 8
# Test fallback to cpu_count when process_cpu_count not available
mock_os_no_process = types.SimpleNamespace(cpu_count=lambda: 4)
with patch("esphome.core.config.os", mock_os_no_process):
count = config.get_usable_cpu_count()
assert count == 4
def test_list_target_platforms(tmp_path: Path) -> None:
"""Test _list_target_platforms returns available platforms."""
# Create mock components directory structure
components_dir = tmp_path / "components"
components_dir.mkdir()
# Create platform and non-platform directories with __init__.py
platforms = ["esp32", "esp8266", "rp2040", "libretiny", "host"]
non_platforms = ["sensor"]
for component in platforms + non_platforms:
component_dir = components_dir / component
component_dir.mkdir()
(component_dir / "__init__.py").touch()
# Create a file (not a directory)
(components_dir / "README.md").touch()
# Create a directory without __init__.py
(components_dir / "no_init").mkdir()
# Mock Path(__file__).parents[1] to return our tmp_path
with patch("esphome.core.config.Path") as mock_path:
mock_file_path = MagicMock()
mock_file_path.parents = [MagicMock(), tmp_path]
mock_path.return_value = mock_file_path
platforms = config._list_target_platforms()
assert isinstance(platforms, list)
# Should include platform components
assert "esp32" in platforms
assert "esp8266" in platforms
assert "rp2040" in platforms
assert "libretiny" in platforms
assert "host" in platforms
# Should not include non-platform components
assert "sensor" not in platforms
assert "README.md" not in platforms
assert "no_init" not in platforms
def test_is_target_platform() -> None:
"""Test _is_target_platform identifies valid platforms."""
assert config._is_target_platform("esp32") is True
assert config._is_target_platform("esp8266") is True
assert config._is_target_platform("rp2040") is True
assert config._is_target_platform("invalid_platform") is False
assert config._is_target_platform("api") is False # Component but not platform
@pytest.mark.asyncio
async def test_add_includes_with_single_file(
tmp_path: Path,
mock_copy_file_if_changed: Mock,
mock_cg_with_include_capture: tuple[Mock, list[str]],
) -> None:
"""Test add_includes copies a single header file to build directory."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create include file
include_file = tmp_path / "my_header.h"
include_file.write_text("#define MY_CONSTANT 42")
mock_cg, includes_added = mock_cg_with_include_capture
await config.add_includes([str(include_file)])
# Verify copy_file_if_changed was called to copy the file
# Note: add_includes adds files to a src/ subdirectory
mock_copy_file_if_changed.assert_called_once_with(
str(include_file), str(Path(CORE.build_path) / "src" / "my_header.h")
)
# Verify include statement was added
assert any('#include "my_header.h"' in inc for inc in includes_added)
@pytest.mark.asyncio
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
async def test_add_includes_with_directory_unix(
tmp_path: Path,
mock_copy_file_if_changed: Mock,
mock_cg_with_include_capture: tuple[Mock, list[str]],
) -> None:
"""Test add_includes copies all files from a directory on Unix."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create include directory with files
include_dir = tmp_path / "includes"
include_dir.mkdir()
(include_dir / "header1.h").write_text("#define HEADER1")
(include_dir / "header2.hpp").write_text("#define HEADER2")
(include_dir / "source.cpp").write_text("// Implementation")
(include_dir / "README.md").write_text(
"# Documentation"
) # Should be copied but not included
# Create subdirectory with files
subdir = include_dir / "subdir"
subdir.mkdir()
(subdir / "nested.h").write_text("#define NESTED")
mock_cg, includes_added = mock_cg_with_include_capture
await config.add_includes([str(include_dir)])
# Verify copy_file_if_changed was called for all files
assert mock_copy_file_if_changed.call_count == 5 # 4 code files + 1 README
# Verify include statements were added for valid extensions
include_strings = " ".join(includes_added)
assert "includes/header1.h" in include_strings
assert "includes/header2.hpp" in include_strings
assert "includes/subdir/nested.h" in include_strings
# CPP files are copied but not included
assert "source.cpp" not in include_strings or "#include" not in include_strings
# README.md should not have an include statement
assert "README.md" not in include_strings
@pytest.mark.asyncio
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
async def test_add_includes_with_directory_windows(
tmp_path: Path,
mock_copy_file_if_changed: Mock,
mock_cg_with_include_capture: tuple[Mock, list[str]],
) -> None:
"""Test add_includes copies all files from a directory on Windows."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create include directory with files
include_dir = tmp_path / "includes"
include_dir.mkdir()
(include_dir / "header1.h").write_text("#define HEADER1")
(include_dir / "header2.hpp").write_text("#define HEADER2")
(include_dir / "source.cpp").write_text("// Implementation")
(include_dir / "README.md").write_text(
"# Documentation"
) # Should be copied but not included
# Create subdirectory with files
subdir = include_dir / "subdir"
subdir.mkdir()
(subdir / "nested.h").write_text("#define NESTED")
mock_cg, includes_added = mock_cg_with_include_capture
await config.add_includes([str(include_dir)])
# Verify copy_file_if_changed was called for all files
assert mock_copy_file_if_changed.call_count == 5 # 4 code files + 1 README
# Verify include statements were added for valid extensions
include_strings = " ".join(includes_added)
assert "includes\\header1.h" in include_strings
assert "includes\\header2.hpp" in include_strings
assert "includes\\subdir\\nested.h" in include_strings
# CPP files are copied but not included
assert "source.cpp" not in include_strings or "#include" not in include_strings
# README.md should not have an include statement
assert "README.md" not in include_strings
@pytest.mark.asyncio
async def test_add_includes_with_multiple_sources(
tmp_path: Path, mock_copy_file_if_changed: Mock
) -> None:
"""Test add_includes with multiple files and directories."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create various include sources
single_file = tmp_path / "single.h"
single_file.write_text("#define SINGLE")
dir1 = tmp_path / "dir1"
dir1.mkdir()
(dir1 / "file1.h").write_text("#define FILE1")
dir2 = tmp_path / "dir2"
dir2.mkdir()
(dir2 / "file2.cpp").write_text("// File2")
with patch("esphome.core.config.cg"):
await config.add_includes([str(single_file), str(dir1), str(dir2)])
# Verify copy_file_if_changed was called for all files
assert mock_copy_file_if_changed.call_count == 3 # 3 files total
@pytest.mark.asyncio
async def test_add_includes_empty_directory(
tmp_path: Path, mock_copy_file_if_changed: Mock
) -> None:
"""Test add_includes with an empty directory doesn't fail."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create empty directory
empty_dir = tmp_path / "empty"
empty_dir.mkdir()
with patch("esphome.core.config.cg"):
# Should not raise any errors
await config.add_includes([str(empty_dir)])
# No files to copy from empty directory
mock_copy_file_if_changed.assert_not_called()
@pytest.mark.asyncio
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
async def test_add_includes_preserves_directory_structure_unix(
tmp_path: Path, mock_copy_file_if_changed: Mock
) -> None:
"""Test that add_includes preserves relative directory structure on Unix."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create nested directory structure
lib_dir = tmp_path / "lib"
lib_dir.mkdir()
src_dir = lib_dir / "src"
src_dir.mkdir()
(src_dir / "core.h").write_text("#define CORE")
utils_dir = lib_dir / "utils"
utils_dir.mkdir()
(utils_dir / "helper.h").write_text("#define HELPER")
with patch("esphome.core.config.cg"):
await config.add_includes([str(lib_dir)])
# Verify copy_file_if_changed was called with correct paths
calls = mock_copy_file_if_changed.call_args_list
dest_paths = [call[0][1] for call in calls]
# Check that relative paths are preserved
assert any("lib/src/core.h" in path for path in dest_paths)
assert any("lib/utils/helper.h" in path for path in dest_paths)
@pytest.mark.asyncio
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
async def test_add_includes_preserves_directory_structure_windows(
tmp_path: Path, mock_copy_file_if_changed: Mock
) -> None:
"""Test that add_includes preserves relative directory structure on Windows."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create nested directory structure
lib_dir = tmp_path / "lib"
lib_dir.mkdir()
src_dir = lib_dir / "src"
src_dir.mkdir()
(src_dir / "core.h").write_text("#define CORE")
utils_dir = lib_dir / "utils"
utils_dir.mkdir()
(utils_dir / "helper.h").write_text("#define HELPER")
with patch("esphome.core.config.cg"):
await config.add_includes([str(lib_dir)])
# Verify copy_file_if_changed was called with correct paths
calls = mock_copy_file_if_changed.call_args_list
dest_paths = [call[0][1] for call in calls]
# Check that relative paths are preserved
assert any("lib\\src\\core.h" in path for path in dest_paths)
assert any("lib\\utils\\helper.h" in path for path in dest_paths)
@pytest.mark.asyncio
async def test_add_includes_overwrites_existing_files(
tmp_path: Path, mock_copy_file_if_changed: Mock
) -> None:
"""Test that add_includes overwrites existing files in build directory."""
CORE.config_path = str(tmp_path / "config.yaml")
CORE.build_path = str(tmp_path / "build")
os.makedirs(CORE.build_path, exist_ok=True)
# Create include file
include_file = tmp_path / "header.h"
include_file.write_text("#define NEW_VALUE 42")
with patch("esphome.core.config.cg"):
await config.add_includes([str(include_file)])
# Verify copy_file_if_changed was called (it handles overwriting)
# Note: add_includes adds files to a src/ subdirectory
mock_copy_file_if_changed.assert_called_once_with(
str(include_file), str(Path(CORE.build_path) / "src" / "header.h")
)

View File

@@ -1,3 +1,6 @@
import os
from unittest.mock import patch
from hypothesis import given
import pytest
from strategies import mac_addr_strings
@@ -577,3 +580,83 @@ class TestEsphomeCore:
assert target.is_esp32 is False
assert target.is_esp8266 is True
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
def test_data_dir_default_unix(self, target):
"""Test data_dir returns .esphome in config directory by default on Unix."""
target.config_path = "/home/user/config.yaml"
assert target.data_dir == "/home/user/.esphome"
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
def test_data_dir_default_windows(self, target):
"""Test data_dir returns .esphome in config directory by default on Windows."""
target.config_path = "D:\\home\\user\\config.yaml"
assert target.data_dir == "D:\\home\\user\\.esphome"
def test_data_dir_ha_addon(self, target):
"""Test data_dir returns /data when running as Home Assistant addon."""
target.config_path = "/config/test.yaml"
with patch.dict(os.environ, {"ESPHOME_IS_HA_ADDON": "true"}):
assert target.data_dir == "/data"
def test_data_dir_env_override(self, target):
"""Test data_dir uses ESPHOME_DATA_DIR environment variable when set."""
target.config_path = "/home/user/config.yaml"
with patch.dict(os.environ, {"ESPHOME_DATA_DIR": "/custom/data/path"}):
assert target.data_dir == "/custom/data/path"
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
def test_data_dir_priority_unix(self, target):
"""Test data_dir priority on Unix: HA addon > env var > default."""
target.config_path = "/config/test.yaml"
expected_default = "/config/.esphome"
# Test HA addon takes priority over env var
with patch.dict(
os.environ,
{"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"},
):
assert target.data_dir == "/data"
# Test env var is used when not HA addon
with patch.dict(
os.environ,
{"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"},
):
assert target.data_dir == "/custom/path"
# Test default when neither is set
with patch.dict(os.environ, {}, clear=True):
# Ensure these env vars are not set
os.environ.pop("ESPHOME_IS_HA_ADDON", None)
os.environ.pop("ESPHOME_DATA_DIR", None)
assert target.data_dir == expected_default
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
def test_data_dir_priority_windows(self, target):
"""Test data_dir priority on Windows: HA addon > env var > default."""
target.config_path = "D:\\config\\test.yaml"
expected_default = "D:\\config\\.esphome"
# Test HA addon takes priority over env var
with patch.dict(
os.environ,
{"ESPHOME_IS_HA_ADDON": "true", "ESPHOME_DATA_DIR": "/custom/path"},
):
assert target.data_dir == "/data"
# Test env var is used when not HA addon
with patch.dict(
os.environ,
{"ESPHOME_IS_HA_ADDON": "false", "ESPHOME_DATA_DIR": "/custom/path"},
):
assert target.data_dir == "/custom/path"
# Test default when neither is set
with patch.dict(os.environ, {}, clear=True):
# Ensure these env vars are not set
os.environ.pop("ESPHOME_IS_HA_ADDON", None)
os.environ.pop("ESPHOME_DATA_DIR", None)
assert target.data_dir == expected_default

View File

@@ -1,5 +1,8 @@
import logging
import os
from pathlib import Path
import socket
import stat
from unittest.mock import patch
from aioesphomeapi.host_resolver import AddrInfo, IPv4Sockaddr, IPv6Sockaddr
@@ -554,6 +557,239 @@ def test_addr_preference_ipv6_link_local_with_scope() -> None:
assert helpers.addr_preference_(addr_info) == 1 # Has scope, so it's usable
def test_mkdir_p(tmp_path: Path) -> None:
"""Test mkdir_p creates directories recursively."""
# Test creating nested directories
nested_path = tmp_path / "level1" / "level2" / "level3"
helpers.mkdir_p(nested_path)
assert nested_path.exists()
assert nested_path.is_dir()
# Test that mkdir_p is idempotent (doesn't fail if directory exists)
helpers.mkdir_p(nested_path)
assert nested_path.exists()
# Test with empty path (should do nothing)
helpers.mkdir_p("")
# Test with existing directory
existing_dir = tmp_path / "existing"
existing_dir.mkdir()
helpers.mkdir_p(existing_dir)
assert existing_dir.exists()
def test_mkdir_p_file_exists_error(tmp_path: Path) -> None:
"""Test mkdir_p raises error when path is a file."""
# Create a file
file_path = tmp_path / "test_file.txt"
file_path.write_text("test content")
# Try to create directory with same name as existing file
with pytest.raises(EsphomeError, match=r"Error creating directories"):
helpers.mkdir_p(file_path)
def test_mkdir_p_with_existing_file_raises_error(tmp_path: Path) -> None:
"""Test mkdir_p raises error when trying to create dir over existing file."""
# Create a file where we want to create a directory
file_path = tmp_path / "existing_file"
file_path.write_text("content")
# Try to create a directory with a path that goes through the file
dir_path = file_path / "subdir"
with pytest.raises(EsphomeError, match=r"Error creating directories"):
helpers.mkdir_p(dir_path)
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
def test_read_file_unix(tmp_path: Path) -> None:
"""Test read_file reads file content correctly on Unix."""
# Test reading regular file
test_file = tmp_path / "test.txt"
expected_content = "Test content\nLine 2\n"
test_file.write_text(expected_content)
content = helpers.read_file(test_file)
assert content == expected_content
# Test reading file with UTF-8 characters
utf8_file = tmp_path / "utf8.txt"
utf8_content = "Hello 世界 🌍"
utf8_file.write_text(utf8_content, encoding="utf-8")
content = helpers.read_file(utf8_file)
assert content == utf8_content
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
def test_read_file_windows(tmp_path: Path) -> None:
"""Test read_file reads file content correctly on Windows."""
# Test reading regular file
test_file = tmp_path / "test.txt"
expected_content = "Test content\nLine 2\n"
test_file.write_text(expected_content)
content = helpers.read_file(test_file)
# On Windows, text mode reading converts \n to \r\n
assert content == expected_content.replace("\n", "\r\n")
# Test reading file with UTF-8 characters
utf8_file = tmp_path / "utf8.txt"
utf8_content = "Hello 世界 🌍"
utf8_file.write_text(utf8_content, encoding="utf-8")
content = helpers.read_file(utf8_file)
assert content == utf8_content
def test_read_file_not_found() -> None:
"""Test read_file raises error for non-existent file."""
with pytest.raises(EsphomeError, match=r"Error reading file"):
helpers.read_file("/nonexistent/file.txt")
def test_read_file_unicode_decode_error(tmp_path: Path) -> None:
"""Test read_file raises error for invalid UTF-8."""
test_file = tmp_path / "invalid.txt"
# Write invalid UTF-8 bytes
test_file.write_bytes(b"\xff\xfe")
with pytest.raises(EsphomeError, match=r"Error reading file"):
helpers.read_file(test_file)
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific test")
def test_write_file_unix(tmp_path: Path) -> None:
"""Test write_file writes content correctly on Unix."""
# Test writing string content
test_file = tmp_path / "test.txt"
content = "Test content\nLine 2"
helpers.write_file(test_file, content)
assert test_file.read_text() == content
# Check file permissions
assert oct(test_file.stat().st_mode)[-3:] == "644"
# Test overwriting existing file
new_content = "New content"
helpers.write_file(test_file, new_content)
assert test_file.read_text() == new_content
# Test writing to nested directories (should create them)
nested_file = tmp_path / "dir1" / "dir2" / "file.txt"
helpers.write_file(nested_file, content)
assert nested_file.read_text() == content
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
def test_write_file_windows(tmp_path: Path) -> None:
"""Test write_file writes content correctly on Windows."""
# Test writing string content
test_file = tmp_path / "test.txt"
content = "Test content\nLine 2"
helpers.write_file(test_file, content)
assert test_file.read_text() == content
# Windows doesn't have Unix-style 644 permissions
# Test overwriting existing file
new_content = "New content"
helpers.write_file(test_file, new_content)
assert test_file.read_text() == new_content
# Test writing to nested directories (should create them)
nested_file = tmp_path / "dir1" / "dir2" / "file.txt"
helpers.write_file(nested_file, content)
assert nested_file.read_text() == content
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
def test_write_file_to_non_writable_directory_unix(tmp_path: Path) -> None:
"""Test write_file raises error when directory is not writable on Unix."""
# Create a directory and make it read-only
read_only_dir = tmp_path / "readonly"
read_only_dir.mkdir()
test_file = read_only_dir / "test.txt"
# Make directory read-only (no write permission)
read_only_dir.chmod(0o555)
try:
with pytest.raises(EsphomeError, match=r"Could not write file"):
helpers.write_file(test_file, "content")
finally:
# Restore write permissions for cleanup
read_only_dir.chmod(0o755)
@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
def test_write_file_to_non_writable_directory_windows(tmp_path: Path) -> None:
"""Test write_file error handling on Windows."""
# Windows handles permissions differently - test a different error case
# Try to write to a file path that contains an existing file as a directory component
existing_file = tmp_path / "file.txt"
existing_file.write_text("content")
# Try to write to a path that treats the file as a directory
invalid_path = existing_file / "subdir" / "test.txt"
with pytest.raises(EsphomeError, match=r"Could not write file"):
helpers.write_file(invalid_path, "content")
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
def test_write_file_with_permission_bits_unix(tmp_path: Path) -> None:
"""Test that write_file sets correct permissions on Unix."""
test_file = tmp_path / "test.txt"
helpers.write_file(test_file, "content")
# Check that file has 644 permissions
file_mode = test_file.stat().st_mode
assert stat.S_IMODE(file_mode) == 0o644
@pytest.mark.skipif(os.name == "nt", reason="Unix-specific permission test")
def test_copy_file_if_changed_permission_recovery_unix(tmp_path: Path) -> None:
"""Test copy_file_if_changed handles permission errors correctly on Unix."""
# Test with read-only destination file
src = tmp_path / "source.txt"
dst = tmp_path / "dest.txt"
src.write_text("new content")
dst.write_text("old content")
dst.chmod(0o444) # Make destination read-only
try:
# Should handle permission error by deleting and retrying
helpers.copy_file_if_changed(src, dst)
assert dst.read_text() == "new content"
finally:
# Restore write permissions for cleanup
if dst.exists():
dst.chmod(0o644)
def test_copy_file_if_changed_creates_directories(tmp_path: Path) -> None:
"""Test copy_file_if_changed creates missing directories."""
src = tmp_path / "source.txt"
dst = tmp_path / "subdir" / "nested" / "dest.txt"
src.write_text("content")
helpers.copy_file_if_changed(src, dst)
assert dst.exists()
assert dst.read_text() == "content"
def test_copy_file_if_changed_nonexistent_source(tmp_path: Path) -> None:
"""Test copy_file_if_changed with non-existent source."""
src = tmp_path / "nonexistent.txt"
dst = tmp_path / "dest.txt"
with pytest.raises(EsphomeError, match=r"Error copying file"):
helpers.copy_file_if_changed(src, dst)
def test_resolve_ip_address_sorting() -> None:
"""Test that results are sorted by preference."""
# Create multiple address infos with different preferences

View File

@@ -1,10 +1,16 @@
"""Tests for platformio_api.py path functions."""
import json
import os
from pathlib import Path
from unittest.mock import patch
import shutil
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import platformio_api
from esphome.core import CORE
from esphome.core import CORE, EsphomeError
def test_idedata_firmware_elf_path(setup_core: Path) -> None:
@@ -104,7 +110,9 @@ def test_flash_image_dataclass() -> None:
assert image.offset == "0x10000"
def test_load_idedata_returns_dict(setup_core: Path) -> None:
def test_load_idedata_returns_dict(
setup_core: Path, mock_run_platformio_cli_run
) -> None:
"""Test _load_idedata returns parsed idedata dict when successful."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
@@ -118,12 +126,511 @@ def test_load_idedata_returns_dict(setup_core: Path) -> None:
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/test/firmware.elf"}')
with patch("esphome.platformio_api.run_platformio_cli_run") as mock_run:
mock_run.return_value = '{"prog_path": "/test/firmware.elf"}'
mock_run_platformio_cli_run.return_value = '{"prog_path": "/test/firmware.elf"}'
config = {"name": "test"}
result = platformio_api._load_idedata(config)
config = {"name": "test"}
result = platformio_api._load_idedata(config)
assert result is not None
assert isinstance(result, dict)
assert result["prog_path"] == "/test/firmware.elf"
def test_load_idedata_uses_cache_when_valid(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata uses cached data when unchanged."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create platformio.ini
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Create idedata cache file that's newer
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/cached/firmware.elf"}')
# Make idedata newer than platformio.ini
platformio_ini_mtime = platformio_ini.stat().st_mtime
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should not call _run_idedata since cache is valid
mock_run_platformio_cli_run.assert_not_called()
assert result["prog_path"] == "/cached/firmware.elf"
def test_load_idedata_regenerates_when_platformio_ini_newer(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata regenerates when platformio.ini is newer."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create idedata cache file first
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": "/old/firmware.elf"}')
# Create platformio.ini that's newer
idedata_mtime = idedata_path.stat().st_mtime
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Make platformio.ini newer than idedata
os.utime(platformio_ini, (idedata_mtime + 1, idedata_mtime + 1))
# Mock platformio to return new data
new_data = {"prog_path": "/new/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should call _run_idedata since platformio.ini is newer
mock_run_platformio_cli_run.assert_called_once()
assert result["prog_path"] == "/new/firmware.elf"
def test_load_idedata_regenerates_on_corrupted_cache(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _load_idedata regenerates when cache file is corrupted."""
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
# Create platformio.ini
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Create corrupted idedata cache file
idedata_path = setup_core / ".esphome" / "idedata" / "test.json"
idedata_path.parent.mkdir(parents=True, exist_ok=True)
idedata_path.write_text('{"prog_path": invalid json')
# Make idedata newer so it would be used if valid
platformio_ini_mtime = platformio_ini.stat().st_mtime
os.utime(idedata_path, (platformio_ini_mtime + 1, platformio_ini_mtime + 1))
# Mock platformio to return new data
new_data = {"prog_path": "/new/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(new_data)
config = {"name": "test"}
result = platformio_api._load_idedata(config)
# Should call _run_idedata since cache is corrupted
mock_run_platformio_cli_run.assert_called_once()
assert result["prog_path"] == "/new/firmware.elf"
def test_run_idedata_parses_json_from_output(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata extracts JSON from platformio output."""
config = {"name": "test"}
expected_data = {
"prog_path": "/path/to/firmware.elf",
"cc_path": "/path/to/gcc",
"extra": {"flash_images": []},
}
# Simulate platformio output with JSON embedded
mock_run_platformio_cli_run.return_value = (
f"Some preamble\n{json.dumps(expected_data)}\nSome postamble"
)
result = platformio_api._run_idedata(config)
assert result == expected_data
def test_run_idedata_raises_on_no_json(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata raises EsphomeError when no JSON found."""
config = {"name": "test"}
mock_run_platformio_cli_run.return_value = "No JSON in this output"
with pytest.raises(EsphomeError):
platformio_api._run_idedata(config)
def test_run_idedata_raises_on_invalid_json(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test _run_idedata raises on malformed JSON."""
config = {"name": "test"}
mock_run_platformio_cli_run.return_value = '{"invalid": json"}'
# The ValueError from json.loads is re-raised
with pytest.raises(ValueError):
platformio_api._run_idedata(config)
def test_run_platformio_cli_sets_environment_variables(
setup_core: Path, mock_run_external_command: Mock
) -> None:
"""Test run_platformio_cli sets correct environment variables."""
CORE.build_path = str(setup_core / "build" / "test")
with patch.dict(os.environ, {}, clear=False):
mock_run_external_command.return_value = 0
platformio_api.run_platformio_cli("test", "arg")
# Check environment variables were set
assert os.environ["PLATFORMIO_FORCE_COLOR"] == "true"
assert (
setup_core / "build" / "test"
in Path(os.environ["PLATFORMIO_BUILD_DIR"]).parents
or Path(os.environ["PLATFORMIO_BUILD_DIR"]) == setup_core / "build" / "test"
)
assert "PLATFORMIO_LIBDEPS_DIR" in os.environ
assert "PYTHONWARNINGS" in os.environ
# Check command was called correctly
mock_run_external_command.assert_called_once()
args = mock_run_external_command.call_args[0]
assert "platformio" in args
assert "test" in args
assert "arg" in args
def test_run_platformio_cli_run_builds_command(
setup_core: Path, mock_run_platformio_cli: Mock
) -> None:
"""Test run_platformio_cli_run builds correct command."""
CORE.build_path = str(setup_core / "build" / "test")
mock_run_platformio_cli.return_value = 0
config = {"name": "test"}
platformio_api.run_platformio_cli_run(config, True, "extra", "args")
mock_run_platformio_cli.assert_called_once_with(
"run", "-d", CORE.build_path, "-v", "extra", "args"
)
def test_run_compile(setup_core: Path, mock_run_platformio_cli_run: Mock) -> None:
"""Test run_compile with process limit."""
from esphome.const import CONF_COMPILE_PROCESS_LIMIT, CONF_ESPHOME
CORE.build_path = str(setup_core / "build" / "test")
config = {CONF_ESPHOME: {CONF_COMPILE_PROCESS_LIMIT: 4}}
mock_run_platformio_cli_run.return_value = 0
platformio_api.run_compile(config, verbose=True)
mock_run_platformio_cli_run.assert_called_once_with(config, True, "-j4")
def test_get_idedata_caches_result(
setup_core: Path, mock_run_platformio_cli_run: Mock
) -> None:
"""Test get_idedata caches result in CORE.data."""
from esphome.const import KEY_CORE
CORE.build_path = str(setup_core / "build" / "test")
CORE.name = "test"
CORE.data[KEY_CORE] = {}
# Create platformio.ini to avoid regeneration
platformio_ini = setup_core / "build" / "test" / "platformio.ini"
platformio_ini.parent.mkdir(parents=True, exist_ok=True)
platformio_ini.write_text("content")
# Mock platformio to return data
idedata = {"prog_path": "/test/firmware.elf"}
mock_run_platformio_cli_run.return_value = json.dumps(idedata)
config = {"name": "test"}
# First call should load and cache
result1 = platformio_api.get_idedata(config)
mock_run_platformio_cli_run.assert_called_once()
# Second call should use cache from CORE.data
result2 = platformio_api.get_idedata(config)
mock_run_platformio_cli_run.assert_called_once() # Still only called once
assert result1 is result2
assert isinstance(result1, platformio_api.IDEData)
assert result1.firmware_elf_path == "/test/firmware.elf"
def test_idedata_addr2line_path_windows(setup_core: Path) -> None:
"""Test IDEData.addr2line_path on Windows."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "C:\\tools\\gcc.exe"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.addr2line_path
assert result == "C:\\tools\\addr2line.exe"
def test_idedata_addr2line_path_unix(setup_core: Path) -> None:
"""Test IDEData.addr2line_path on Unix."""
raw_data = {"prog_path": "/path/to/firmware.elf", "cc_path": "/usr/bin/gcc"}
idedata = platformio_api.IDEData(raw_data)
result = idedata.addr2line_path
assert result == "/usr/bin/addr2line"
def test_patch_structhash(setup_core: Path) -> None:
"""Test patch_structhash monkey patches platformio functions."""
# Create simple namespace objects to act as modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_run = SimpleNamespace(cli=mock_cli, helpers=mock_helpers)
# Mock platformio modules
with patch.dict(
"sys.modules",
{
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
"platformio.run": mock_run,
"platformio.project.helpers": MagicMock(),
"platformio.fs": MagicMock(),
"platformio": MagicMock(),
},
):
# Call patch_structhash
platformio_api.patch_structhash()
# Verify both modules had clean_build_dir patched
# Check that clean_build_dir was set on both modules
assert hasattr(mock_cli, "clean_build_dir")
assert hasattr(mock_helpers, "clean_build_dir")
# Verify they got the same function assigned
assert mock_cli.clean_build_dir is mock_helpers.clean_build_dir
# Verify it's a real function (not a Mock)
assert callable(mock_cli.clean_build_dir)
assert mock_cli.clean_build_dir.__name__ == "patched_clean_build_dir"
def test_patched_clean_build_dir_removes_outdated(setup_core: Path) -> None:
"""Test patched_clean_build_dir removes build dir when platformio.ini is newer."""
build_dir = setup_core / "build"
build_dir.mkdir()
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Make platformio.ini newer than build_dir
build_mtime = build_dir.stat().st_mtime
os.utime(platformio_ini, (build_mtime + 1, build_mtime + 1))
# Track if directory was removed
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
shutil.rmtree(path)
# Create mock modules that patch_structhash expects
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify directory was removed and recreated
assert len(removed_paths) == 1
assert removed_paths[0] == str(build_dir)
assert build_dir.exists() # makedirs recreated it
def test_patched_clean_build_dir_keeps_updated(setup_core: Path) -> None:
"""Test patched_clean_build_dir keeps build dir when it's up to date."""
build_dir = setup_core / "build"
build_dir.mkdir()
test_file = build_dir / "test.txt"
test_file.write_text("test content")
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Make build_dir newer than platformio.ini
ini_mtime = platformio_ini.stat().st_mtime
os.utime(build_dir, (ini_mtime + 1, ini_mtime + 1))
# Track if rmtree is called
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
# Create mock modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify rmtree was NOT called
assert len(removed_paths) == 0
# Verify directory and file still exist
assert build_dir.exists()
assert test_file.exists()
assert test_file.read_text() == "test content"
def test_patched_clean_build_dir_creates_missing(setup_core: Path) -> None:
"""Test patched_clean_build_dir creates build dir when it doesn't exist."""
build_dir = setup_core / "build"
platformio_ini = setup_core / "platformio.ini"
platformio_ini.write_text("config")
# Ensure build_dir doesn't exist
assert not build_dir.exists()
# Track if rmtree is called
removed_paths: list[str] = []
def track_rmtree(path: str) -> None:
removed_paths.append(path)
# Create mock modules
mock_cli = SimpleNamespace()
mock_helpers = SimpleNamespace()
mock_project_helpers = MagicMock()
mock_project_helpers.get_project_dir.return_value = str(setup_core)
mock_fs = SimpleNamespace(rmtree=track_rmtree)
with patch.dict(
"sys.modules",
{
"platformio": SimpleNamespace(fs=mock_fs),
"platformio.fs": mock_fs,
"platformio.project.helpers": mock_project_helpers,
"platformio.run": SimpleNamespace(cli=mock_cli, helpers=mock_helpers),
"platformio.run.cli": mock_cli,
"platformio.run.helpers": mock_helpers,
},
):
# Call patch_structhash to install the patched function
platformio_api.patch_structhash()
# Call the patched function
mock_helpers.clean_build_dir(str(build_dir), [])
# Verify rmtree was NOT called
assert len(removed_paths) == 0
# Verify directory was created
assert build_dir.exists()
def test_process_stacktrace_esp8266_exception(setup_core: Path, caplog) -> None:
"""Test process_stacktrace handles ESP8266 exceptions."""
config = {"name": "test"}
# Test exception type parsing
line = "Exception (28):"
backtrace_state = False
result = platformio_api.process_stacktrace(config, line, backtrace_state)
assert "Access to invalid address: LOAD (wild pointer?)" in caplog.text
assert result is False
def test_process_stacktrace_esp8266_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP8266 multi-line backtrace."""
config = {"name": "test"}
# Start of backtrace
line1 = ">>>stack>>>"
state = platformio_api.process_stacktrace(config, line1, False)
assert state is True
# Backtrace content with addresses
line2 = "40201234 40205678"
state = platformio_api.process_stacktrace(config, line2, state)
assert state is True
assert mock_decode_pc.call_count == 2
# End of backtrace
line3 = "<<<stack<<<"
state = platformio_api.process_stacktrace(config, line3, state)
assert state is False
def test_process_stacktrace_esp32_backtrace(
setup_core: Path, mock_decode_pc: Mock
) -> None:
"""Test process_stacktrace handles ESP32 single-line backtrace."""
config = {"name": "test"}
line = "Backtrace: 0x40081234:0x3ffb1234 0x40085678:0x3ffb5678"
state = platformio_api.process_stacktrace(config, line, False)
# Should decode both addresses
assert mock_decode_pc.call_count == 2
mock_decode_pc.assert_any_call(config, "40081234")
mock_decode_pc.assert_any_call(config, "40085678")
assert state is False
def test_process_stacktrace_bad_alloc(
setup_core: Path, mock_decode_pc: Mock, caplog
) -> None:
"""Test process_stacktrace handles bad alloc messages."""
config = {"name": "test"}
line = "last failed alloc call: 40201234(512)"
state = platformio_api.process_stacktrace(config, line, False)
assert "Memory allocation of 512 bytes failed at 40201234" in caplog.text
mock_decode_pc.assert_called_once_with(config, "40201234")
assert state is False

View File

@@ -1,12 +1,15 @@
"""Tests for storage_json.py path functions."""
from datetime import datetime
import json
from pathlib import Path
import sys
from unittest.mock import patch
from unittest.mock import MagicMock, Mock, patch
import pytest
from esphome import storage_json
from esphome.const import CONF_DISABLED, CONF_MDNS
from esphome.core import CORE
@@ -115,7 +118,9 @@ def test_storage_json_firmware_bin_path_property(setup_core: Path) -> None:
assert storage.firmware_bin_path == "/path/to/firmware.bin"
def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) -> None:
def test_storage_json_save_creates_directory(
setup_core: Path, tmp_path: Path, mock_write_file_if_changed: Mock
) -> None:
"""Test StorageJSON.save creates storage directory if it doesn't exist."""
storage_dir = tmp_path / "new_data" / "storage"
storage_file = storage_dir / "test.json"
@@ -139,11 +144,10 @@ def test_storage_json_save_creates_directory(setup_core: Path, tmp_path: Path) -
no_mdns=False,
)
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(storage_file))
mock_write.assert_called_once()
call_args = mock_write.call_args[0]
assert call_args[0] == str(storage_file)
storage.save(str(storage_file))
mock_write_file_if_changed.assert_called_once()
call_args = mock_write_file_if_changed.call_args[0]
assert call_args[0] == str(storage_file)
def test_storage_json_from_wizard(setup_core: Path) -> None:
@@ -180,3 +184,477 @@ def test_storage_paths_with_ha_addon(mock_is_ha_addon: bool, tmp_path: Path) ->
result = storage_json.esphome_storage_path()
expected = str(Path("/data") / "esphome.json")
assert result == expected
def test_storage_json_as_dict() -> None:
"""Test StorageJSON.as_dict returns correct dictionary."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test_device",
friendly_name="Test Device",
comment="Test comment",
esphome_version="2024.1.0",
src_version=1,
address="192.168.1.100",
web_port=80,
target_platform="ESP32",
build_path="/path/to/build",
firmware_bin_path="/path/to/firmware.bin",
loaded_integrations={"wifi", "api", "ota"},
loaded_platforms={"sensor", "binary_sensor"},
no_mdns=True,
framework="arduino",
core_platform="esp32",
)
result = storage.as_dict()
assert result["storage_version"] == 1
assert result["name"] == "test_device"
assert result["friendly_name"] == "Test Device"
assert result["comment"] == "Test comment"
assert result["esphome_version"] == "2024.1.0"
assert result["src_version"] == 1
assert result["address"] == "192.168.1.100"
assert result["web_port"] == 80
assert result["esp_platform"] == "ESP32"
assert result["build_path"] == "/path/to/build"
assert result["firmware_bin_path"] == "/path/to/firmware.bin"
assert "api" in result["loaded_integrations"]
assert "wifi" in result["loaded_integrations"]
assert "ota" in result["loaded_integrations"]
assert result["loaded_integrations"] == sorted(
["wifi", "api", "ota"]
) # Should be sorted
assert "sensor" in result["loaded_platforms"]
assert result["loaded_platforms"] == sorted(
["sensor", "binary_sensor"]
) # Should be sorted
assert result["no_mdns"] is True
assert result["framework"] == "arduino"
assert result["core_platform"] == "esp32"
def test_storage_json_to_json() -> None:
"""Test StorageJSON.to_json returns valid JSON string."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP8266",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
json_str = storage.to_json()
# Should be valid JSON
parsed = json.loads(json_str)
assert parsed["name"] == "test"
assert parsed["storage_version"] == 1
# Should end with newline
assert json_str.endswith("\n")
def test_storage_json_save(tmp_path: Path) -> None:
"""Test StorageJSON.save writes file correctly."""
storage = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=None,
address="test.local",
web_port=None,
target_platform="ESP32",
build_path=None,
firmware_bin_path=None,
loaded_integrations=set(),
loaded_platforms=set(),
no_mdns=False,
)
save_path = tmp_path / "test.json"
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(save_path))
mock_write.assert_called_once_with(str(save_path), storage.to_json())
def test_storage_json_from_esphome_core(setup_core: Path) -> None:
"""Test StorageJSON.from_esphome_core creates correct storage object."""
# Mock CORE object
mock_core = MagicMock()
mock_core.name = "my_device"
mock_core.friendly_name = "My Device"
mock_core.comment = "A test device"
mock_core.address = "192.168.1.50"
mock_core.web_port = 8080
mock_core.target_platform = "esp32"
mock_core.is_esp32 = True
mock_core.build_path = "/build/my_device"
mock_core.firmware_bin = "/build/my_device/firmware.bin"
mock_core.loaded_integrations = {"wifi", "api"}
mock_core.loaded_platforms = {"sensor"}
mock_core.config = {CONF_MDNS: {CONF_DISABLED: True}}
mock_core.target_framework = "esp-idf"
with patch("esphome.components.esp32.get_esp32_variant") as mock_variant:
mock_variant.return_value = "ESP32-C3"
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
assert result.name == "my_device"
assert result.friendly_name == "My Device"
assert result.comment == "A test device"
assert result.address == "192.168.1.50"
assert result.web_port == 8080
assert result.target_platform == "ESP32-C3"
assert result.build_path == "/build/my_device"
assert result.firmware_bin_path == "/build/my_device/firmware.bin"
assert result.loaded_integrations == {"wifi", "api"}
assert result.loaded_platforms == {"sensor"}
assert result.no_mdns is True
assert result.framework == "esp-idf"
assert result.core_platform == "esp32"
def test_storage_json_from_esphome_core_mdns_enabled(setup_core: Path) -> None:
"""Test from_esphome_core with mDNS enabled."""
mock_core = MagicMock()
mock_core.name = "test"
mock_core.friendly_name = "Test"
mock_core.comment = None
mock_core.address = "test.local"
mock_core.web_port = None
mock_core.target_platform = "esp8266"
mock_core.is_esp32 = False
mock_core.build_path = "/build"
mock_core.firmware_bin = "/build/firmware.bin"
mock_core.loaded_integrations = set()
mock_core.loaded_platforms = set()
mock_core.config = {} # No MDNS config means enabled
mock_core.target_framework = "arduino"
result = storage_json.StorageJSON.from_esphome_core(mock_core, old=None)
assert result.no_mdns is False
def test_storage_json_load_valid_file(tmp_path: Path) -> None:
"""Test StorageJSON.load with valid JSON file."""
storage_data = {
"storage_version": 1,
"name": "loaded_device",
"friendly_name": "Loaded Device",
"comment": "Loaded from file",
"esphome_version": "2024.1.0",
"src_version": 2,
"address": "10.0.0.1",
"web_port": 8080,
"esp_platform": "ESP32",
"build_path": "/loaded/build",
"firmware_bin_path": "/loaded/firmware.bin",
"loaded_integrations": ["wifi", "api"],
"loaded_platforms": ["sensor"],
"no_mdns": True,
"framework": "arduino",
"core_platform": "esp32",
}
file_path = tmp_path / "storage.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.StorageJSON.load(str(file_path))
assert result is not None
assert result.name == "loaded_device"
assert result.friendly_name == "Loaded Device"
assert result.comment == "Loaded from file"
assert result.esphome_version == "2024.1.0"
assert result.src_version == 2
assert result.address == "10.0.0.1"
assert result.web_port == 8080
assert result.target_platform == "ESP32"
assert result.build_path == "/loaded/build"
assert result.firmware_bin_path == "/loaded/firmware.bin"
assert result.loaded_integrations == {"wifi", "api"}
assert result.loaded_platforms == {"sensor"}
assert result.no_mdns is True
assert result.framework == "arduino"
assert result.core_platform == "esp32"
def test_storage_json_load_invalid_file(tmp_path: Path) -> None:
"""Test StorageJSON.load with invalid JSON file."""
file_path = tmp_path / "invalid.json"
file_path.write_text("not valid json{")
result = storage_json.StorageJSON.load(str(file_path))
assert result is None
def test_storage_json_load_nonexistent_file() -> None:
"""Test StorageJSON.load with non-existent file."""
result = storage_json.StorageJSON.load("/nonexistent/file.json")
assert result is None
def test_storage_json_equality() -> None:
"""Test StorageJSON equality comparison."""
storage1 = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
storage2 = storage_json.StorageJSON(
storage_version=1,
name="test",
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
storage3 = storage_json.StorageJSON(
storage_version=1,
name="different", # Different name
friendly_name="Test",
comment=None,
esphome_version="2024.1.0",
src_version=1,
address="test.local",
web_port=80,
target_platform="ESP32",
build_path="/build",
firmware_bin_path="/firmware.bin",
loaded_integrations={"wifi"},
loaded_platforms=set(),
no_mdns=False,
)
assert storage1 == storage2
assert storage1 != storage3
assert storage1 != "not a storage object"
def test_esphome_storage_json_as_dict() -> None:
"""Test EsphomeStorageJSON.as_dict returns correct dictionary."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret123",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
result = storage.as_dict()
assert result["storage_version"] == 1
assert result["cookie_secret"] == "secret123"
assert result["last_update_check"] == "2024-01-15T10:30:00"
assert result["remote_version"] == "2024.1.1"
def test_esphome_storage_json_last_update_check_property() -> None:
"""Test EsphomeStorageJSON.last_update_check property."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version=None,
)
# Test getter
result = storage.last_update_check
assert isinstance(result, datetime)
assert result.year == 2024
assert result.month == 1
assert result.day == 15
assert result.hour == 10
assert result.minute == 30
# Test setter
new_date = datetime(2024, 2, 20, 15, 45, 30)
storage.last_update_check = new_date
assert storage.last_update_check_str == "2024-02-20T15:45:30"
def test_esphome_storage_json_last_update_check_invalid() -> None:
"""Test EsphomeStorageJSON.last_update_check with invalid date."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="invalid date",
remote_version=None,
)
result = storage.last_update_check
assert result is None
def test_esphome_storage_json_to_json() -> None:
"""Test EsphomeStorageJSON.to_json returns valid JSON string."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="mysecret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
json_str = storage.to_json()
# Should be valid JSON
parsed = json.loads(json_str)
assert parsed["cookie_secret"] == "mysecret"
assert parsed["storage_version"] == 1
# Should end with newline
assert json_str.endswith("\n")
def test_esphome_storage_json_save(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.save writes file correctly."""
storage = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check=None,
remote_version=None,
)
save_path = tmp_path / "esphome.json"
with patch("esphome.storage_json.write_file_if_changed") as mock_write:
storage.save(str(save_path))
mock_write.assert_called_once_with(str(save_path), storage.to_json())
def test_esphome_storage_json_load_valid_file(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.load with valid JSON file."""
storage_data = {
"storage_version": 1,
"cookie_secret": "loaded_secret",
"last_update_check": "2024-01-20T14:30:00",
"remote_version": "2024.1.2",
}
file_path = tmp_path / "esphome.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.EsphomeStorageJSON.load(str(file_path))
assert result is not None
assert result.storage_version == 1
assert result.cookie_secret == "loaded_secret"
assert result.last_update_check_str == "2024-01-20T14:30:00"
assert result.remote_version == "2024.1.2"
def test_esphome_storage_json_load_invalid_file(tmp_path: Path) -> None:
"""Test EsphomeStorageJSON.load with invalid JSON file."""
file_path = tmp_path / "invalid.json"
file_path.write_text("not valid json{")
result = storage_json.EsphomeStorageJSON.load(str(file_path))
assert result is None
def test_esphome_storage_json_load_nonexistent_file() -> None:
"""Test EsphomeStorageJSON.load with non-existent file."""
result = storage_json.EsphomeStorageJSON.load("/nonexistent/file.json")
assert result is None
def test_esphome_storage_json_get_default() -> None:
"""Test EsphomeStorageJSON.get_default creates default storage."""
with patch("esphome.storage_json.os.urandom") as mock_urandom:
# Mock urandom to return predictable bytes
mock_urandom.return_value = b"test" * 16 # 64 bytes
result = storage_json.EsphomeStorageJSON.get_default()
assert result.storage_version == 1
assert len(result.cookie_secret) == 128 # 64 bytes hex = 128 chars
assert result.last_update_check is None
assert result.remote_version is None
def test_esphome_storage_json_equality() -> None:
"""Test EsphomeStorageJSON equality comparison."""
storage1 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
storage2 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="secret",
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
storage3 = storage_json.EsphomeStorageJSON(
storage_version=1,
cookie_secret="different", # Different secret
last_update_check="2024-01-15T10:30:00",
remote_version="2024.1.1",
)
assert storage1 == storage2
assert storage1 != storage3
assert storage1 != "not a storage object"
def test_storage_json_load_legacy_esphomeyaml_version(tmp_path: Path) -> None:
"""Test loading storage with legacy esphomeyaml_version field."""
storage_data = {
"storage_version": 1,
"name": "legacy_device",
"friendly_name": "Legacy Device",
"esphomeyaml_version": "1.14.0", # Legacy field name
"address": "legacy.local",
"esp_platform": "ESP8266",
}
file_path = tmp_path / "legacy.json"
file_path.write_text(json.dumps(storage_data))
result = storage_json.StorageJSON.load(str(file_path))
assert result is not None
assert result.esphome_version == "1.14.0" # Should map to esphome_version

View File

@@ -1,5 +1,7 @@
"""Tests for esphome.util module."""
from __future__ import annotations
from pathlib import Path
import pytest
@@ -308,3 +310,85 @@ def test_filter_yaml_files_case_sensitive() -> None:
assert "/path/to/config.YAML" not in result
assert "/path/to/config.YML" not in result
assert "/path/to/config.Yaml" not in result
@pytest.mark.parametrize(
("input_str", "expected"),
[
# Empty string
("", "''"),
# Simple strings that don't need quoting
("hello", "hello"),
("test123", "test123"),
("file.txt", "file.txt"),
("/path/to/file", "/path/to/file"),
("user@host", "user@host"),
("value:123", "value:123"),
("item,list", "item,list"),
("path-with-dash", "path-with-dash"),
# Strings that need quoting
("hello world", "'hello world'"),
("test\ttab", "'test\ttab'"),
("line\nbreak", "'line\nbreak'"),
("semicolon;here", "'semicolon;here'"),
("pipe|symbol", "'pipe|symbol'"),
("redirect>file", "'redirect>file'"),
("redirect<file", "'redirect<file'"),
("background&", "'background&'"),
("dollar$sign", "'dollar$sign'"),
("backtick`cmd", "'backtick`cmd'"),
('double"quote', "'double\"quote'"),
("backslash\\path", "'backslash\\path'"),
("question?mark", "'question?mark'"),
("asterisk*wild", "'asterisk*wild'"),
("bracket[test]", "'bracket[test]'"),
("paren(test)", "'paren(test)'"),
("curly{brace}", "'curly{brace}'"),
# Single quotes in string (special escaping)
("it's", "'it'\"'\"'s'"),
("don't", "'don'\"'\"'t'"),
("'quoted'", "''\"'\"'quoted'\"'\"''"),
# Complex combinations
("test 'with' quotes", "'test '\"'\"'with'\"'\"' quotes'"),
("path/to/file's.txt", "'path/to/file'\"'\"'s.txt'"),
],
)
def test_shlex_quote(input_str: str, expected: str) -> None:
"""Test shlex_quote properly escapes shell arguments."""
assert util.shlex_quote(input_str) == expected
def test_shlex_quote_safe_characters() -> None:
"""Test that safe characters are not quoted."""
# These characters are considered safe and shouldn't be quoted
safe_chars = (
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789@%+=:,./-_"
)
for char in safe_chars:
assert util.shlex_quote(char) == char
assert util.shlex_quote(f"test{char}test") == f"test{char}test"
def test_shlex_quote_unsafe_characters() -> None:
"""Test that unsafe characters trigger quoting."""
# These characters should trigger quoting
unsafe_chars = ' \t\n;|>&<$`"\\?*[](){}!#~^'
for char in unsafe_chars:
result = util.shlex_quote(f"test{char}test")
assert result.startswith("'")
assert result.endswith("'")
def test_shlex_quote_edge_cases() -> None:
"""Test edge cases for shlex_quote."""
# Multiple single quotes
assert util.shlex_quote("'''") == "''\"'\"''\"'\"''\"'\"''"
# Mixed quotes
assert util.shlex_quote('"\'"') == "'\"'\"'\"'\"'"
# Only whitespace
assert util.shlex_quote(" ") == "' '"
assert util.shlex_quote("\t") == "'\t'"
assert util.shlex_quote("\n") == "'\n'"
assert util.shlex_quote(" ") == "' '"