1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-29 08:32:26 +01:00

Merge branch 'integration' into memory_api

This commit is contained in:
J. Nick Koston
2025-09-19 22:44:34 -06:00
104 changed files with 1620 additions and 1005 deletions

View File

@@ -6,6 +6,7 @@ import getpass
import importlib
import logging
import os
from pathlib import Path
import re
import sys
import time
@@ -452,7 +453,7 @@ def upload_using_esptool(
"detect",
]
for img in flash_images:
cmd += [img.offset, img.path]
cmd += [img.offset, str(img.path)]
if os.environ.get("ESPHOME_USE_SUBPROCESS") is None:
import esptool
@@ -538,7 +539,10 @@ def upload_program(
remote_port = int(ota_conf[CONF_PORT])
password = ota_conf.get(CONF_PASSWORD, "")
binary = args.file if getattr(args, "file", None) is not None else CORE.firmware_bin
if getattr(args, "file", None) is not None:
binary = Path(args.file)
else:
binary = CORE.firmware_bin
# MQTT address resolution
if get_port_type(host) in ("MQTT", "MQTTIP"):
@@ -605,7 +609,7 @@ def clean_mqtt(config: ConfigType, args: ArgsProtocol) -> int | None:
def command_wizard(args: ArgsProtocol) -> int | None:
from esphome import wizard
return wizard.wizard(args.configuration)
return wizard.wizard(Path(args.configuration))
def command_config(args: ArgsProtocol, config: ConfigType) -> int | None:
@@ -825,7 +829,8 @@ def command_idedata(args: ArgsProtocol, config: ConfigType) -> int:
def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
for c in args.name:
new_name = args.name
for c in new_name:
if c not in ALLOWED_NAME_CHARS:
print(
color(
@@ -836,8 +841,7 @@ def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
)
return 1
# Load existing yaml file
with open(CORE.config_path, mode="r+", encoding="utf-8") as raw_file:
raw_contents = raw_file.read()
raw_contents = CORE.config_path.read_text(encoding="utf-8")
yaml = yaml_util.load_yaml(CORE.config_path)
if CONF_ESPHOME not in yaml or CONF_NAME not in yaml[CONF_ESPHOME]:
@@ -852,7 +856,7 @@ def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
if match is None:
new_raw = re.sub(
rf"name:\s+[\"']?{old_name}[\"']?",
f'name: "{args.name}"',
f'name: "{new_name}"',
raw_contents,
)
else:
@@ -872,29 +876,28 @@ def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
new_raw = re.sub(
rf"^(\s+{match.group(1)}):\s+[\"']?{old_name}[\"']?",
f'\\1: "{args.name}"',
f'\\1: "{new_name}"',
raw_contents,
flags=re.MULTILINE,
)
new_path = os.path.join(CORE.config_dir, args.name + ".yaml")
new_path: Path = CORE.config_dir / (new_name + ".yaml")
print(
f"Updating {color(AnsiFore.CYAN, CORE.config_path)} to {color(AnsiFore.CYAN, new_path)}"
f"Updating {color(AnsiFore.CYAN, str(CORE.config_path))} to {color(AnsiFore.CYAN, str(new_path))}"
)
print()
with open(new_path, mode="w", encoding="utf-8") as new_file:
new_file.write(new_raw)
new_path.write_text(new_raw, encoding="utf-8")
rc = run_external_process("esphome", "config", new_path)
rc = run_external_process("esphome", "config", str(new_path))
if rc != 0:
print(color(AnsiFore.BOLD_RED, "Rename failed. Reverting changes."))
os.remove(new_path)
new_path.unlink()
return 1
cli_args = [
"run",
new_path,
str(new_path),
"--no-logs",
"--device",
CORE.address,
@@ -908,11 +911,11 @@ def command_rename(args: ArgsProtocol, config: ConfigType) -> int | None:
except KeyboardInterrupt:
rc = 1
if rc != 0:
os.remove(new_path)
new_path.unlink()
return 1
if CORE.config_path != new_path:
os.remove(CORE.config_path)
CORE.config_path.unlink()
print(color(AnsiFore.BOLD_GREEN, "SUCCESS"))
print()
@@ -1280,7 +1283,8 @@ def run_esphome(argv):
_LOGGER.info("ESPHome %s", const.__version__)
for conf_path in args.configuration:
if any(os.path.basename(conf_path) == x for x in SECRETS_FILES):
conf_path = Path(conf_path)
if any(conf_path.name == x for x in SECRETS_FILES):
_LOGGER.warning("Skipping secrets file %s", conf_path)
continue

View File

@@ -1,5 +1,3 @@
import os
from esphome.const import __version__
from esphome.core import CORE
from esphome.helpers import mkdir_p, read_file, write_file_if_changed
@@ -63,7 +61,7 @@ def write_ini(content):
update_storage_json()
path = CORE.relative_build_path("platformio.ini")
if os.path.isfile(path):
if path.is_file():
text = read_file(path)
content_format = find_begin_end(
text, INI_AUTO_GENERATE_BEGIN, INI_AUTO_GENERATE_END

View File

@@ -66,6 +66,9 @@ service APIConnection {
rpc voice_assistant_set_configuration(VoiceAssistantSetConfiguration) returns (void) {}
rpc alarm_control_panel_command (AlarmControlPanelCommandRequest) returns (void) {}
rpc zwave_proxy_frame(ZWaveProxyFrame) returns (void) {}
rpc zwave_proxy_request(ZWaveProxyRequest) returns (void) {}
}
@@ -254,6 +257,10 @@ message DeviceInfoResponse {
// Top-level area info to phase out suggested_area
AreaInfo area = 22 [(field_ifdef) = "USE_AREAS"];
// Indicates if Z-Wave proxy support is available and features supported
uint32 zwave_proxy_feature_flags = 23 [(field_ifdef) = "USE_ZWAVE_PROXY"];
uint32 zwave_home_id = 24 [(field_ifdef) = "USE_ZWAVE_PROXY"];
}
message ListEntitiesRequest {
@@ -2276,3 +2283,26 @@ message UpdateCommandRequest {
UpdateCommand command = 2;
uint32 device_id = 3 [(field_ifdef) = "USE_DEVICES"];
}
// ==================== Z-WAVE ====================
message ZWaveProxyFrame {
option (id) = 128;
option (source) = SOURCE_BOTH;
option (ifdef) = "USE_ZWAVE_PROXY";
option (no_delay) = true;
bytes data = 1 [(fixed_array_size) = 257];
}
enum ZWaveProxyRequestType {
ZWAVE_PROXY_REQUEST_TYPE_SUBSCRIBE = 0;
ZWAVE_PROXY_REQUEST_TYPE_UNSUBSCRIBE = 1;
}
message ZWaveProxyRequest {
option (id) = 129;
option (source) = SOURCE_CLIENT;
option (ifdef) = "USE_ZWAVE_PROXY";
ZWaveProxyRequestType type = 1;
}

View File

@@ -30,6 +30,9 @@
#ifdef USE_VOICE_ASSISTANT
#include "esphome/components/voice_assistant/voice_assistant.h"
#endif
#ifdef USE_ZWAVE_PROXY
#include "esphome/components/zwave_proxy/zwave_proxy.h"
#endif
namespace esphome::api {
@@ -1203,7 +1206,16 @@ void APIConnection::voice_assistant_set_configuration(const VoiceAssistantSetCon
voice_assistant::global_voice_assistant->on_set_configuration(msg.active_wake_words);
}
}
#endif
#ifdef USE_ZWAVE_PROXY
void APIConnection::zwave_proxy_frame(const ZWaveProxyFrame &msg) {
zwave_proxy::global_zwave_proxy->send_frame(msg.data, msg.data_len);
}
void APIConnection::zwave_proxy_request(const ZWaveProxyRequest &msg) {
zwave_proxy::global_zwave_proxy->zwave_proxy_request(this, msg.type);
}
#endif
#ifdef USE_ALARM_CONTROL_PANEL
@@ -1460,6 +1472,10 @@ bool APIConnection::send_device_info_response(const DeviceInfoRequest &msg) {
#ifdef USE_VOICE_ASSISTANT
resp.voice_assistant_feature_flags = voice_assistant::global_voice_assistant->get_feature_flags();
#endif
#ifdef USE_ZWAVE_PROXY
resp.zwave_proxy_feature_flags = zwave_proxy::global_zwave_proxy->get_feature_flags();
resp.zwave_home_id = zwave_proxy::global_zwave_proxy->get_home_id();
#endif
#ifdef USE_API_NOISE
resp.api_encryption_supported = true;
#endif

View File

@@ -171,6 +171,11 @@ class APIConnection final : public APIServerConnection {
void voice_assistant_set_configuration(const VoiceAssistantSetConfiguration &msg) override;
#endif
#ifdef USE_ZWAVE_PROXY
void zwave_proxy_frame(const ZWaveProxyFrame &msg) override;
void zwave_proxy_request(const ZWaveProxyRequest &msg) override;
#endif
#ifdef USE_ALARM_CONTROL_PANEL
bool send_alarm_control_panel_state(alarm_control_panel::AlarmControlPanel *a_alarm_control_panel);
void alarm_control_panel_command(const AlarmControlPanelCommandRequest &msg) override;

View File

@@ -129,6 +129,12 @@ void DeviceInfoResponse::encode(ProtoWriteBuffer buffer) const {
#ifdef USE_AREAS
buffer.encode_message(22, this->area);
#endif
#ifdef USE_ZWAVE_PROXY
buffer.encode_uint32(23, this->zwave_proxy_feature_flags);
#endif
#ifdef USE_ZWAVE_PROXY
buffer.encode_uint32(24, this->zwave_home_id);
#endif
}
void DeviceInfoResponse::calculate_size(ProtoSize &size) const {
#ifdef USE_API_PASSWORD
@@ -181,6 +187,12 @@ void DeviceInfoResponse::calculate_size(ProtoSize &size) const {
#ifdef USE_AREAS
size.add_message_object(2, this->area);
#endif
#ifdef USE_ZWAVE_PROXY
size.add_uint32(2, this->zwave_proxy_feature_flags);
#endif
#ifdef USE_ZWAVE_PROXY
size.add_uint32(2, this->zwave_home_id);
#endif
}
#ifdef USE_BINARY_SENSOR
void ListEntitiesBinarySensorResponse::encode(ProtoWriteBuffer buffer) const {
@@ -3013,5 +3025,35 @@ bool UpdateCommandRequest::decode_32bit(uint32_t field_id, Proto32Bit value) {
return true;
}
#endif
#ifdef USE_ZWAVE_PROXY
bool ZWaveProxyFrame::decode_length(uint32_t field_id, ProtoLengthDelimited value) {
switch (field_id) {
case 1: {
const std::string &data_str = value.as_string();
this->data_len = data_str.size();
if (this->data_len > 257) {
this->data_len = 257;
}
memcpy(this->data, data_str.data(), this->data_len);
break;
}
default:
return false;
}
return true;
}
void ZWaveProxyFrame::encode(ProtoWriteBuffer buffer) const { buffer.encode_bytes(1, this->data, this->data_len); }
void ZWaveProxyFrame::calculate_size(ProtoSize &size) const { size.add_length(1, this->data_len); }
bool ZWaveProxyRequest::decode_varint(uint32_t field_id, ProtoVarInt value) {
switch (field_id) {
case 1:
this->type = static_cast<enums::ZWaveProxyRequestType>(value.as_uint32());
break;
default:
return false;
}
return true;
}
#endif
} // namespace esphome::api

View File

@@ -276,6 +276,12 @@ enum UpdateCommand : uint32_t {
UPDATE_COMMAND_CHECK = 2,
};
#endif
#ifdef USE_ZWAVE_PROXY
enum ZWaveProxyRequestType : uint32_t {
ZWAVE_PROXY_REQUEST_TYPE_SUBSCRIBE = 0,
ZWAVE_PROXY_REQUEST_TYPE_UNSUBSCRIBE = 1,
};
#endif
} // namespace enums
@@ -492,7 +498,7 @@ class DeviceInfo final : public ProtoMessage {
class DeviceInfoResponse final : public ProtoMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 10;
static constexpr uint8_t ESTIMATED_SIZE = 247;
static constexpr uint16_t ESTIMATED_SIZE = 257;
#ifdef HAS_PROTO_MESSAGE_DUMP
const char *message_name() const override { return "device_info_response"; }
#endif
@@ -552,6 +558,12 @@ class DeviceInfoResponse final : public ProtoMessage {
#endif
#ifdef USE_AREAS
AreaInfo area{};
#endif
#ifdef USE_ZWAVE_PROXY
uint32_t zwave_proxy_feature_flags{0};
#endif
#ifdef USE_ZWAVE_PROXY
uint32_t zwave_home_id{0};
#endif
void encode(ProtoWriteBuffer buffer) const override;
void calculate_size(ProtoSize &size) const override;
@@ -2913,5 +2925,40 @@ class UpdateCommandRequest final : public CommandProtoMessage {
bool decode_varint(uint32_t field_id, ProtoVarInt value) override;
};
#endif
#ifdef USE_ZWAVE_PROXY
class ZWaveProxyFrame final : public ProtoDecodableMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 128;
static constexpr uint8_t ESTIMATED_SIZE = 33;
#ifdef HAS_PROTO_MESSAGE_DUMP
const char *message_name() const override { return "z_wave_proxy_frame"; }
#endif
uint8_t data[257]{};
uint16_t data_len{0};
void encode(ProtoWriteBuffer buffer) const override;
void calculate_size(ProtoSize &size) const override;
#ifdef HAS_PROTO_MESSAGE_DUMP
void dump_to(std::string &out) const override;
#endif
protected:
bool decode_length(uint32_t field_id, ProtoLengthDelimited value) override;
};
class ZWaveProxyRequest final : public ProtoDecodableMessage {
public:
static constexpr uint8_t MESSAGE_TYPE = 129;
static constexpr uint8_t ESTIMATED_SIZE = 2;
#ifdef HAS_PROTO_MESSAGE_DUMP
const char *message_name() const override { return "z_wave_proxy_request"; }
#endif
enums::ZWaveProxyRequestType type{};
#ifdef HAS_PROTO_MESSAGE_DUMP
void dump_to(std::string &out) const override;
#endif
protected:
bool decode_varint(uint32_t field_id, ProtoVarInt value) override;
};
#endif
} // namespace esphome::api

View File

@@ -655,6 +655,18 @@ template<> const char *proto_enum_to_string<enums::UpdateCommand>(enums::UpdateC
}
}
#endif
#ifdef USE_ZWAVE_PROXY
template<> const char *proto_enum_to_string<enums::ZWaveProxyRequestType>(enums::ZWaveProxyRequestType value) {
switch (value) {
case enums::ZWAVE_PROXY_REQUEST_TYPE_SUBSCRIBE:
return "ZWAVE_PROXY_REQUEST_TYPE_SUBSCRIBE";
case enums::ZWAVE_PROXY_REQUEST_TYPE_UNSUBSCRIBE:
return "ZWAVE_PROXY_REQUEST_TYPE_UNSUBSCRIBE";
default:
return "UNKNOWN";
}
}
#endif
void HelloRequest::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "HelloRequest");
@@ -754,6 +766,12 @@ void DeviceInfoResponse::dump_to(std::string &out) const {
this->area.dump_to(out);
out.append("\n");
#endif
#ifdef USE_ZWAVE_PROXY
dump_field(out, "zwave_proxy_feature_flags", this->zwave_proxy_feature_flags);
#endif
#ifdef USE_ZWAVE_PROXY
dump_field(out, "zwave_home_id", this->zwave_home_id);
#endif
}
void ListEntitiesRequest::dump_to(std::string &out) const { out.append("ListEntitiesRequest {}"); }
void ListEntitiesDoneResponse::dump_to(std::string &out) const { out.append("ListEntitiesDoneResponse {}"); }
@@ -2107,6 +2125,18 @@ void UpdateCommandRequest::dump_to(std::string &out) const {
#endif
}
#endif
#ifdef USE_ZWAVE_PROXY
void ZWaveProxyFrame::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "ZWaveProxyFrame");
out.append(" data: ");
out.append(format_hex_pretty(this->data, this->data_len));
out.append("\n");
}
void ZWaveProxyRequest::dump_to(std::string &out) const {
MessageDumpHelper helper(out, "ZWaveProxyRequest");
dump_field(out, "type", static_cast<enums::ZWaveProxyRequestType>(this->type));
}
#endif
} // namespace esphome::api

View File

@@ -588,6 +588,28 @@ void APIServerConnectionBase::read_message(uint32_t msg_size, uint32_t msg_type,
this->on_bluetooth_scanner_set_mode_request(msg);
break;
}
#endif
#ifdef USE_ZWAVE_PROXY
case ZWaveProxyFrame::MESSAGE_TYPE: {
ZWaveProxyFrame msg;
msg.decode(msg_data, msg_size);
#ifdef HAS_PROTO_MESSAGE_DUMP
ESP_LOGVV(TAG, "on_z_wave_proxy_frame: %s", msg.dump().c_str());
#endif
this->on_z_wave_proxy_frame(msg);
break;
}
#endif
#ifdef USE_ZWAVE_PROXY
case ZWaveProxyRequest::MESSAGE_TYPE: {
ZWaveProxyRequest msg;
msg.decode(msg_data, msg_size);
#ifdef HAS_PROTO_MESSAGE_DUMP
ESP_LOGVV(TAG, "on_z_wave_proxy_request: %s", msg.dump().c_str());
#endif
this->on_z_wave_proxy_request(msg);
break;
}
#endif
default:
break;
@@ -899,5 +921,19 @@ void APIServerConnection::on_alarm_control_panel_command_request(const AlarmCont
}
}
#endif
#ifdef USE_ZWAVE_PROXY
void APIServerConnection::on_z_wave_proxy_frame(const ZWaveProxyFrame &msg) {
if (this->check_authenticated_()) {
this->zwave_proxy_frame(msg);
}
}
#endif
#ifdef USE_ZWAVE_PROXY
void APIServerConnection::on_z_wave_proxy_request(const ZWaveProxyRequest &msg) {
if (this->check_authenticated_()) {
this->zwave_proxy_request(msg);
}
}
#endif
} // namespace esphome::api

View File

@@ -207,6 +207,12 @@ class APIServerConnectionBase : public ProtoService {
#ifdef USE_UPDATE
virtual void on_update_command_request(const UpdateCommandRequest &value){};
#endif
#ifdef USE_ZWAVE_PROXY
virtual void on_z_wave_proxy_frame(const ZWaveProxyFrame &value){};
#endif
#ifdef USE_ZWAVE_PROXY
virtual void on_z_wave_proxy_request(const ZWaveProxyRequest &value){};
#endif
protected:
void read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) override;
@@ -335,6 +341,12 @@ class APIServerConnection : public APIServerConnectionBase {
#endif
#ifdef USE_ALARM_CONTROL_PANEL
virtual void alarm_control_panel_command(const AlarmControlPanelCommandRequest &msg) = 0;
#endif
#ifdef USE_ZWAVE_PROXY
virtual void zwave_proxy_frame(const ZWaveProxyFrame &msg) = 0;
#endif
#ifdef USE_ZWAVE_PROXY
virtual void zwave_proxy_request(const ZWaveProxyRequest &msg) = 0;
#endif
protected:
void on_hello_request(const HelloRequest &msg) override;
@@ -459,6 +471,12 @@ class APIServerConnection : public APIServerConnectionBase {
#ifdef USE_ALARM_CONTROL_PANEL
void on_alarm_control_panel_command_request(const AlarmControlPanelCommandRequest &msg) override;
#endif
#ifdef USE_ZWAVE_PROXY
void on_z_wave_proxy_frame(const ZWaveProxyFrame &msg) override;
#endif
#ifdef USE_ZWAVE_PROXY
void on_z_wave_proxy_request(const ZWaveProxyRequest &msg) override;
#endif
};
} // namespace esphome::api

View File

@@ -10,7 +10,8 @@ from esphome.const import (
PLATFORM_LN882X,
PLATFORM_RTL87XX,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
AUTO_LOAD = ["web_server_base", "ota.web_server"]
DEPENDENCIES = ["wifi"]
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.CAPTIVE_PORTAL)
async def to_code(config):
paren = await cg.get_variable(config[CONF_WEB_SERVER_BASE_ID])

View File

@@ -2,7 +2,7 @@ from esphome import pins
import esphome.codegen as cg
from esphome.components import i2c, touchscreen
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_INTERRUPT_PIN
from esphome.const import CONF_ID, CONF_INTERRUPT_PIN, CONF_RESET_PIN
CODEOWNERS = ["@jesserockz"]
DEPENDENCIES = ["i2c"]
@@ -15,7 +15,7 @@ EKTF2232Touchscreen = ektf2232_ns.class_(
)
CONF_EKTF2232_ID = "ektf2232_id"
CONF_RTS_PIN = "rts_pin"
CONF_RTS_PIN = "rts_pin" # To be removed before 2026.4.0
CONFIG_SCHEMA = touchscreen.TOUCHSCREEN_SCHEMA.extend(
cv.Schema(
@@ -24,7 +24,10 @@ CONFIG_SCHEMA = touchscreen.TOUCHSCREEN_SCHEMA.extend(
cv.Required(CONF_INTERRUPT_PIN): cv.All(
pins.internal_gpio_input_pin_schema
),
cv.Required(CONF_RTS_PIN): pins.gpio_output_pin_schema,
cv.Required(CONF_RESET_PIN): pins.gpio_output_pin_schema,
cv.Optional(CONF_RTS_PIN): cv.invalid(
f"{CONF_RTS_PIN} has been renamed to {CONF_RESET_PIN}"
),
}
).extend(i2c.i2c_device_schema(0x15))
)
@@ -37,5 +40,5 @@ async def to_code(config):
interrupt_pin = await cg.gpio_pin_expression(config[CONF_INTERRUPT_PIN])
cg.add(var.set_interrupt_pin(interrupt_pin))
rts_pin = await cg.gpio_pin_expression(config[CONF_RTS_PIN])
cg.add(var.set_rts_pin(rts_pin))
reset_pin = await cg.gpio_pin_expression(config[CONF_RESET_PIN])
cg.add(var.set_reset_pin(reset_pin))

View File

@@ -21,7 +21,7 @@ void EKTF2232Touchscreen::setup() {
this->attach_interrupt_(this->interrupt_pin_, gpio::INTERRUPT_FALLING_EDGE);
this->rts_pin_->setup();
this->reset_pin_->setup();
this->hard_reset_();
if (!this->soft_reset_()) {
@@ -98,9 +98,9 @@ bool EKTF2232Touchscreen::get_power_state() {
}
void EKTF2232Touchscreen::hard_reset_() {
this->rts_pin_->digital_write(false);
this->reset_pin_->digital_write(false);
delay(15);
this->rts_pin_->digital_write(true);
this->reset_pin_->digital_write(true);
delay(15);
}
@@ -127,7 +127,7 @@ void EKTF2232Touchscreen::dump_config() {
ESP_LOGCONFIG(TAG, "EKT2232 Touchscreen:");
LOG_I2C_DEVICE(this);
LOG_PIN(" Interrupt Pin: ", this->interrupt_pin_);
LOG_PIN(" RTS Pin: ", this->rts_pin_);
LOG_PIN(" Reset Pin: ", this->reset_pin_);
}
} // namespace ektf2232

View File

@@ -17,7 +17,7 @@ class EKTF2232Touchscreen : public Touchscreen, public i2c::I2CDevice {
void dump_config() override;
void set_interrupt_pin(InternalGPIOPin *pin) { this->interrupt_pin_ = pin; }
void set_rts_pin(GPIOPin *pin) { this->rts_pin_ = pin; }
void set_reset_pin(GPIOPin *pin) { this->reset_pin_ = pin; }
void set_power_state(bool enable);
bool get_power_state();
@@ -28,7 +28,7 @@ class EKTF2232Touchscreen : public Touchscreen, public i2c::I2CDevice {
void update_touches() override;
InternalGPIOPin *interrupt_pin_;
GPIOPin *rts_pin_;
GPIOPin *reset_pin_;
};
} // namespace ektf2232

View File

@@ -37,7 +37,7 @@ from esphome.const import (
)
from esphome.core import CORE, HexInt, TimePeriod
import esphome.final_validate as fv
from esphome.helpers import copy_file_if_changed, mkdir_p, write_file_if_changed
from esphome.helpers import copy_file_if_changed, write_file_if_changed
from esphome.types import ConfigType
from esphome.writer import clean_cmake_cache
@@ -272,14 +272,14 @@ def add_idf_component(
}
def add_extra_script(stage: str, filename: str, path: str):
def add_extra_script(stage: str, filename: str, path: Path):
"""Add an extra script to the project."""
key = f"{stage}:{filename}"
if add_extra_build_file(filename, path):
cg.add_platformio_option("extra_scripts", [key])
def add_extra_build_file(filename: str, path: str) -> bool:
def add_extra_build_file(filename: str, path: Path) -> bool:
"""Add an extra build file to the project."""
if filename not in CORE.data[KEY_ESP32][KEY_EXTRA_BUILD_FILES]:
CORE.data[KEY_ESP32][KEY_EXTRA_BUILD_FILES][filename] = {
@@ -818,7 +818,7 @@ async def to_code(config):
add_extra_script(
"post",
"post_build.py",
os.path.join(os.path.dirname(__file__), "post_build.py.script"),
Path(__file__).parent / "post_build.py.script",
)
if conf[CONF_TYPE] == FRAMEWORK_ESP_IDF:
@@ -1040,7 +1040,7 @@ def _write_sdkconfig():
def _write_idf_component_yml():
yml_path = Path(CORE.relative_build_path("src/idf_component.yml"))
yml_path = CORE.relative_build_path("src/idf_component.yml")
if CORE.data[KEY_ESP32][KEY_COMPONENTS]:
components: dict = CORE.data[KEY_ESP32][KEY_COMPONENTS]
dependencies = {}
@@ -1058,8 +1058,8 @@ def _write_idf_component_yml():
contents = ""
if write_file_if_changed(yml_path, contents):
dependencies_lock = CORE.relative_build_path("dependencies.lock")
if os.path.isfile(dependencies_lock):
os.remove(dependencies_lock)
if dependencies_lock.is_file():
dependencies_lock.unlink()
clean_cmake_cache()
@@ -1093,14 +1093,13 @@ def copy_files():
)
for file in CORE.data[KEY_ESP32][KEY_EXTRA_BUILD_FILES].values():
if file[KEY_PATH].startswith("http"):
name: str = file[KEY_NAME]
path: Path = file[KEY_PATH]
if str(path).startswith("http"):
import requests
mkdir_p(CORE.relative_build_path(os.path.dirname(file[KEY_NAME])))
with open(CORE.relative_build_path(file[KEY_NAME]), "wb") as f:
f.write(requests.get(file[KEY_PATH], timeout=30).content)
CORE.relative_build_path(name).parent.mkdir(parents=True, exist_ok=True)
content = requests.get(path, timeout=30).content
CORE.relative_build_path(name).write_bytes(content)
else:
copy_file_if_changed(
file[KEY_PATH],
CORE.relative_build_path(file[KEY_NAME]),
)
copy_file_if_changed(path, CORE.relative_build_path(name))

View File

@@ -1,4 +1,5 @@
import os
from pathlib import Path
from esphome import pins
from esphome.components import esp32
@@ -97,5 +98,5 @@ async def to_code(config):
esp32.add_extra_script(
"post",
"esp32_hosted.py",
os.path.join(os.path.dirname(__file__), "esp32_hosted.py.script"),
Path(__file__).parent / "esp32_hosted.py.script",
)

View File

@@ -1,5 +1,5 @@
import logging
import os
from pathlib import Path
import esphome.codegen as cg
import esphome.config_validation as cv
@@ -259,8 +259,8 @@ async def to_code(config):
# Called by writer.py
def copy_files():
dir = os.path.dirname(__file__)
post_build_file = os.path.join(dir, "post_build.py.script")
dir = Path(__file__).parent
post_build_file = dir / "post_build.py.script"
copy_file_if_changed(
post_build_file,
CORE.relative_build_path("post_build.py"),

View File

@@ -16,7 +16,8 @@ from esphome.const import (
CONF_SAFE_MODE,
CONF_VERSION,
)
from esphome.core import CoroPriority, coroutine_with_priority
from esphome.core import coroutine_with_priority
from esphome.coroutine import CoroPriority
import esphome.final_validate as fv
_LOGGER = logging.getLogger(__name__)
@@ -121,7 +122,7 @@ CONFIG_SCHEMA = (
FINAL_VALIDATE_SCHEMA = ota_esphome_final_validate
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
cg.add(var.set_port(config[CONF_PORT]))

View File

@@ -3,7 +3,6 @@ import functools
import hashlib
from itertools import accumulate
import logging
import os
from pathlib import Path
import re
@@ -38,6 +37,7 @@ from esphome.const import (
)
from esphome.core import CORE, HexInt
from esphome.helpers import cpp_string_escape
from esphome.types import ConfigType
_LOGGER = logging.getLogger(__name__)
@@ -253,11 +253,11 @@ def validate_truetype_file(value):
return CORE.relative_config_path(cv.file_(value))
def add_local_file(value):
def add_local_file(value: ConfigType) -> ConfigType:
if value in FONT_CACHE:
return value
path = value[CONF_PATH]
if not os.path.isfile(path):
path = Path(value[CONF_PATH])
if not path.is_file():
raise cv.Invalid(f"File '{path}' not found.")
FONT_CACHE[value] = path
return value
@@ -318,7 +318,7 @@ def download_gfont(value):
external_files.compute_local_file_dir(DOMAIN)
/ f"{value[CONF_FAMILY]}@{value[CONF_WEIGHT]}@{value[CONF_ITALIC]}@v1.ttf"
)
if not external_files.is_file_recent(str(path), value[CONF_REFRESH]):
if not external_files.is_file_recent(path, value[CONF_REFRESH]):
_LOGGER.debug("download_gfont: path=%s", path)
try:
req = requests.get(url, timeout=external_files.NETWORK_TIMEOUT)

View File

@@ -6,6 +6,7 @@ namespace gpio {
static const char *const TAG = "gpio.binary_sensor";
#if ESPHOME_LOG_LEVEL >= ESPHOME_LOG_LEVEL_DEBUG
static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
switch (type) {
case gpio::INTERRUPT_RISING_EDGE:
@@ -22,6 +23,7 @@ static const LogString *interrupt_type_to_string(gpio::InterruptType type) {
static const LogString *gpio_mode_to_string(bool use_interrupt) {
return use_interrupt ? LOG_STR("interrupt") : LOG_STR("polling");
}
#endif
void IRAM_ATTR GPIOBinarySensorStore::gpio_intr(GPIOBinarySensorStore *arg) {
bool new_state = arg->isr_pin_.digital_read();

View File

@@ -194,7 +194,7 @@ async def to_code(config):
cg.add_define("CPPHTTPLIB_OPENSSL_SUPPORT")
elif path := config.get(CONF_CA_CERTIFICATE_PATH):
cg.add_define("CPPHTTPLIB_OPENSSL_SUPPORT")
cg.add(var.set_ca_path(path))
cg.add(var.set_ca_path(str(path)))
cg.add_build_flag("-lssl")
cg.add_build_flag("-lcrypto")

View File

@@ -3,7 +3,8 @@ import esphome.codegen as cg
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_PASSWORD, CONF_URL, CONF_USERNAME
from esphome.core import CoroPriority, coroutine_with_priority
from esphome.core import coroutine_with_priority
from esphome.coroutine import CoroPriority
from .. import CONF_HTTP_REQUEST_ID, HttpRequestComponent, http_request_ns
@@ -40,7 +41,7 @@ CONFIG_SCHEMA = cv.All(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config)

View File

@@ -1,6 +1,5 @@
import json
import logging
from os.path import dirname, isfile, join
import esphome.codegen as cg
import esphome.config_validation as cv
@@ -24,6 +23,7 @@ from esphome.const import (
__version__,
)
from esphome.core import CORE
from esphome.storage_json import StorageJSON
from . import gpio # noqa
from .const import (
@@ -129,7 +129,7 @@ def only_on_family(*, supported=None, unsupported=None):
return validator_
def get_download_types(storage_json=None):
def get_download_types(storage_json: StorageJSON = None):
types = [
{
"title": "UF2 package (recommended)",
@@ -139,11 +139,11 @@ def get_download_types(storage_json=None):
},
]
build_dir = dirname(storage_json.firmware_bin_path)
outputs = join(build_dir, "firmware.json")
if not isfile(outputs):
build_dir = storage_json.firmware_bin_path.parent
outputs = build_dir / "firmware.json"
if not outputs.is_file():
return types
with open(outputs, encoding="utf-8") as f:
with outputs.open(encoding="utf-8") as f:
outputs = json.load(f)
for output in outputs:
if not output["public"]:

View File

@@ -11,7 +11,8 @@ from esphome.const import (
CONF_SERVICES,
PlatformFramework,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"]
@@ -72,7 +73,7 @@ def mdns_service(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.NETWORK_SERVICES)
async def to_code(config):
if config[CONF_DISABLED] is True:
return

View File

@@ -10,7 +10,8 @@ from esphome.const import (
CONF_TRIGGER_ID,
PlatformFramework,
)
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
AUTO_LOAD = ["md5", "safe_mode"]
@@ -82,7 +83,7 @@ BASE_OTA_SCHEMA = cv.Schema(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.OTA_UPDATES)
async def to_code(config):
cg.add_define("USE_OTA")

View File

@@ -121,15 +121,11 @@ def transport_schema(cls):
return TRANSPORT_SCHEMA.extend({cv.GenerateID(): cv.declare_id(cls)})
# Build a list of sensors for this platform
CORE.data[DOMAIN] = {CONF_SENSORS: []}
def get_sensors(transport_id):
"""Return the list of sensors for this platform."""
return (
sensor
for sensor in CORE.data[DOMAIN][CONF_SENSORS]
for sensor in CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
if sensor[CONF_TRANSPORT_ID] == transport_id
)
@@ -137,7 +133,8 @@ def get_sensors(transport_id):
def validate_packet_transport_sensor(config):
if CONF_NAME in config and CONF_INTERNAL not in config:
raise cv.Invalid("Must provide internal: config when using name:")
CORE.data[DOMAIN][CONF_SENSORS].append(config)
conf_sensors = CORE.data.setdefault(DOMAIN, {}).setdefault(CONF_SENSORS, [])
conf_sensors.append(config)
return config

View File

@@ -1,5 +1,5 @@
import logging
import os
from pathlib import Path
from string import ascii_letters, digits
import esphome.codegen as cg
@@ -19,7 +19,7 @@ from esphome.const import (
ThreadModel,
)
from esphome.core import CORE, CoroPriority, EsphomeError, coroutine_with_priority
from esphome.helpers import copy_file_if_changed, mkdir_p, read_file, write_file
from esphome.helpers import copy_file_if_changed, read_file, write_file_if_changed
from .const import KEY_BOARD, KEY_PIO_FILES, KEY_RP2040, rp2040_ns
@@ -221,18 +221,18 @@ def generate_pio_files() -> bool:
if not files:
return False
for key, data in files.items():
pio_path = CORE.relative_build_path(f"src/pio/{key}.pio")
mkdir_p(os.path.dirname(pio_path))
write_file(pio_path, data)
pio_path = CORE.build_path / "src" / "pio" / f"{key}.pio"
pio_path.parent.mkdir(parents=True, exist_ok=True)
write_file_if_changed(pio_path, data)
includes.append(f"pio/{key}.pio.h")
write_file(
write_file_if_changed(
CORE.relative_build_path("src/pio_includes.h"),
"#pragma once\n" + "\n".join([f'#include "{include}"' for include in includes]),
)
dir = os.path.dirname(__file__)
build_pio_file = os.path.join(dir, "build_pio.py.script")
dir = Path(__file__).parent
build_pio_file = dir / "build_pio.py.script"
copy_file_if_changed(
build_pio_file,
CORE.relative_build_path("build_pio.py"),
@@ -243,8 +243,8 @@ def generate_pio_files() -> bool:
# Called by writer.py
def copy_files():
dir = os.path.dirname(__file__)
post_build_file = os.path.join(dir, "post_build.py.script")
dir = Path(__file__).parent
post_build_file = dir / "post_build.py.script"
copy_file_if_changed(
post_build_file,
CORE.relative_build_path("post_build.py"),
@@ -252,4 +252,4 @@ def copy_files():
if generate_pio_files():
path = CORE.relative_src_path("esphome.h")
content = read_file(path).rstrip("\n")
write_file(path, content + '\n#include "pio_includes.h"\n')
write_file_if_changed(path, content + '\n#include "pio_includes.h"\n')

View File

@@ -3,7 +3,8 @@ from esphome.components.esp32 import add_idf_component
from esphome.components.ota import BASE_OTA_SCHEMA, OTAComponent, ota_to_code
import esphome.config_validation as cv
from esphome.const import CONF_ID
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network", "web_server_base"]
@@ -22,7 +23,7 @@ CONFIG_SCHEMA = (
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.WEB_SERVER_OTA)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await ota_to_code(var, config)

View File

@@ -1,7 +1,8 @@
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.const import CONF_ID
from esphome.core import CORE, CoroPriority, coroutine_with_priority
from esphome.core import CORE, coroutine_with_priority
from esphome.coroutine import CoroPriority
CODEOWNERS = ["@esphome/core"]
DEPENDENCIES = ["network"]
@@ -26,7 +27,7 @@ CONFIG_SCHEMA = cv.Schema(
)
@coroutine_with_priority(CoroPriority.COMMUNICATION)
@coroutine_with_priority(CoroPriority.WEB_SERVER_BASE)
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)

View File

@@ -1,4 +1,4 @@
import os
from pathlib import Path
from typing import TypedDict
import esphome.codegen as cg
@@ -48,7 +48,7 @@ class ZephyrData(TypedDict):
bootloader: str
prj_conf: dict[str, tuple[PrjConfValueType, bool]]
overlay: str
extra_build_files: dict[str, str]
extra_build_files: dict[str, Path]
pm_static: list[Section]
user: dict[str, list[str]]
@@ -93,7 +93,7 @@ def zephyr_add_overlay(content):
zephyr_data()[KEY_OVERLAY] += content
def add_extra_build_file(filename: str, path: str) -> bool:
def add_extra_build_file(filename: str, path: Path) -> bool:
"""Add an extra build file to the project."""
extra_build_files = zephyr_data()[KEY_EXTRA_BUILD_FILES]
if filename not in extra_build_files:
@@ -102,7 +102,7 @@ def add_extra_build_file(filename: str, path: str) -> bool:
return False
def add_extra_script(stage: str, filename: str, path: str):
def add_extra_script(stage: str, filename: str, path: Path) -> None:
"""Add an extra script to the project."""
key = f"{stage}:{filename}"
if add_extra_build_file(filename, path):
@@ -144,7 +144,7 @@ def zephyr_to_code(config):
add_extra_script(
"pre",
"pre_build.py",
os.path.join(os.path.dirname(__file__), "pre_build.py.script"),
Path(__file__).parent / "pre_build.py.script",
)

View File

@@ -0,0 +1,43 @@
import esphome.codegen as cg
from esphome.components import uart
import esphome.config_validation as cv
from esphome.const import CONF_ID, CONF_POWER_SAVE_MODE, CONF_WIFI
import esphome.final_validate as fv
CODEOWNERS = ["@kbx81"]
DEPENDENCIES = ["api", "uart"]
zwave_proxy_ns = cg.esphome_ns.namespace("zwave_proxy")
ZWaveProxy = zwave_proxy_ns.class_("ZWaveProxy", cg.Component, uart.UARTDevice)
def final_validate(config):
full_config = fv.full_config.get()
if (wifi_conf := full_config.get(CONF_WIFI)) and (
wifi_conf.get(CONF_POWER_SAVE_MODE).lower() != "none"
):
raise cv.Invalid(
f"{CONF_WIFI} {CONF_POWER_SAVE_MODE} must be set to 'none' when using Z-Wave proxy"
)
return config
CONFIG_SCHEMA = (
cv.Schema(
{
cv.GenerateID(): cv.declare_id(ZWaveProxy),
}
)
.extend(cv.COMPONENT_SCHEMA)
.extend(uart.UART_DEVICE_SCHEMA)
)
FINAL_VALIDATE_SCHEMA = final_validate
async def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
await cg.register_component(var, config)
await uart.register_uart_device(var, config)
cg.add_define("USE_ZWAVE_PROXY")

View File

@@ -0,0 +1,262 @@
#include "zwave_proxy.h"
#include "esphome/core/helpers.h"
#include "esphome/core/log.h"
#include "esphome/core/util.h"
namespace esphome {
namespace zwave_proxy {
static const char *const TAG = "zwave_proxy";
static constexpr uint8_t ZWAVE_COMMAND_GET_NETWORK_IDS = 0x20;
// GET_NETWORK_IDS response: [SOF][LENGTH][TYPE][CMD][HOME_ID(4)][NODE_ID][...]
static constexpr uint8_t ZWAVE_COMMAND_TYPE_RESPONSE = 0x01; // Response type field value
static constexpr uint8_t ZWAVE_MIN_GET_NETWORK_IDS_LENGTH = 9; // TYPE + CMD + HOME_ID(4) + NODE_ID + checksum
static uint8_t calculate_frame_checksum(const uint8_t *data, uint8_t length) {
// Calculate Z-Wave frame checksum
// XOR all bytes between SOF and checksum position (exclusive)
// Initial value is 0xFF per Z-Wave protocol specification
uint8_t checksum = 0xFF;
for (uint8_t i = 1; i < length - 1; i++) {
checksum ^= data[i];
}
return checksum;
}
ZWaveProxy::ZWaveProxy() { global_zwave_proxy = this; }
void ZWaveProxy::setup() { this->send_simple_command_(ZWAVE_COMMAND_GET_NETWORK_IDS); }
void ZWaveProxy::loop() {
if (this->response_handler_()) {
ESP_LOGV(TAG, "Handled late response");
}
if (this->api_connection_ != nullptr && (!this->api_connection_->is_connection_setup() || !api_is_connected())) {
ESP_LOGW(TAG, "Subscriber disconnected");
this->api_connection_ = nullptr; // Unsubscribe if disconnected
}
while (this->available()) {
uint8_t byte;
if (!this->read_byte(&byte)) {
this->status_set_warning("UART read failed");
return;
}
if (this->parse_byte_(byte)) {
// Check if this is a GET_NETWORK_IDS response frame
// Frame format: [SOF][LENGTH][TYPE][CMD][HOME_ID(4)][NODE_ID][...]
// We verify:
// - buffer_[0]: Start of frame marker (0x01)
// - buffer_[1]: Length field must be >= 9 to contain all required data
// - buffer_[2]: Command type (0x01 for response)
// - buffer_[3]: Command ID (0x20 for GET_NETWORK_IDS)
if (this->buffer_[3] == ZWAVE_COMMAND_GET_NETWORK_IDS && this->buffer_[2] == ZWAVE_COMMAND_TYPE_RESPONSE &&
this->buffer_[1] >= ZWAVE_MIN_GET_NETWORK_IDS_LENGTH && this->buffer_[0] == ZWAVE_FRAME_TYPE_START) {
// Extract the 4-byte Home ID starting at offset 4
// The frame parser has already validated the checksum and ensured all bytes are present
std::memcpy(this->home_id_.data(), this->buffer_.data() + 4, this->home_id_.size());
ESP_LOGI(TAG, "Home ID: %s",
format_hex_pretty(this->home_id_.data(), this->home_id_.size(), ':', false).c_str());
}
ESP_LOGV(TAG, "Sending to client: %s", YESNO(this->api_connection_ != nullptr));
if (this->api_connection_ != nullptr) {
// minimize copying to reduce CPU overhead
if (this->in_bootloader_) {
this->outgoing_proto_msg_.data_len = this->buffer_index_;
} else {
// If this is a data frame, use frame length indicator + 2 (for SoF + checksum), else assume 1 for ACK/NAK/CAN
this->outgoing_proto_msg_.data_len = this->buffer_[0] == ZWAVE_FRAME_TYPE_START ? this->buffer_[1] + 2 : 1;
}
std::memcpy(this->outgoing_proto_msg_.data, this->buffer_.data(), this->outgoing_proto_msg_.data_len);
this->api_connection_->send_message(this->outgoing_proto_msg_, api::ZWaveProxyFrame::MESSAGE_TYPE);
}
}
}
this->status_clear_warning();
}
void ZWaveProxy::dump_config() { ESP_LOGCONFIG(TAG, "Z-Wave Proxy"); }
void ZWaveProxy::zwave_proxy_request(api::APIConnection *api_connection, api::enums::ZWaveProxyRequestType type) {
switch (type) {
case api::enums::ZWAVE_PROXY_REQUEST_TYPE_SUBSCRIBE:
if (this->api_connection_ != nullptr) {
ESP_LOGE(TAG, "Only one API subscription is allowed at a time");
return;
}
this->api_connection_ = api_connection;
ESP_LOGV(TAG, "API connection is now subscribed");
break;
case api::enums::ZWAVE_PROXY_REQUEST_TYPE_UNSUBSCRIBE:
if (this->api_connection_ != api_connection) {
ESP_LOGV(TAG, "API connection is not subscribed");
return;
}
this->api_connection_ = nullptr;
break;
default:
ESP_LOGW(TAG, "Unknown request type: %d", type);
break;
}
}
void ZWaveProxy::send_frame(const uint8_t *data, size_t length) {
if (length == 1 && data[0] == this->last_response_) {
ESP_LOGV(TAG, "Skipping sending duplicate response: 0x%02X", data[0]);
return;
}
ESP_LOGVV(TAG, "Sending: %s", format_hex_pretty(data, length).c_str());
this->write_array(data, length);
}
void ZWaveProxy::send_simple_command_(const uint8_t command_id) {
// Send a simple Z-Wave command with no parameters
// Frame format: [SOF][LENGTH][TYPE][CMD][CHECKSUM]
// Where LENGTH=0x03 (3 bytes: TYPE + CMD + CHECKSUM)
uint8_t cmd[] = {0x01, 0x03, 0x00, command_id, 0x00};
cmd[4] = calculate_frame_checksum(cmd, sizeof(cmd));
this->send_frame(cmd, sizeof(cmd));
}
bool ZWaveProxy::parse_byte_(uint8_t byte) {
bool frame_completed = false;
// Basic parsing logic for received frames
switch (this->parsing_state_) {
case ZWAVE_PARSING_STATE_WAIT_START:
this->parse_start_(byte);
break;
case ZWAVE_PARSING_STATE_WAIT_LENGTH:
if (!byte) {
ESP_LOGW(TAG, "Invalid LENGTH: %u", byte);
this->parsing_state_ = ZWAVE_PARSING_STATE_SEND_NAK;
return false;
}
ESP_LOGVV(TAG, "Received LENGTH: %u", byte);
this->end_frame_after_ = this->buffer_index_ + byte;
ESP_LOGVV(TAG, "Calculated EOF: %u", this->end_frame_after_);
this->buffer_[this->buffer_index_++] = byte;
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_TYPE;
break;
case ZWAVE_PARSING_STATE_WAIT_TYPE:
this->buffer_[this->buffer_index_++] = byte;
ESP_LOGVV(TAG, "Received TYPE: 0x%02X", byte);
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_COMMAND_ID;
break;
case ZWAVE_PARSING_STATE_WAIT_COMMAND_ID:
this->buffer_[this->buffer_index_++] = byte;
ESP_LOGVV(TAG, "Received COMMAND ID: 0x%02X", byte);
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_PAYLOAD;
break;
case ZWAVE_PARSING_STATE_WAIT_PAYLOAD:
this->buffer_[this->buffer_index_++] = byte;
ESP_LOGVV(TAG, "Received PAYLOAD: 0x%02X", byte);
if (this->buffer_index_ >= this->end_frame_after_) {
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_CHECKSUM;
}
break;
case ZWAVE_PARSING_STATE_WAIT_CHECKSUM: {
this->buffer_[this->buffer_index_++] = byte;
auto checksum = calculate_frame_checksum(this->buffer_.data(), this->buffer_index_);
ESP_LOGVV(TAG, "CHECKSUM Received: 0x%02X - Calculated: 0x%02X", byte, checksum);
if (checksum != byte) {
ESP_LOGW(TAG, "Bad checksum: expected 0x%02X, got 0x%02X", checksum, byte);
this->parsing_state_ = ZWAVE_PARSING_STATE_SEND_NAK;
} else {
this->parsing_state_ = ZWAVE_PARSING_STATE_SEND_ACK;
ESP_LOGVV(TAG, "Received frame: %s", format_hex_pretty(this->buffer_.data(), this->buffer_index_).c_str());
frame_completed = true;
}
this->response_handler_();
break;
}
case ZWAVE_PARSING_STATE_READ_BL_MENU:
this->buffer_[this->buffer_index_++] = byte;
if (!byte) {
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_START;
frame_completed = true;
}
break;
case ZWAVE_PARSING_STATE_SEND_ACK:
case ZWAVE_PARSING_STATE_SEND_NAK:
break; // Should not happen, handled in loop()
default:
ESP_LOGW(TAG, "Bad parsing state; resetting");
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_START;
break;
}
return frame_completed;
}
void ZWaveProxy::parse_start_(uint8_t byte) {
this->buffer_index_ = 0;
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_START;
switch (byte) {
case ZWAVE_FRAME_TYPE_START:
ESP_LOGVV(TAG, "Received START");
if (this->in_bootloader_) {
ESP_LOGD(TAG, "Exited bootloader mode");
this->in_bootloader_ = false;
}
this->buffer_[this->buffer_index_++] = byte;
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_LENGTH;
return;
case ZWAVE_FRAME_TYPE_BL_MENU:
ESP_LOGVV(TAG, "Received BL_MENU");
if (!this->in_bootloader_) {
ESP_LOGD(TAG, "Entered bootloader mode");
this->in_bootloader_ = true;
}
this->buffer_[this->buffer_index_++] = byte;
this->parsing_state_ = ZWAVE_PARSING_STATE_READ_BL_MENU;
return;
case ZWAVE_FRAME_TYPE_BL_BEGIN_UPLOAD:
ESP_LOGVV(TAG, "Received BL_BEGIN_UPLOAD");
break;
case ZWAVE_FRAME_TYPE_ACK:
ESP_LOGVV(TAG, "Received ACK");
break;
case ZWAVE_FRAME_TYPE_NAK:
ESP_LOGW(TAG, "Received NAK");
break;
case ZWAVE_FRAME_TYPE_CAN:
ESP_LOGW(TAG, "Received CAN");
break;
default:
ESP_LOGW(TAG, "Unrecognized START: 0x%02X", byte);
return;
}
// Forward response (ACK/NAK/CAN) back to client for processing
if (this->api_connection_ != nullptr) {
this->outgoing_proto_msg_.data[0] = byte;
this->outgoing_proto_msg_.data_len = 1;
this->api_connection_->send_message(this->outgoing_proto_msg_, api::ZWaveProxyFrame::MESSAGE_TYPE);
}
}
bool ZWaveProxy::response_handler_() {
switch (this->parsing_state_) {
case ZWAVE_PARSING_STATE_SEND_ACK:
this->last_response_ = ZWAVE_FRAME_TYPE_ACK;
break;
case ZWAVE_PARSING_STATE_SEND_CAN:
this->last_response_ = ZWAVE_FRAME_TYPE_CAN;
break;
case ZWAVE_PARSING_STATE_SEND_NAK:
this->last_response_ = ZWAVE_FRAME_TYPE_NAK;
break;
default:
return false; // No response handled
}
ESP_LOGVV(TAG, "Sending %s (0x%02X)", this->last_response_ == ZWAVE_FRAME_TYPE_ACK ? "ACK" : "NAK/CAN",
this->last_response_);
this->write_byte(this->last_response_);
this->parsing_state_ = ZWAVE_PARSING_STATE_WAIT_START;
return true;
}
ZWaveProxy *global_zwave_proxy = nullptr; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
} // namespace zwave_proxy
} // namespace esphome

View File

@@ -0,0 +1,81 @@
#pragma once
#include "esphome/components/api/api_connection.h"
#include "esphome/components/api/api_pb2.h"
#include "esphome/core/component.h"
#include "esphome/core/helpers.h"
#include "esphome/components/uart/uart.h"
#include <array>
namespace esphome {
namespace zwave_proxy {
enum ZWaveResponseTypes : uint8_t {
ZWAVE_FRAME_TYPE_ACK = 0x06,
ZWAVE_FRAME_TYPE_CAN = 0x18,
ZWAVE_FRAME_TYPE_NAK = 0x15,
ZWAVE_FRAME_TYPE_START = 0x01,
ZWAVE_FRAME_TYPE_BL_MENU = 0x0D,
ZWAVE_FRAME_TYPE_BL_BEGIN_UPLOAD = 0x43,
};
enum ZWaveParsingState : uint8_t {
ZWAVE_PARSING_STATE_WAIT_START,
ZWAVE_PARSING_STATE_WAIT_LENGTH,
ZWAVE_PARSING_STATE_WAIT_TYPE,
ZWAVE_PARSING_STATE_WAIT_COMMAND_ID,
ZWAVE_PARSING_STATE_WAIT_PAYLOAD,
ZWAVE_PARSING_STATE_WAIT_CHECKSUM,
ZWAVE_PARSING_STATE_SEND_ACK,
ZWAVE_PARSING_STATE_SEND_CAN,
ZWAVE_PARSING_STATE_SEND_NAK,
ZWAVE_PARSING_STATE_READ_BL_MENU,
};
enum ZWaveProxyFeature : uint32_t {
FEATURE_ZWAVE_PROXY_ENABLED = 1 << 0,
};
class ZWaveProxy : public uart::UARTDevice, public Component {
public:
ZWaveProxy();
void setup() override;
void loop() override;
void dump_config() override;
void zwave_proxy_request(api::APIConnection *api_connection, api::enums::ZWaveProxyRequestType type);
api::APIConnection *get_api_connection() { return this->api_connection_; }
uint32_t get_feature_flags() const { return ZWaveProxyFeature::FEATURE_ZWAVE_PROXY_ENABLED; }
uint32_t get_home_id() {
return encode_uint32(this->home_id_[0], this->home_id_[1], this->home_id_[2], this->home_id_[3]);
}
void send_frame(const uint8_t *data, size_t length);
protected:
void send_simple_command_(uint8_t command_id);
bool parse_byte_(uint8_t byte); // Returns true if frame parsing was completed (a frame is ready in the buffer)
void parse_start_(uint8_t byte);
bool response_handler_();
api::APIConnection *api_connection_{nullptr}; // Current subscribed client
std::array<uint8_t, 4> home_id_{0, 0, 0, 0}; // Fixed buffer for home ID
std::array<uint8_t, sizeof(api::ZWaveProxyFrame::data)> buffer_; // Fixed buffer for incoming data
uint8_t buffer_index_{0}; // Index for populating the data buffer
uint8_t end_frame_after_{0}; // Payload reception ends after this index
uint8_t last_response_{0}; // Last response type sent
ZWaveParsingState parsing_state_{ZWAVE_PARSING_STATE_WAIT_START};
bool in_bootloader_{false}; // True if the device is detected to be in bootloader mode
// Pre-allocated message - always ready to send
api::ZWaveProxyFrame outgoing_proto_msg_;
};
extern ZWaveProxy *global_zwave_proxy; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
} // namespace zwave_proxy
} // namespace esphome

View File

@@ -15,7 +15,7 @@ from ipaddress import (
ip_network,
)
import logging
import os
from pathlib import Path
import re
from string import ascii_letters, digits
import uuid as uuid_
@@ -1609,34 +1609,32 @@ def dimensions(value):
return dimensions([match.group(1), match.group(2)])
def directory(value):
def directory(value: object) -> Path:
value = string(value)
path = CORE.relative_config_path(value)
if not os.path.exists(path):
if not path.exists():
raise Invalid(
f"Could not find directory '{path}'. Please make sure it exists (full path: {os.path.abspath(path)})."
f"Could not find directory '{path}'. Please make sure it exists (full path: {path.resolve()})."
)
if not os.path.isdir(path):
if not path.is_dir():
raise Invalid(
f"Path '{path}' is not a directory (full path: {os.path.abspath(path)})."
f"Path '{path}' is not a directory (full path: {path.resolve()})."
)
return value
return path
def file_(value):
def file_(value: object) -> Path:
value = string(value)
path = CORE.relative_config_path(value)
if not os.path.exists(path):
if not path.exists():
raise Invalid(
f"Could not find file '{path}'. Please make sure it exists (full path: {os.path.abspath(path)})."
f"Could not find file '{path}'. Please make sure it exists (full path: {path.resolve()})."
)
if not os.path.isfile(path):
raise Invalid(
f"Path '{path}' is not a file (full path: {os.path.abspath(path)})."
)
return value
if not path.is_file():
raise Invalid(f"Path '{path}' is not a file (full path: {path.resolve()}).")
return path
ENTITY_ID_CHARACTERS = "abcdefghijklmnopqrstuvwxyz0123456789_"

View File

@@ -3,6 +3,7 @@ from contextlib import contextmanager
import logging
import math
import os
from pathlib import Path
import re
from typing import TYPE_CHECKING
@@ -383,7 +384,7 @@ class DocumentLocation:
@classmethod
def from_mark(cls, mark):
return cls(mark.name, mark.line, mark.column)
return cls(str(mark.name), mark.line, mark.column)
def __str__(self):
return f"{self.document} {self.line}:{self.column}"
@@ -538,9 +539,9 @@ class EsphomeCore:
# The first key to this dict should always be the integration name
self.data = {}
# The relative path to the configuration YAML
self.config_path: str | None = None
self.config_path: Path | None = None
# The relative path to where all build files are stored
self.build_path: str | None = None
self.build_path: Path | None = None
# The validated configuration, this is None until the config has been validated
self.config: ConfigType | None = None
# The pending tasks in the task queue (mostly for C++ generation)
@@ -664,39 +665,42 @@ class EsphomeCore:
return None
@property
def config_dir(self):
return os.path.abspath(os.path.dirname(self.config_path))
def config_dir(self) -> Path:
if self.config_path.is_dir():
return self.config_path.absolute()
return self.config_path.absolute().parent
@property
def data_dir(self):
def data_dir(self) -> Path:
if is_ha_addon():
return os.path.join("/data")
return Path("/data")
if "ESPHOME_DATA_DIR" in os.environ:
return get_str_env("ESPHOME_DATA_DIR", None)
return Path(get_str_env("ESPHOME_DATA_DIR", None))
return self.relative_config_path(".esphome")
@property
def config_filename(self):
return os.path.basename(self.config_path)
def config_filename(self) -> str:
return self.config_path.name
def relative_config_path(self, *path):
path_ = os.path.expanduser(os.path.join(*path))
return os.path.join(self.config_dir, path_)
def relative_config_path(self, *path: str | Path) -> Path:
path_ = Path(*path).expanduser()
return self.config_dir / path_
def relative_internal_path(self, *path: str) -> str:
return os.path.join(self.data_dir, *path)
def relative_internal_path(self, *path: str | Path) -> Path:
path_ = Path(*path).expanduser()
return self.data_dir / path_
def relative_build_path(self, *path):
path_ = os.path.expanduser(os.path.join(*path))
return os.path.join(self.build_path, path_)
def relative_build_path(self, *path: str | Path) -> Path:
path_ = Path(*path).expanduser()
return self.build_path / path_
def relative_src_path(self, *path):
def relative_src_path(self, *path: str | Path) -> Path:
return self.relative_build_path("src", *path)
def relative_pioenvs_path(self, *path):
def relative_pioenvs_path(self, *path: str | Path) -> Path:
return self.relative_build_path(".pioenvs", *path)
def relative_piolibdeps_path(self, *path):
def relative_piolibdeps_path(self, *path: str | Path) -> Path:
return self.relative_build_path(".piolibdeps", *path)
@property
@@ -709,7 +713,7 @@ class EsphomeCore:
return os.path.expanduser("~/.platformio/.cache")
@property
def firmware_bin(self):
def firmware_bin(self) -> Path:
if self.is_libretiny:
return self.relative_pioenvs_path(self.name, "firmware.uf2")
return self.relative_pioenvs_path(self.name, "firmware.bin")

View File

@@ -136,21 +136,21 @@ def validate_ids_and_references(config: ConfigType) -> ConfigType:
return config
def valid_include(value):
def valid_include(value: str) -> str:
# Look for "<...>" includes
if value.startswith("<") and value.endswith(">"):
return value
try:
return cv.directory(value)
return str(cv.directory(value))
except cv.Invalid:
pass
value = cv.file_(value)
_, ext = os.path.splitext(value)
path = cv.file_(value)
ext = path.suffix
if ext not in VALID_INCLUDE_EXTS:
raise cv.Invalid(
f"Include has invalid file extension {ext} - valid extensions are {', '.join(VALID_INCLUDE_EXTS)}"
)
return value
return str(path)
def valid_project_name(value: str):
@@ -311,9 +311,9 @@ def preload_core_config(config, result) -> str:
CORE.data[KEY_CORE] = {}
if CONF_BUILD_PATH not in conf:
build_path = get_str_env("ESPHOME_BUILD_PATH", "build")
conf[CONF_BUILD_PATH] = os.path.join(build_path, CORE.name)
CORE.build_path = CORE.relative_internal_path(conf[CONF_BUILD_PATH])
build_path = Path(get_str_env("ESPHOME_BUILD_PATH", "build"))
conf[CONF_BUILD_PATH] = str(build_path / CORE.name)
CORE.build_path = CORE.data_dir / conf[CONF_BUILD_PATH]
target_platforms = []
@@ -339,12 +339,12 @@ def preload_core_config(config, result) -> str:
return target_platforms[0]
def include_file(path, basename):
parts = basename.split(os.path.sep)
def include_file(path: Path, basename: Path):
parts = basename.parts
dst = CORE.relative_src_path(*parts)
copy_file_if_changed(path, dst)
_, ext = os.path.splitext(path)
ext = path.suffix
if ext in [".h", ".hpp", ".tcc"]:
# Header, add include statement
cg.add_global(cg.RawStatement(f'#include "{basename}"'))
@@ -377,18 +377,18 @@ async def add_arduino_global_workaround():
@coroutine_with_priority(CoroPriority.FINAL)
async def add_includes(includes):
async def add_includes(includes: list[str]) -> None:
# Add includes at the very end, so that the included files can access global variables
for include in includes:
path = CORE.relative_config_path(include)
if os.path.isdir(path):
if path.is_dir():
# Directory, copy tree
for p in walk_files(path):
basename = os.path.relpath(p, os.path.dirname(path))
basename = p.relative_to(path.parent)
include_file(p, basename)
else:
# Copy file
basename = os.path.basename(path)
basename = Path(path.name)
include_file(path, basename)

View File

@@ -100,6 +100,7 @@
#define USE_UART_DEBUGGER
#define USE_UPDATE
#define USE_VALVE
#define USE_ZWAVE_PROXY
// Feature flags which do not work for zephyr
#ifndef USE_ZEPHYR

View File

@@ -90,11 +90,30 @@ class CoroPriority(enum.IntEnum):
# Examples: status_led (80)
STATUS = 80
# Web server infrastructure
# Examples: web_server_base (65)
WEB_SERVER_BASE = 65
# Network portal services
# Examples: captive_portal (64)
CAPTIVE_PORTAL = 64
# Communication protocols and services
# Examples: web_server_base (65), captive_portal (64), wifi (60), ethernet (60),
# mdns (55), ota_updates (54), web_server_ota (52)
# Examples: wifi (60), ethernet (60)
COMMUNICATION = 60
# Network discovery and management services
# Examples: mdns (55)
NETWORK_SERVICES = 55
# OTA update services
# Examples: ota_updates (54)
OTA_UPDATES = 54
# Web-based OTA services
# Examples: web_server_ota (52)
WEB_SERVER_OTA = 52
# Application-level services
# Examples: safe_mode (50)
APPLICATION = 50

View File

@@ -7,7 +7,6 @@ from dataclasses import dataclass
from functools import partial
import json
import logging
from pathlib import Path
import threading
from typing import Any
@@ -108,7 +107,7 @@ class ESPHomeDashboard:
await self.loop.run_in_executor(None, self.load_ignored_devices)
def load_ignored_devices(self) -> None:
storage_path = Path(ignored_devices_storage_path())
storage_path = ignored_devices_storage_path()
try:
with storage_path.open("r", encoding="utf-8") as f_handle:
data = json.load(f_handle)
@@ -117,7 +116,7 @@ class ESPHomeDashboard:
pass
def save_ignored_devices(self) -> None:
storage_path = Path(ignored_devices_storage_path())
storage_path = ignored_devices_storage_path()
with storage_path.open("w", encoding="utf-8") as f_handle:
json.dump(
{"ignored_devices": sorted(self.ignored_devices)}, indent=2, fp=f_handle

View File

@@ -5,7 +5,7 @@ from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any
from esphome import const, util
@@ -287,12 +287,12 @@ class DashboardEntries:
for file in util.list_yaml_files([self._config_dir]):
try:
# Prefer the json storage path if it exists
stat = os.stat(ext_storage_path(os.path.basename(file)))
stat = ext_storage_path(file.name).stat()
except OSError:
try:
# Fallback to the yaml file if the storage
# file does not exist or could not be generated
stat = os.stat(file)
stat = file.stat()
except OSError:
# File was deleted, ignore
continue
@@ -329,10 +329,10 @@ class DashboardEntry:
"_to_dict",
)
def __init__(self, path: str, cache_key: DashboardCacheKeyType) -> None:
def __init__(self, path: Path, cache_key: DashboardCacheKeyType) -> None:
"""Initialize the DashboardEntry."""
self.path = path
self.filename: str = os.path.basename(path)
self.filename: str = path.name
self._storage_path = ext_storage_path(self.filename)
self.cache_key = cache_key
self.storage: StorageJSON | None = None
@@ -365,7 +365,7 @@ class DashboardEntry:
"loaded_integrations": sorted(self.loaded_integrations),
"deployed_version": self.update_old,
"current_version": self.update_new,
"path": self.path,
"path": str(self.path),
"comment": self.comment,
"address": self.address,
"web_port": self.web_port,

View File

@@ -27,7 +27,7 @@ class DashboardSettings:
def __init__(self) -> None:
"""Initialize the dashboard settings."""
self.config_dir: str = ""
self.config_dir: Path = None
self.password_hash: str = ""
self.username: str = ""
self.using_password: bool = False
@@ -45,10 +45,10 @@ class DashboardSettings:
self.using_password = bool(password)
if self.using_password:
self.password_hash = password_hash(password)
self.config_dir = args.configuration
self.absolute_config_dir = Path(self.config_dir).resolve()
self.config_dir = Path(args.configuration)
self.absolute_config_dir = self.config_dir.resolve()
self.verbose = args.verbose
CORE.config_path = os.path.join(self.config_dir, ".")
CORE.config_path = self.config_dir / "."
@property
def relative_url(self) -> str:
@@ -81,9 +81,9 @@ class DashboardSettings:
# Compare password in constant running time (to prevent timing attacks)
return hmac.compare_digest(self.password_hash, password_hash(password))
def rel_path(self, *args: Any) -> str:
def rel_path(self, *args: Any) -> Path:
"""Return a path relative to the ESPHome config folder."""
joined_path = os.path.join(self.config_dir, *args)
joined_path = self.config_dir / Path(*args)
# Raises ValueError if not relative to ESPHome config folder
Path(joined_path).resolve().relative_to(self.absolute_config_dir)
joined_path.resolve().relative_to(self.absolute_config_dir)
return joined_path

View File

@@ -1,63 +0,0 @@
import logging
import os
from pathlib import Path
import tempfile
_LOGGER = logging.getLogger(__name__)
def write_utf8_file(
filename: Path,
utf8_str: str,
private: bool = False,
) -> None:
"""Write a file and rename it into place.
Writes all or nothing.
"""
write_file(filename, utf8_str.encode("utf-8"), private)
# from https://github.com/home-assistant/core/blob/dev/homeassistant/util/file.py
def write_file(
filename: Path,
utf8_data: bytes,
private: bool = False,
) -> None:
"""Write a file and rename it into place.
Writes all or nothing.
"""
tmp_filename = ""
missing_fchmod = False
try:
# Modern versions of Python tempfile create this file with mode 0o600
with tempfile.NamedTemporaryFile(
mode="wb", dir=os.path.dirname(filename), delete=False
) as fdesc:
fdesc.write(utf8_data)
tmp_filename = fdesc.name
if not private:
try:
os.fchmod(fdesc.fileno(), 0o644)
except AttributeError:
# os.fchmod is not available on Windows
missing_fchmod = True
os.replace(tmp_filename, filename)
if missing_fchmod:
os.chmod(filename, 0o644)
finally:
if os.path.exists(tmp_filename):
try:
os.remove(tmp_filename)
except OSError as err:
# If we are cleaning up then something else went wrong, so
# we should suppress likely follow-on errors in the cleanup
_LOGGER.error(
"File replacement cleanup failed for %s while saving %s: %s",
tmp_filename,
filename,
err,
)

View File

@@ -49,10 +49,10 @@ from esphome.storage_json import (
from esphome.util import get_serial_ports, shlex_quote
from esphome.yaml_util import FastestAvailableSafeLoader
from ..helpers import write_file
from .const import DASHBOARD_COMMAND
from .core import DASHBOARD, ESPHomeDashboard
from .entries import UNKNOWN_STATE, DashboardEntry, entry_state_to_bool
from .util.file import write_file
from .util.subprocess import async_run_system_command
from .util.text import friendly_name_slugify
@@ -581,7 +581,7 @@ class WizardRequestHandler(BaseHandler):
destination = settings.rel_path(filename)
# Check if destination file already exists
if os.path.exists(destination):
if destination.exists():
self.set_status(409) # Conflict status code
self.set_header("content-type", "application/json")
self.write(
@@ -798,10 +798,9 @@ class DownloadBinaryRequestHandler(BaseHandler):
"download",
f"{storage_json.name}-{file_name}",
)
path = os.path.dirname(storage_json.firmware_bin_path)
path = os.path.join(path, file_name)
path = storage_json.firmware_bin_path.with_name(file_name)
if not Path(path).is_file():
if not path.is_file():
args = ["esphome", "idedata", settings.rel_path(configuration)]
rc, stdout, _ = await async_run_system_command(args)
@@ -1016,7 +1015,7 @@ class EditRequestHandler(BaseHandler):
return
filename = settings.rel_path(configuration)
if Path(filename).resolve().parent != settings.absolute_config_dir:
if filename.resolve().parent != settings.absolute_config_dir:
self.send_error(404)
return
@@ -1039,10 +1038,6 @@ class EditRequestHandler(BaseHandler):
self.set_status(404)
return None
def _write_file(self, filename: str, content: bytes) -> None:
"""Write a file with the given content."""
write_file(filename, content)
@authenticated
@bind_config
async def post(self, configuration: str | None = None) -> None:
@@ -1052,12 +1047,12 @@ class EditRequestHandler(BaseHandler):
return
filename = settings.rel_path(configuration)
if Path(filename).resolve().parent != settings.absolute_config_dir:
if filename.resolve().parent != settings.absolute_config_dir:
self.send_error(404)
return
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._write_file, filename, self.request.body)
await loop.run_in_executor(None, write_file, filename, self.request.body)
# Ensure the StorageJSON is updated as well
DASHBOARD.entries.async_schedule_storage_json_update(filename)
self.set_status(200)
@@ -1072,7 +1067,7 @@ class ArchiveRequestHandler(BaseHandler):
archive_path = archive_storage_path()
mkdir_p(archive_path)
shutil.move(config_file, os.path.join(archive_path, configuration))
shutil.move(config_file, archive_path / configuration)
storage_json = StorageJSON.load(storage_path)
if storage_json is not None and storage_json.build_path:
@@ -1086,7 +1081,7 @@ class UnArchiveRequestHandler(BaseHandler):
def post(self, configuration: str | None = None) -> None:
config_file = settings.rel_path(configuration)
archive_path = archive_storage_path()
shutil.move(os.path.join(archive_path, configuration), config_file)
shutil.move(archive_path / configuration, config_file)
class LoginHandler(BaseHandler):
@@ -1173,7 +1168,7 @@ class SecretKeysRequestHandler(BaseHandler):
for secret_filename in const.SECRETS_FILES:
relative_filename = settings.rel_path(secret_filename)
if os.path.isfile(relative_filename):
if relative_filename.is_file():
filename = relative_filename
break
@@ -1206,16 +1201,17 @@ class JsonConfigRequestHandler(BaseHandler):
@bind_config
async def get(self, configuration: str | None = None) -> None:
filename = settings.rel_path(configuration)
if not os.path.isfile(filename):
if not filename.is_file():
self.send_error(404)
return
args = ["esphome", "config", filename, "--show-secrets"]
args = ["esphome", "config", str(filename), "--show-secrets"]
rc, stdout, _ = await async_run_system_command(args)
rc, stdout, stderr = await async_run_system_command(args)
if rc != 0:
self.send_error(422)
self.set_status(422)
self.write(stderr)
return
data = yaml.load(stdout, Loader=SafeLoaderIgnoreUnknown)
@@ -1224,7 +1220,7 @@ class JsonConfigRequestHandler(BaseHandler):
self.finish()
def get_base_frontend_path() -> str:
def get_base_frontend_path() -> Path:
if ENV_DEV not in os.environ:
import esphome_dashboard
@@ -1235,11 +1231,12 @@ def get_base_frontend_path() -> str:
static_path += "/"
# This path can be relative, so resolve against the root or else templates don't work
return os.path.abspath(os.path.join(os.getcwd(), static_path, "esphome_dashboard"))
path = Path(os.getcwd()) / static_path / "esphome_dashboard"
return path.resolve()
def get_static_path(*args: Iterable[str]) -> str:
return os.path.join(get_base_frontend_path(), "static", *args)
def get_static_path(*args: Iterable[str]) -> Path:
return get_base_frontend_path() / "static" / Path(*args)
@functools.cache
@@ -1256,8 +1253,7 @@ def get_static_file_url(name: str) -> str:
return base.replace("index.js", esphome_dashboard.entrypoint())
path = get_static_path(name)
with open(path, "rb") as f_handle:
hash_ = hashlib.md5(f_handle.read()).hexdigest()[:8]
hash_ = hashlib.md5(path.read_bytes()).hexdigest()[:8]
return f"{base}?hash={hash_}"
@@ -1357,7 +1353,7 @@ def start_web_server(
"""Start the web server listener."""
trash_path = trash_storage_path()
if os.path.exists(trash_path):
if trash_path.is_dir() and trash_path.exists():
_LOGGER.info("Renaming 'trash' folder to 'archive'")
archive_path = archive_storage_path()
shutil.move(trash_path, archive_path)

View File

@@ -4,6 +4,7 @@ import gzip
import hashlib
import io
import logging
from pathlib import Path
import random
import socket
import sys
@@ -191,7 +192,7 @@ def send_check(sock, data, msg):
def perform_ota(
sock: socket.socket, password: str, file_handle: io.IOBase, filename: str
sock: socket.socket, password: str, file_handle: io.IOBase, filename: Path
) -> None:
file_contents = file_handle.read()
file_size = len(file_contents)
@@ -309,7 +310,7 @@ def perform_ota(
def run_ota_impl_(
remote_host: str | list[str], remote_port: int, password: str, filename: str
remote_host: str | list[str], remote_port: int, password: str, filename: Path
) -> tuple[int, str | None]:
from esphome.core import CORE
@@ -360,7 +361,7 @@ def run_ota_impl_(
def run_ota(
remote_host: str | list[str], remote_port: int, password: str, filename: str
remote_host: str | list[str], remote_port: int, password: str, filename: Path
) -> tuple[int, str | None]:
try:
return run_ota_impl_(remote_host, remote_port, password, filename)

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
from datetime import datetime
import logging
import os
from pathlib import Path
import requests
@@ -23,11 +22,11 @@ CONTENT_DISPOSITION = "content-disposition"
TEMP_DIR = "temp"
def has_remote_file_changed(url, local_file_path):
if os.path.exists(local_file_path):
def has_remote_file_changed(url: str, local_file_path: Path) -> bool:
if local_file_path.exists():
_LOGGER.debug("has_remote_file_changed: File exists at %s", local_file_path)
try:
local_modification_time = os.path.getmtime(local_file_path)
local_modification_time = local_file_path.stat().st_mtime
local_modification_time_str = datetime.utcfromtimestamp(
local_modification_time
).strftime("%a, %d %b %Y %H:%M:%S GMT")
@@ -65,9 +64,9 @@ def has_remote_file_changed(url, local_file_path):
return True
def is_file_recent(file_path: str, refresh: TimePeriodSeconds) -> bool:
if os.path.exists(file_path):
creation_time = os.path.getctime(file_path)
def is_file_recent(file_path: Path, refresh: TimePeriodSeconds) -> bool:
if file_path.exists():
creation_time = file_path.stat().st_ctime
current_time = datetime.now().timestamp()
return current_time - creation_time <= refresh.total_seconds
return False

View File

@@ -1,6 +1,5 @@
from __future__ import annotations
import codecs
from contextlib import suppress
import ipaddress
import logging
@@ -8,6 +7,7 @@ import os
from pathlib import Path
import platform
import re
import shutil
import tempfile
from typing import TYPE_CHECKING
from urllib.parse import urlparse
@@ -140,16 +140,16 @@ def run_system_command(*args):
return rc, stdout, stderr
def mkdir_p(path):
def mkdir_p(path: Path):
if not path:
# Empty path - means create current dir
return
try:
os.makedirs(path)
path.mkdir(parents=True, exist_ok=True)
except OSError as err:
import errno
if err.errno == errno.EEXIST and os.path.isdir(path):
if err.errno == errno.EEXIST and path.is_dir():
pass
else:
from esphome.core import EsphomeError
@@ -331,16 +331,15 @@ def is_ha_addon():
return get_bool_env("ESPHOME_IS_HA_ADDON")
def walk_files(path):
def walk_files(path: Path):
for root, _, files in os.walk(path):
for name in files:
yield os.path.join(root, name)
yield Path(root) / name
def read_file(path):
def read_file(path: Path) -> str:
try:
with codecs.open(path, "r", encoding="utf-8") as f_handle:
return f_handle.read()
return path.read_text(encoding="utf-8")
except OSError as err:
from esphome.core import EsphomeError
@@ -351,13 +350,15 @@ def read_file(path):
raise EsphomeError(f"Error reading file {path}: {err}") from err
def _write_file(path: Path | str, text: str | bytes):
def _write_file(
path: Path,
text: str | bytes,
private: bool = False,
) -> None:
"""Atomically writes `text` to the given path.
Automatically creates all parent directories.
"""
if not isinstance(path, Path):
path = Path(path)
data = text
if isinstance(text, str):
data = text.encode()
@@ -365,42 +366,54 @@ def _write_file(path: Path | str, text: str | bytes):
directory = path.parent
directory.mkdir(exist_ok=True, parents=True)
tmp_path = None
tmp_filename: Path | None = None
missing_fchmod = False
try:
# Modern versions of Python tempfile create this file with mode 0o600
with tempfile.NamedTemporaryFile(
mode="wb", dir=directory, delete=False
) as f_handle:
tmp_path = f_handle.name
f_handle.write(data)
# Newer tempfile implementations create the file with mode 0o600
os.chmod(tmp_path, 0o644)
# If destination exists, will be overwritten
os.replace(tmp_path, path)
tmp_filename = Path(f_handle.name)
if not private:
try:
os.fchmod(f_handle.fileno(), 0o644)
except AttributeError:
# os.fchmod is not available on Windows
missing_fchmod = True
shutil.move(tmp_filename, path)
if missing_fchmod:
path.chmod(0o644)
finally:
if tmp_path is not None and os.path.exists(tmp_path):
if tmp_filename and tmp_filename.exists():
try:
os.remove(tmp_path)
tmp_filename.unlink()
except OSError as err:
_LOGGER.error("Write file cleanup failed: %s", err)
# If we are cleaning up then something else went wrong, so
# we should suppress likely follow-on errors in the cleanup
_LOGGER.error(
"File replacement cleanup failed for %s while saving %s: %s",
tmp_filename,
path,
err,
)
def write_file(path: Path | str, text: str):
def write_file(path: Path, text: str | bytes, private: bool = False) -> None:
try:
_write_file(path, text)
_write_file(path, text, private=private)
except OSError as err:
from esphome.core import EsphomeError
raise EsphomeError(f"Could not write file at {path}") from err
def write_file_if_changed(path: Path | str, text: str) -> bool:
def write_file_if_changed(path: Path, text: str) -> bool:
"""Write text to the given path, but not if the contents match already.
Returns true if the file was changed.
"""
if not isinstance(path, Path):
path = Path(path)
src_content = None
if path.is_file():
src_content = read_file(path)
@@ -410,12 +423,10 @@ def write_file_if_changed(path: Path | str, text: str) -> bool:
return True
def copy_file_if_changed(src: os.PathLike, dst: os.PathLike) -> None:
import shutil
def copy_file_if_changed(src: Path, dst: Path) -> None:
if file_compare(src, dst):
return
mkdir_p(os.path.dirname(dst))
dst.parent.mkdir(parents=True, exist_ok=True)
try:
shutil.copyfile(src, dst)
except OSError as err:
@@ -440,12 +451,12 @@ def list_starts_with(list_, sub):
return len(sub) <= len(list_) and all(list_[i] == x for i, x in enumerate(sub))
def file_compare(path1: os.PathLike, path2: os.PathLike) -> bool:
def file_compare(path1: Path, path2: Path) -> bool:
"""Return True if the files path1 and path2 have the same contents."""
import stat
try:
stat1, stat2 = os.stat(path1), os.stat(path2)
stat1, stat2 = path1.stat(), path2.stat()
except OSError:
# File doesn't exist or another error -> not equal
return False
@@ -462,7 +473,7 @@ def file_compare(path1: os.PathLike, path2: os.PathLike) -> bool:
bufsize = 8 * 1024
# Read files in blocks until a mismatch is found
with open(path1, "rb") as fh1, open(path2, "rb") as fh2:
with path1.open("rb") as fh1, path2.open("rb") as fh2:
while True:
blob1, blob2 = fh1.read(bufsize), fh2.read(bufsize)
if blob1 != blob2:

View File

@@ -19,23 +19,25 @@ def patch_structhash():
# removed/added. This might have unintended consequences, but this improves compile
# times greatly when adding/removing components and a simple clean build solves
# all issues
from os import makedirs
from os.path import getmtime, isdir, join
from platformio.run import cli, helpers
def patched_clean_build_dir(build_dir, *args):
from platformio import fs
from platformio.project.helpers import get_project_dir
platformio_ini = join(get_project_dir(), "platformio.ini")
platformio_ini = Path(get_project_dir()) / "platformio.ini"
build_dir = Path(build_dir)
# if project's config is modified
if isdir(build_dir) and getmtime(platformio_ini) > getmtime(build_dir):
if (
build_dir.is_dir()
and platformio_ini.stat().st_mtime > build_dir.stat().st_mtime
):
fs.rmtree(build_dir)
if not isdir(build_dir):
makedirs(build_dir)
if not build_dir.is_dir():
build_dir.mkdir(parents=True)
helpers.clean_build_dir = patched_clean_build_dir
cli.clean_build_dir = patched_clean_build_dir
@@ -78,9 +80,9 @@ FILTER_PLATFORMIO_LINES = [
def run_platformio_cli(*args, **kwargs) -> str | int:
os.environ["PLATFORMIO_FORCE_COLOR"] = "true"
os.environ["PLATFORMIO_BUILD_DIR"] = os.path.abspath(CORE.relative_pioenvs_path())
os.environ["PLATFORMIO_BUILD_DIR"] = str(CORE.relative_pioenvs_path().absolute())
os.environ.setdefault(
"PLATFORMIO_LIBDEPS_DIR", os.path.abspath(CORE.relative_piolibdeps_path())
"PLATFORMIO_LIBDEPS_DIR", str(CORE.relative_piolibdeps_path().absolute())
)
# Suppress Python syntax warnings from third-party scripts during compilation
os.environ.setdefault("PYTHONWARNINGS", "ignore::SyntaxWarning")
@@ -99,7 +101,7 @@ def run_platformio_cli(*args, **kwargs) -> str | int:
def run_platformio_cli_run(config, verbose, *args, **kwargs) -> str | int:
command = ["run", "-d", CORE.build_path]
command = ["run", "-d", str(CORE.build_path)]
if verbose:
command += ["-v"]
command += list(args)
@@ -140,8 +142,8 @@ def _run_idedata(config):
def _load_idedata(config):
platformio_ini = Path(CORE.relative_build_path("platformio.ini"))
temp_idedata = Path(CORE.relative_internal_path("idedata", f"{CORE.name}.json"))
platformio_ini = CORE.relative_build_path("platformio.ini")
temp_idedata = CORE.relative_internal_path("idedata", f"{CORE.name}.json")
changed = False
if (
@@ -311,7 +313,7 @@ def process_stacktrace(config, line, backtrace_state):
@dataclass
class FlashImage:
path: str
path: Path
offset: str
@@ -320,17 +322,17 @@ class IDEData:
self.raw = raw
@property
def firmware_elf_path(self):
return self.raw["prog_path"]
def firmware_elf_path(self) -> Path:
return Path(self.raw["prog_path"])
@property
def firmware_bin_path(self) -> str:
return str(Path(self.firmware_elf_path).with_suffix(".bin"))
def firmware_bin_path(self) -> Path:
return self.firmware_elf_path.with_suffix(".bin")
@property
def extra_flash_images(self) -> list[FlashImage]:
return [
FlashImage(path=entry["path"], offset=entry["offset"])
FlashImage(path=Path(entry["path"]), offset=entry["offset"])
for entry in self.raw["extra"]["flash_images"]
]

View File

@@ -1,11 +1,11 @@
from __future__ import annotations
import binascii
import codecs
from datetime import datetime
import json
import logging
import os
from pathlib import Path
from esphome import const
from esphome.const import CONF_DISABLED, CONF_MDNS
@@ -16,30 +16,35 @@ from esphome.types import CoreType
_LOGGER = logging.getLogger(__name__)
def storage_path() -> str:
return os.path.join(CORE.data_dir, "storage", f"{CORE.config_filename}.json")
def storage_path() -> Path:
return CORE.data_dir / "storage" / f"{CORE.config_filename}.json"
def ext_storage_path(config_filename: str) -> str:
return os.path.join(CORE.data_dir, "storage", f"{config_filename}.json")
def ext_storage_path(config_filename: str) -> Path:
return CORE.data_dir / "storage" / f"{config_filename}.json"
def esphome_storage_path() -> str:
return os.path.join(CORE.data_dir, "esphome.json")
def esphome_storage_path() -> Path:
return CORE.data_dir / "esphome.json"
def ignored_devices_storage_path() -> str:
return os.path.join(CORE.data_dir, "ignored-devices.json")
def ignored_devices_storage_path() -> Path:
return CORE.data_dir / "ignored-devices.json"
def trash_storage_path() -> str:
def trash_storage_path() -> Path:
return CORE.relative_config_path("trash")
def archive_storage_path() -> str:
def archive_storage_path() -> Path:
return CORE.relative_config_path("archive")
def _to_path_if_not_none(value: str | None) -> Path | None:
"""Convert a string to Path if it's not None."""
return Path(value) if value is not None else None
class StorageJSON:
def __init__(
self,
@@ -52,8 +57,8 @@ class StorageJSON:
address: str,
web_port: int | None,
target_platform: str,
build_path: str | None,
firmware_bin_path: str | None,
build_path: Path | None,
firmware_bin_path: Path | None,
loaded_integrations: set[str],
loaded_platforms: set[str],
no_mdns: bool,
@@ -107,8 +112,8 @@ class StorageJSON:
"address": self.address,
"web_port": self.web_port,
"esp_platform": self.target_platform,
"build_path": self.build_path,
"firmware_bin_path": self.firmware_bin_path,
"build_path": str(self.build_path),
"firmware_bin_path": str(self.firmware_bin_path),
"loaded_integrations": sorted(self.loaded_integrations),
"loaded_platforms": sorted(self.loaded_platforms),
"no_mdns": self.no_mdns,
@@ -176,8 +181,8 @@ class StorageJSON:
)
@staticmethod
def _load_impl(path: str) -> StorageJSON | None:
with codecs.open(path, "r", encoding="utf-8") as f_handle:
def _load_impl(path: Path) -> StorageJSON | None:
with path.open("r", encoding="utf-8") as f_handle:
storage = json.load(f_handle)
storage_version = storage["storage_version"]
name = storage.get("name")
@@ -190,8 +195,8 @@ class StorageJSON:
address = storage.get("address")
web_port = storage.get("web_port")
esp_platform = storage.get("esp_platform")
build_path = storage.get("build_path")
firmware_bin_path = storage.get("firmware_bin_path")
build_path = _to_path_if_not_none(storage.get("build_path"))
firmware_bin_path = _to_path_if_not_none(storage.get("firmware_bin_path"))
loaded_integrations = set(storage.get("loaded_integrations", []))
loaded_platforms = set(storage.get("loaded_platforms", []))
no_mdns = storage.get("no_mdns", False)
@@ -217,7 +222,7 @@ class StorageJSON:
)
@staticmethod
def load(path: str) -> StorageJSON | None:
def load(path: Path) -> StorageJSON | None:
try:
return StorageJSON._load_impl(path)
except Exception: # pylint: disable=broad-except
@@ -268,7 +273,7 @@ class EsphomeStorageJSON:
@staticmethod
def _load_impl(path: str) -> EsphomeStorageJSON | None:
with codecs.open(path, "r", encoding="utf-8") as f_handle:
with Path(path).open("r", encoding="utf-8") as f_handle:
storage = json.load(f_handle)
storage_version = storage["storage_version"]
cookie_secret = storage.get("cookie_secret")

View File

@@ -1,7 +1,6 @@
import collections
import io
import logging
import os
from pathlib import Path
import re
import subprocess
@@ -86,7 +85,10 @@ def safe_input(prompt=""):
return input()
def shlex_quote(s):
def shlex_quote(s: str | Path) -> str:
# Convert Path objects to strings
if isinstance(s, Path):
s = str(s)
if not s:
return "''"
if re.search(r"[^\w@%+=:,./-]", s) is None:
@@ -272,25 +274,28 @@ class OrderedDict(collections.OrderedDict):
return dict(self).__repr__()
def list_yaml_files(configs: list[str]) -> list[str]:
files: list[str] = []
def list_yaml_files(configs: list[str | Path]) -> list[Path]:
files: list[Path] = []
for config in configs:
if os.path.isfile(config):
config = Path(config)
if not config.exists():
raise FileNotFoundError(f"Config path '{config}' does not exist!")
if config.is_file():
files.append(config)
else:
files.extend(os.path.join(config, p) for p in os.listdir(config))
files.extend(config.glob("*"))
files = filter_yaml_files(files)
return sorted(files)
def filter_yaml_files(files: list[str]) -> list[str]:
def filter_yaml_files(files: list[Path]) -> list[Path]:
return [
f
for f in files
if (
os.path.splitext(f)[1] in (".yaml", ".yml")
and os.path.basename(f) not in ("secrets.yaml", "secrets.yml")
and not os.path.basename(f).startswith(".")
f.suffix in (".yaml", ".yml")
and f.name not in ("secrets.yaml", "secrets.yml")
and not f.name.startswith(".")
)
]

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from io import StringIO
import json
import os
from pathlib import Path
from typing import Any
from esphome.config import Config, _format_vol_invalid, validate_config
@@ -67,24 +67,24 @@ def _read_file_content_from_json_on_stdin() -> str:
return data["content"]
def _print_file_read_event(path: str) -> None:
def _print_file_read_event(path: Path) -> None:
"""Print a file read event."""
print(
json.dumps(
{
"type": "read_file",
"path": path,
"path": str(path),
}
)
)
def _request_and_get_stream_on_stdin(fname: str) -> StringIO:
def _request_and_get_stream_on_stdin(fname: Path) -> StringIO:
_print_file_read_event(fname)
return StringIO(_read_file_content_from_json_on_stdin())
def _vscode_loader(fname: str) -> dict[str, Any]:
def _vscode_loader(fname: Path) -> dict[str, Any]:
raw_yaml_stream = _request_and_get_stream_on_stdin(fname)
# it is required to set the name on StringIO so document on start_mark
# is set properly. Otherwise it is initialized with "<file>"
@@ -92,7 +92,7 @@ def _vscode_loader(fname: str) -> dict[str, Any]:
return parse_yaml(fname, raw_yaml_stream, _vscode_loader)
def _ace_loader(fname: str) -> dict[str, Any]:
def _ace_loader(fname: Path) -> dict[str, Any]:
raw_yaml_stream = _request_and_get_stream_on_stdin(fname)
return parse_yaml(fname, raw_yaml_stream)
@@ -120,10 +120,10 @@ def read_config(args):
return
CORE.vscode = True
if args.ace: # Running from ESPHome Compiler dashboard, not vscode
CORE.config_path = os.path.join(args.configuration, data["file"])
CORE.config_path = Path(args.configuration) / data["file"]
loader = _ace_loader
else:
CORE.config_path = data["file"]
CORE.config_path = Path(data["file"])
loader = _vscode_loader
file_name = CORE.config_path

View File

@@ -1,4 +1,4 @@
import os
from pathlib import Path
import random
import string
from typing import Literal, NotRequired, TypedDict, Unpack
@@ -213,7 +213,7 @@ class WizardWriteKwargs(TypedDict):
file_text: NotRequired[str]
def wizard_write(path: str, **kwargs: Unpack[WizardWriteKwargs]) -> bool:
def wizard_write(path: Path, **kwargs: Unpack[WizardWriteKwargs]) -> bool:
from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards
@@ -256,13 +256,13 @@ def wizard_write(path: str, **kwargs: Unpack[WizardWriteKwargs]) -> bool:
file_text = wizard_file(**kwargs)
# Check if file already exists to prevent overwriting
if os.path.exists(path) and os.path.isfile(path):
if path.exists() and path.is_file():
safe_print(color(AnsiFore.RED, f'The file "{path}" already exists.'))
return False
write_file(path, file_text)
storage = StorageJSON.from_wizard(name, name, f"{name}.local", hardware)
storage_path = ext_storage_path(os.path.basename(path))
storage_path = ext_storage_path(path.name)
storage.save(storage_path)
return True
@@ -301,7 +301,7 @@ def strip_accents(value: str) -> str:
)
def wizard(path: str) -> int:
def wizard(path: Path) -> int:
from esphome.components.bk72xx import boards as bk72xx_boards
from esphome.components.esp32 import boards as esp32_boards
from esphome.components.esp8266 import boards as esp8266_boards
@@ -309,14 +309,14 @@ def wizard(path: str) -> int:
from esphome.components.rp2040 import boards as rp2040_boards
from esphome.components.rtl87xx import boards as rtl87xx_boards
if not path.endswith(".yaml") and not path.endswith(".yml"):
if path.suffix not in (".yaml", ".yml"):
safe_print(
f"Please make your configuration file {color(AnsiFore.CYAN, path)} have the extension .yaml or .yml"
f"Please make your configuration file {color(AnsiFore.CYAN, str(path))} have the extension .yaml or .yml"
)
return 1
if os.path.exists(path):
if path.exists():
safe_print(
f"Uh oh, it seems like {color(AnsiFore.CYAN, path)} already exists, please delete that file first or chose another configuration file."
f"Uh oh, it seems like {color(AnsiFore.CYAN, str(path))} already exists, please delete that file first or chose another configuration file."
)
return 2
@@ -549,7 +549,7 @@ def wizard(path: str) -> int:
safe_print()
safe_print(
color(AnsiFore.CYAN, "DONE! I've now written a new configuration file to ")
+ color(AnsiFore.BOLD_CYAN, path)
+ color(AnsiFore.BOLD_CYAN, str(path))
)
safe_print()
safe_print("Next steps:")

View File

@@ -1,6 +1,5 @@
import importlib
import logging
import os
from pathlib import Path
import re
@@ -266,7 +265,7 @@ def generate_version_h():
def write_cpp(code_s):
path = CORE.relative_src_path("main.cpp")
if os.path.isfile(path):
if path.is_file():
text = read_file(path)
code_format = find_begin_end(
text, CPP_AUTO_GENERATE_BEGIN, CPP_AUTO_GENERATE_END
@@ -292,28 +291,28 @@ def write_cpp(code_s):
def clean_cmake_cache():
pioenvs = CORE.relative_pioenvs_path()
if os.path.isdir(pioenvs):
pioenvs_cmake_path = CORE.relative_pioenvs_path(CORE.name, "CMakeCache.txt")
if os.path.isfile(pioenvs_cmake_path):
if pioenvs.is_dir():
pioenvs_cmake_path = pioenvs / CORE.name / "CMakeCache.txt"
if pioenvs_cmake_path.is_file():
_LOGGER.info("Deleting %s", pioenvs_cmake_path)
os.remove(pioenvs_cmake_path)
pioenvs_cmake_path.unlink()
def clean_build():
import shutil
pioenvs = CORE.relative_pioenvs_path()
if os.path.isdir(pioenvs):
if pioenvs.is_dir():
_LOGGER.info("Deleting %s", pioenvs)
shutil.rmtree(pioenvs)
piolibdeps = CORE.relative_piolibdeps_path()
if os.path.isdir(piolibdeps):
if piolibdeps.is_dir():
_LOGGER.info("Deleting %s", piolibdeps)
shutil.rmtree(piolibdeps)
dependencies_lock = CORE.relative_build_path("dependencies.lock")
if os.path.isfile(dependencies_lock):
if dependencies_lock.is_file():
_LOGGER.info("Deleting %s", dependencies_lock)
os.remove(dependencies_lock)
dependencies_lock.unlink()
# Clean PlatformIO cache to resolve CMake compiler detection issues
# This helps when toolchain paths change or get corrupted
@@ -324,9 +323,11 @@ def clean_build():
pass
else:
cache_dir = get_project_cache_dir()
if cache_dir and cache_dir.strip() and os.path.isdir(cache_dir):
_LOGGER.info("Deleting PlatformIO cache %s", cache_dir)
shutil.rmtree(cache_dir)
if cache_dir and cache_dir.strip():
cache_path = Path(cache_dir)
if cache_path.is_dir():
_LOGGER.info("Deleting PlatformIO cache %s", cache_dir)
shutil.rmtree(cache_dir)
GITIGNORE_CONTENT = """# Gitignore settings for ESPHome
@@ -339,6 +340,5 @@ GITIGNORE_CONTENT = """# Gitignore settings for ESPHome
def write_gitignore():
path = CORE.relative_config_path(".gitignore")
if not os.path.isfile(path):
with open(file=path, mode="w", encoding="utf-8") as f:
f.write(GITIGNORE_CONTENT)
if not path.is_file():
path.write_text(GITIGNORE_CONTENT, encoding="utf-8")

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
from collections.abc import Callable
import fnmatch
import functools
import inspect
from io import BytesIO, TextIOBase, TextIOWrapper
@@ -9,6 +8,7 @@ from ipaddress import _BaseAddress, _BaseNetwork
import logging
import math
import os
from pathlib import Path
from typing import Any
import uuid
@@ -109,7 +109,9 @@ def _add_data_ref(fn):
class ESPHomeLoaderMixin:
"""Loader class that keeps track of line numbers."""
def __init__(self, name: str, yaml_loader: Callable[[str], dict[str, Any]]) -> None:
def __init__(
self, name: Path, yaml_loader: Callable[[Path], dict[str, Any]]
) -> None:
"""Initialize the loader."""
self.name = name
self.yaml_loader = yaml_loader
@@ -254,12 +256,8 @@ class ESPHomeLoaderMixin:
f"Environment variable '{node.value}' not defined", node.start_mark
)
@property
def _directory(self) -> str:
return os.path.dirname(self.name)
def _rel_path(self, *args: str) -> str:
return os.path.join(self._directory, *args)
def _rel_path(self, *args: str) -> Path:
return self.name.parent / Path(*args)
@_add_data_ref
def construct_secret(self, node: yaml.Node) -> str:
@@ -269,8 +267,8 @@ class ESPHomeLoaderMixin:
if self.name == CORE.config_path:
raise e
try:
main_config_dir = os.path.dirname(CORE.config_path)
main_secret_yml = os.path.join(main_config_dir, SECRET_YAML)
main_config_dir = CORE.config_path.parent
main_secret_yml = main_config_dir / SECRET_YAML
secrets = self.yaml_loader(main_secret_yml)
except EsphomeError as er:
raise EsphomeError(f"{e}\n{er}") from er
@@ -329,7 +327,7 @@ class ESPHomeLoaderMixin:
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
mapping = OrderedDict()
for fname in files:
filename = os.path.splitext(os.path.basename(fname))[0]
filename = fname.stem
mapping[filename] = self.yaml_loader(fname)
return mapping
@@ -369,8 +367,8 @@ class ESPHomeLoader(ESPHomeLoaderMixin, FastestAvailableSafeLoader):
def __init__(
self,
stream: TextIOBase | BytesIO,
name: str,
yaml_loader: Callable[[str], dict[str, Any]],
name: Path,
yaml_loader: Callable[[Path], dict[str, Any]],
) -> None:
FastestAvailableSafeLoader.__init__(self, stream)
ESPHomeLoaderMixin.__init__(self, name, yaml_loader)
@@ -382,8 +380,8 @@ class ESPHomePurePythonLoader(ESPHomeLoaderMixin, PurePythonLoader):
def __init__(
self,
stream: TextIOBase | BytesIO,
name: str,
yaml_loader: Callable[[str], dict[str, Any]],
name: Path,
yaml_loader: Callable[[Path], dict[str, Any]],
) -> None:
PurePythonLoader.__init__(self, stream)
ESPHomeLoaderMixin.__init__(self, name, yaml_loader)
@@ -414,24 +412,24 @@ for _loader in (ESPHomeLoader, ESPHomePurePythonLoader):
_loader.add_constructor("!remove", _loader.construct_remove)
def load_yaml(fname: str, clear_secrets: bool = True) -> Any:
def load_yaml(fname: Path, clear_secrets: bool = True) -> Any:
if clear_secrets:
_SECRET_VALUES.clear()
_SECRET_CACHE.clear()
return _load_yaml_internal(fname)
def _load_yaml_internal(fname: str) -> Any:
def _load_yaml_internal(fname: Path) -> Any:
"""Load a YAML file."""
try:
with open(fname, encoding="utf-8") as f_handle:
with fname.open(encoding="utf-8") as f_handle:
return parse_yaml(fname, f_handle)
except (UnicodeDecodeError, OSError) as err:
raise EsphomeError(f"Error reading file {fname}: {err}") from err
def parse_yaml(
file_name: str, file_handle: TextIOWrapper, yaml_loader=_load_yaml_internal
file_name: Path, file_handle: TextIOWrapper, yaml_loader=_load_yaml_internal
) -> Any:
"""Parse a YAML file."""
try:
@@ -483,9 +481,9 @@ def substitute_vars(config, vars):
def _load_yaml_internal_with_type(
loader_type: type[ESPHomeLoader] | type[ESPHomePurePythonLoader],
fname: str,
fname: Path,
content: TextIOWrapper,
yaml_loader: Any,
yaml_loader: Callable[[Path], dict[str, Any]],
) -> Any:
"""Load a YAML file."""
loader = loader_type(content, fname, yaml_loader)
@@ -512,13 +510,14 @@ def _is_file_valid(name: str) -> bool:
return not name.startswith(".")
def _find_files(directory, pattern):
def _find_files(directory: Path, pattern):
"""Recursively load files in a directory."""
for root, dirs, files in os.walk(directory, topdown=True):
for root, dirs, files in os.walk(directory):
dirs[:] = [d for d in dirs if _is_file_valid(d)]
for basename in files:
if _is_file_valid(basename) and fnmatch.fnmatch(basename, pattern):
filename = os.path.join(root, basename)
for f in files:
filename = Path(f)
if _is_file_valid(f) and filename.match(pattern):
filename = Path(root) / filename
yield filename
@@ -627,3 +626,4 @@ ESPHomeDumper.add_multi_representer(TimePeriod, ESPHomeDumper.represent_stringif
ESPHomeDumper.add_multi_representer(Lambda, ESPHomeDumper.represent_lambda)
ESPHomeDumper.add_multi_representer(core.ID, ESPHomeDumper.represent_id)
ESPHomeDumper.add_multi_representer(uuid.UUID, ESPHomeDumper.represent_stringify)
ESPHomeDumper.add_multi_representer(Path, ESPHomeDumper.represent_stringify)