mirror of
https://github.com/esphome/esphome.git
synced 2026-02-11 18:21:52 +00:00
Compare commits
6 Commits
api-optimi
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9c125aa8d | ||
|
|
8d62a6a88a | ||
|
|
0ec02d4886 | ||
|
|
1411868a0b | ||
|
|
069c90ec4a | ||
|
|
930a186168 |
@@ -1921,10 +1921,6 @@ bool APIConnection::schedule_batch_() {
|
||||
}
|
||||
|
||||
void APIConnection::process_batch_() {
|
||||
// Ensure MessageInfo remains trivially destructible for our placement new approach
|
||||
static_assert(std::is_trivially_destructible<MessageInfo>::value,
|
||||
"MessageInfo must remain trivially destructible with this placement-new approach");
|
||||
|
||||
if (this->deferred_batch_.empty()) {
|
||||
this->flags_.batch_scheduled = false;
|
||||
return;
|
||||
@@ -1949,6 +1945,10 @@ void APIConnection::process_batch_() {
|
||||
for (size_t i = 0; i < num_items; i++) {
|
||||
total_estimated_size += this->deferred_batch_[i].estimated_size;
|
||||
}
|
||||
// Clamp to MAX_BATCH_PACKET_SIZE — we won't send more than that per batch
|
||||
if (total_estimated_size > MAX_BATCH_PACKET_SIZE) {
|
||||
total_estimated_size = MAX_BATCH_PACKET_SIZE;
|
||||
}
|
||||
|
||||
this->prepare_first_message_buffer(shared_buf, header_padding, total_estimated_size);
|
||||
|
||||
@@ -1972,7 +1972,20 @@ void APIConnection::process_batch_() {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t messages_to_process = std::min(num_items, MAX_MESSAGES_PER_BATCH);
|
||||
// Multi-message path — heavy stack frame isolated in separate noinline function
|
||||
this->process_batch_multi_(shared_buf, num_items, header_padding, footer_size);
|
||||
}
|
||||
|
||||
// Separated from process_batch_() so the single-message fast path gets a minimal
|
||||
// stack frame without the MAX_MESSAGES_PER_BATCH * sizeof(MessageInfo) array.
|
||||
void APIConnection::process_batch_multi_(std::vector<uint8_t> &shared_buf, size_t num_items, uint8_t header_padding,
|
||||
uint8_t footer_size) {
|
||||
// Ensure MessageInfo remains trivially destructible for our placement new approach
|
||||
static_assert(std::is_trivially_destructible<MessageInfo>::value,
|
||||
"MessageInfo must remain trivially destructible with this placement-new approach");
|
||||
|
||||
const size_t messages_to_process = std::min(num_items, MAX_MESSAGES_PER_BATCH);
|
||||
const uint8_t frame_overhead = header_padding + footer_size;
|
||||
|
||||
// Stack-allocated array for message info
|
||||
alignas(MessageInfo) char message_info_storage[MAX_MESSAGES_PER_BATCH * sizeof(MessageInfo)];
|
||||
@@ -1999,7 +2012,7 @@ void APIConnection::process_batch_() {
|
||||
|
||||
// Message was encoded successfully
|
||||
// payload_size is header_padding + actual payload size + footer_size
|
||||
uint16_t proto_payload_size = payload_size - header_padding - footer_size;
|
||||
uint16_t proto_payload_size = payload_size - frame_overhead;
|
||||
// Use placement new to construct MessageInfo in pre-allocated stack array
|
||||
// This avoids default-constructing all MAX_MESSAGES_PER_BATCH elements
|
||||
// Explicit destruction is not needed because MessageInfo is trivially destructible,
|
||||
@@ -2015,42 +2028,38 @@ void APIConnection::process_batch_() {
|
||||
current_offset = shared_buf.size() + footer_size;
|
||||
}
|
||||
|
||||
if (items_processed == 0) {
|
||||
this->deferred_batch_.clear();
|
||||
return;
|
||||
}
|
||||
if (items_processed > 0) {
|
||||
// Add footer space for the last message (for Noise protocol MAC)
|
||||
if (footer_size > 0) {
|
||||
shared_buf.resize(shared_buf.size() + footer_size);
|
||||
}
|
||||
|
||||
// Add footer space for the last message (for Noise protocol MAC)
|
||||
if (footer_size > 0) {
|
||||
shared_buf.resize(shared_buf.size() + footer_size);
|
||||
}
|
||||
|
||||
// Send all collected messages
|
||||
APIError err = this->helper_->write_protobuf_messages(ProtoWriteBuffer{&shared_buf},
|
||||
std::span<const MessageInfo>(message_info, items_processed));
|
||||
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
|
||||
this->fatal_error_with_log_(LOG_STR("Batch write failed"), err);
|
||||
}
|
||||
// Send all collected messages
|
||||
APIError err = this->helper_->write_protobuf_messages(ProtoWriteBuffer{&shared_buf},
|
||||
std::span<const MessageInfo>(message_info, items_processed));
|
||||
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
|
||||
this->fatal_error_with_log_(LOG_STR("Batch write failed"), err);
|
||||
}
|
||||
|
||||
#ifdef HAS_PROTO_MESSAGE_DUMP
|
||||
// Log messages after send attempt for VV debugging
|
||||
// It's safe to use the buffer for logging at this point regardless of send result
|
||||
for (size_t i = 0; i < items_processed; i++) {
|
||||
const auto &item = this->deferred_batch_[i];
|
||||
this->log_batch_item_(item);
|
||||
}
|
||||
// Log messages after send attempt for VV debugging
|
||||
// It's safe to use the buffer for logging at this point regardless of send result
|
||||
for (size_t i = 0; i < items_processed; i++) {
|
||||
const auto &item = this->deferred_batch_[i];
|
||||
this->log_batch_item_(item);
|
||||
}
|
||||
#endif
|
||||
|
||||
// Handle remaining items more efficiently
|
||||
if (items_processed < this->deferred_batch_.size()) {
|
||||
// Remove processed items from the beginning
|
||||
this->deferred_batch_.remove_front(items_processed);
|
||||
// Reschedule for remaining items
|
||||
this->schedule_batch_();
|
||||
} else {
|
||||
// All items processed
|
||||
this->clear_batch_();
|
||||
// Partial batch — remove processed items and reschedule
|
||||
if (items_processed < this->deferred_batch_.size()) {
|
||||
this->deferred_batch_.remove_front(items_processed);
|
||||
this->schedule_batch_();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// All items processed (or none could be processed)
|
||||
this->clear_batch_();
|
||||
}
|
||||
|
||||
// Dispatch message encoding based on message_type
|
||||
|
||||
@@ -548,8 +548,8 @@ class APIConnection final : public APIServerConnectionBase {
|
||||
batch_start_time = 0;
|
||||
}
|
||||
|
||||
// Remove processed items from the front
|
||||
void remove_front(size_t count) { items.erase(items.begin(), items.begin() + count); }
|
||||
// Remove processed items from the front — noinline to keep memmove out of warm callers
|
||||
void remove_front(size_t count) __attribute__((noinline)) { items.erase(items.begin(), items.begin() + count); }
|
||||
|
||||
bool empty() const { return items.empty(); }
|
||||
size_t size() const { return items.size(); }
|
||||
@@ -621,6 +621,8 @@ class APIConnection final : public APIServerConnectionBase {
|
||||
|
||||
bool schedule_batch_();
|
||||
void process_batch_();
|
||||
void process_batch_multi_(std::vector<uint8_t> &shared_buf, size_t num_items, uint8_t header_padding,
|
||||
uint8_t footer_size) __attribute__((noinline));
|
||||
void clear_batch_() {
|
||||
this->deferred_batch_.clear();
|
||||
this->flags_.batch_scheduled = false;
|
||||
|
||||
@@ -11,6 +11,7 @@ from esphome.const import (
|
||||
CONF_ICON,
|
||||
CONF_ID,
|
||||
CONF_MQTT_ID,
|
||||
CONF_MQTT_JSON_STATE_PAYLOAD,
|
||||
CONF_ON_IDLE,
|
||||
CONF_ON_OPEN,
|
||||
CONF_POSITION,
|
||||
@@ -119,6 +120,9 @@ _COVER_SCHEMA = (
|
||||
.extend(
|
||||
{
|
||||
cv.OnlyWith(CONF_MQTT_ID, "mqtt"): cv.declare_id(mqtt.MQTTCoverComponent),
|
||||
cv.Optional(CONF_MQTT_JSON_STATE_PAYLOAD): cv.All(
|
||||
cv.requires_component("mqtt"), cv.boolean
|
||||
),
|
||||
cv.Optional(CONF_DEVICE_CLASS): cv.one_of(*DEVICE_CLASSES, lower=True),
|
||||
cv.Optional(CONF_POSITION_COMMAND_TOPIC): cv.All(
|
||||
cv.requires_component("mqtt"), cv.subscribe_topic
|
||||
@@ -148,6 +152,22 @@ _COVER_SCHEMA = (
|
||||
_COVER_SCHEMA.add_extra(entity_duplicate_validator("cover"))
|
||||
|
||||
|
||||
def _validate_mqtt_state_topics(config):
|
||||
if config.get(CONF_MQTT_JSON_STATE_PAYLOAD):
|
||||
if CONF_POSITION_STATE_TOPIC in config:
|
||||
raise cv.Invalid(
|
||||
f"'{CONF_POSITION_STATE_TOPIC}' cannot be used with '{CONF_MQTT_JSON_STATE_PAYLOAD}: true'"
|
||||
)
|
||||
if CONF_TILT_STATE_TOPIC in config:
|
||||
raise cv.Invalid(
|
||||
f"'{CONF_TILT_STATE_TOPIC}' cannot be used with '{CONF_MQTT_JSON_STATE_PAYLOAD}: true'"
|
||||
)
|
||||
return config
|
||||
|
||||
|
||||
_COVER_SCHEMA.add_extra(_validate_mqtt_state_topics)
|
||||
|
||||
|
||||
def cover_schema(
|
||||
class_: MockObjClass,
|
||||
*,
|
||||
@@ -195,6 +215,9 @@ async def setup_cover_core_(var, config):
|
||||
position_command_topic := config.get(CONF_POSITION_COMMAND_TOPIC)
|
||||
) is not None:
|
||||
cg.add(mqtt_.set_custom_position_command_topic(position_command_topic))
|
||||
if config.get(CONF_MQTT_JSON_STATE_PAYLOAD):
|
||||
cg.add_define("USE_MQTT_COVER_JSON")
|
||||
cg.add(mqtt_.set_use_json_format(True))
|
||||
if (tilt_state_topic := config.get(CONF_TILT_STATE_TOPIC)) is not None:
|
||||
cg.add(mqtt_.set_custom_tilt_state_topic(tilt_state_topic))
|
||||
if (tilt_command_topic := config.get(CONF_TILT_COMMAND_TOPIC)) is not None:
|
||||
|
||||
@@ -124,14 +124,11 @@ class ESP32Preferences : public ESPPreferences {
|
||||
return true;
|
||||
|
||||
ESP_LOGV(TAG, "Saving %zu items...", s_pending_save.size());
|
||||
// goal try write all pending saves even if one fails
|
||||
int cached = 0, written = 0, failed = 0;
|
||||
esp_err_t last_err = ESP_OK;
|
||||
uint32_t last_key = 0;
|
||||
|
||||
// go through vector from back to front (makes erase easier/more efficient)
|
||||
for (ssize_t i = s_pending_save.size() - 1; i >= 0; i--) {
|
||||
const auto &save = s_pending_save[i];
|
||||
for (const auto &save : s_pending_save) {
|
||||
char key_str[KEY_BUFFER_SIZE];
|
||||
snprintf(key_str, sizeof(key_str), "%" PRIu32, save.key);
|
||||
ESP_LOGVV(TAG, "Checking if NVS data %s has changed", key_str);
|
||||
@@ -150,8 +147,9 @@ class ESP32Preferences : public ESPPreferences {
|
||||
ESP_LOGV(TAG, "NVS data not changed skipping %" PRIu32 " len=%zu", save.key, save.len);
|
||||
cached++;
|
||||
}
|
||||
s_pending_save.erase(s_pending_save.begin() + i);
|
||||
}
|
||||
s_pending_save.clear();
|
||||
|
||||
ESP_LOGD(TAG, "Writing %d items: %d cached, %d written, %d failed", cached + written + failed, cached, written,
|
||||
failed);
|
||||
if (failed > 0) {
|
||||
|
||||
@@ -114,14 +114,11 @@ class LibreTinyPreferences : public ESPPreferences {
|
||||
return true;
|
||||
|
||||
ESP_LOGV(TAG, "Saving %zu items...", s_pending_save.size());
|
||||
// goal try write all pending saves even if one fails
|
||||
int cached = 0, written = 0, failed = 0;
|
||||
fdb_err_t last_err = FDB_NO_ERR;
|
||||
uint32_t last_key = 0;
|
||||
|
||||
// go through vector from back to front (makes erase easier/more efficient)
|
||||
for (ssize_t i = s_pending_save.size() - 1; i >= 0; i--) {
|
||||
const auto &save = s_pending_save[i];
|
||||
for (const auto &save : s_pending_save) {
|
||||
char key_str[KEY_BUFFER_SIZE];
|
||||
snprintf(key_str, sizeof(key_str), "%" PRIu32, save.key);
|
||||
ESP_LOGVV(TAG, "Checking if FDB data %s has changed", key_str);
|
||||
@@ -141,8 +138,9 @@ class LibreTinyPreferences : public ESPPreferences {
|
||||
ESP_LOGD(TAG, "FDB data not changed; skipping %" PRIu32 " len=%zu", save.key, save.len);
|
||||
cached++;
|
||||
}
|
||||
s_pending_save.erase(s_pending_save.begin() + i);
|
||||
}
|
||||
s_pending_save.clear();
|
||||
|
||||
ESP_LOGD(TAG, "Writing %d items: %d cached, %d written, %d failed", cached + written + failed, cached, written,
|
||||
failed);
|
||||
if (failed > 0) {
|
||||
|
||||
@@ -67,17 +67,26 @@ void MQTTCoverComponent::dump_config() {
|
||||
auto traits = this->cover_->get_traits();
|
||||
bool has_command_topic = traits.get_supports_position() || !traits.get_supports_tilt();
|
||||
LOG_MQTT_COMPONENT(true, has_command_topic);
|
||||
char topic_buf[MQTT_DEFAULT_TOPIC_MAX_LEN];
|
||||
#ifdef USE_MQTT_COVER_JSON
|
||||
if (this->use_json_format_) {
|
||||
ESP_LOGCONFIG(TAG, " JSON State Payload: YES");
|
||||
} else {
|
||||
#endif
|
||||
if (traits.get_supports_position()) {
|
||||
ESP_LOGCONFIG(TAG, " Position State Topic: '%s'", this->get_position_state_topic_to(topic_buf).c_str());
|
||||
}
|
||||
if (traits.get_supports_tilt()) {
|
||||
ESP_LOGCONFIG(TAG, " Tilt State Topic: '%s'", this->get_tilt_state_topic_to(topic_buf).c_str());
|
||||
}
|
||||
#ifdef USE_MQTT_COVER_JSON
|
||||
}
|
||||
#endif
|
||||
if (traits.get_supports_position()) {
|
||||
ESP_LOGCONFIG(TAG,
|
||||
" Position State Topic: '%s'\n"
|
||||
" Position Command Topic: '%s'",
|
||||
this->get_position_state_topic().c_str(), this->get_position_command_topic().c_str());
|
||||
ESP_LOGCONFIG(TAG, " Position Command Topic: '%s'", this->get_position_command_topic_to(topic_buf).c_str());
|
||||
}
|
||||
if (traits.get_supports_tilt()) {
|
||||
ESP_LOGCONFIG(TAG,
|
||||
" Tilt State Topic: '%s'\n"
|
||||
" Tilt Command Topic: '%s'",
|
||||
this->get_tilt_state_topic().c_str(), this->get_tilt_command_topic().c_str());
|
||||
ESP_LOGCONFIG(TAG, " Tilt Command Topic: '%s'", this->get_tilt_command_topic_to(topic_buf).c_str());
|
||||
}
|
||||
}
|
||||
void MQTTCoverComponent::send_discovery(JsonObject root, mqtt::SendDiscoveryConfig &config) {
|
||||
@@ -92,13 +101,33 @@ void MQTTCoverComponent::send_discovery(JsonObject root, mqtt::SendDiscoveryConf
|
||||
if (traits.get_is_assumed_state()) {
|
||||
root[MQTT_OPTIMISTIC] = true;
|
||||
}
|
||||
if (traits.get_supports_position()) {
|
||||
root[MQTT_POSITION_TOPIC] = this->get_position_state_topic();
|
||||
root[MQTT_SET_POSITION_TOPIC] = this->get_position_command_topic();
|
||||
}
|
||||
if (traits.get_supports_tilt()) {
|
||||
root[MQTT_TILT_STATUS_TOPIC] = this->get_tilt_state_topic();
|
||||
root[MQTT_TILT_COMMAND_TOPIC] = this->get_tilt_command_topic();
|
||||
char topic_buf[MQTT_DEFAULT_TOPIC_MAX_LEN];
|
||||
#ifdef USE_MQTT_COVER_JSON
|
||||
if (this->use_json_format_) {
|
||||
// JSON mode: all state published to state_topic as JSON, use templates to extract
|
||||
root[MQTT_VALUE_TEMPLATE] = ESPHOME_F("{{ value_json.state }}");
|
||||
if (traits.get_supports_position()) {
|
||||
root[MQTT_POSITION_TOPIC] = this->get_state_topic_to_(topic_buf);
|
||||
root[MQTT_POSITION_TEMPLATE] = ESPHOME_F("{{ value_json.position }}");
|
||||
root[MQTT_SET_POSITION_TOPIC] = this->get_position_command_topic_to(topic_buf);
|
||||
}
|
||||
if (traits.get_supports_tilt()) {
|
||||
root[MQTT_TILT_STATUS_TOPIC] = this->get_state_topic_to_(topic_buf);
|
||||
root[MQTT_TILT_STATUS_TEMPLATE] = ESPHOME_F("{{ value_json.tilt }}");
|
||||
root[MQTT_TILT_COMMAND_TOPIC] = this->get_tilt_command_topic_to(topic_buf);
|
||||
}
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
// Standard mode: separate topics for position and tilt
|
||||
if (traits.get_supports_position()) {
|
||||
root[MQTT_POSITION_TOPIC] = this->get_position_state_topic_to(topic_buf);
|
||||
root[MQTT_SET_POSITION_TOPIC] = this->get_position_command_topic_to(topic_buf);
|
||||
}
|
||||
if (traits.get_supports_tilt()) {
|
||||
root[MQTT_TILT_STATUS_TOPIC] = this->get_tilt_state_topic_to(topic_buf);
|
||||
root[MQTT_TILT_COMMAND_TOPIC] = this->get_tilt_command_topic_to(topic_buf);
|
||||
}
|
||||
}
|
||||
if (traits.get_supports_tilt() && !traits.get_supports_position()) {
|
||||
config.command_topic = false;
|
||||
@@ -111,8 +140,24 @@ const EntityBase *MQTTCoverComponent::get_entity() const { return this->cover_;
|
||||
bool MQTTCoverComponent::send_initial_state() { return this->publish_state(); }
|
||||
bool MQTTCoverComponent::publish_state() {
|
||||
auto traits = this->cover_->get_traits();
|
||||
bool success = true;
|
||||
char topic_buf[MQTT_DEFAULT_TOPIC_MAX_LEN];
|
||||
#ifdef USE_MQTT_COVER_JSON
|
||||
if (this->use_json_format_) {
|
||||
return this->publish_json(this->get_state_topic_to_(topic_buf), [this, traits](JsonObject root) {
|
||||
// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) false positive with ArduinoJson
|
||||
root[ESPHOME_F("state")] = cover_state_to_mqtt_str(this->cover_->current_operation, this->cover_->position,
|
||||
traits.get_supports_position());
|
||||
if (traits.get_supports_position()) {
|
||||
root[ESPHOME_F("position")] = static_cast<int>(roundf(this->cover_->position * 100));
|
||||
}
|
||||
if (traits.get_supports_tilt()) {
|
||||
root[ESPHOME_F("tilt")] = static_cast<int>(roundf(this->cover_->tilt * 100));
|
||||
}
|
||||
// NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks)
|
||||
});
|
||||
}
|
||||
#endif
|
||||
bool success = true;
|
||||
if (traits.get_supports_position()) {
|
||||
char pos[VALUE_ACCURACY_MAX_LEN];
|
||||
size_t len = value_accuracy_to_buf(pos, roundf(this->cover_->position * 100), 0);
|
||||
|
||||
@@ -27,12 +27,18 @@ class MQTTCoverComponent : public mqtt::MQTTComponent {
|
||||
bool publish_state();
|
||||
|
||||
void dump_config() override;
|
||||
#ifdef USE_MQTT_COVER_JSON
|
||||
void set_use_json_format(bool use_json_format) { this->use_json_format_ = use_json_format; }
|
||||
#endif
|
||||
|
||||
protected:
|
||||
const char *component_type() const override;
|
||||
const EntityBase *get_entity() const override;
|
||||
|
||||
cover::Cover *cover_;
|
||||
#ifdef USE_MQTT_COVER_JSON
|
||||
bool use_json_format_{false};
|
||||
#endif
|
||||
};
|
||||
|
||||
} // namespace esphome::mqtt
|
||||
|
||||
@@ -104,7 +104,7 @@ void OpenThreadComponent::ot_main() {
|
||||
esp_cli_custom_command_init();
|
||||
#endif // CONFIG_OPENTHREAD_CLI_ESP_EXTENSION
|
||||
|
||||
otLinkModeConfig link_mode_config = {0};
|
||||
otLinkModeConfig link_mode_config{};
|
||||
#if CONFIG_OPENTHREAD_FTD
|
||||
link_mode_config.mRxOnWhenIdle = true;
|
||||
link_mode_config.mDeviceType = true;
|
||||
|
||||
@@ -16,19 +16,13 @@ namespace esphome::socket {
|
||||
|
||||
class BSDSocketImpl final : public Socket {
|
||||
public:
|
||||
BSDSocketImpl(int fd, bool monitor_loop = false) : fd_(fd) {
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
BSDSocketImpl(int fd, bool monitor_loop = false) {
|
||||
this->fd_ = fd;
|
||||
// Register new socket with the application for select() if monitoring requested
|
||||
if (monitor_loop && this->fd_ >= 0) {
|
||||
// Only set loop_monitored_ to true if registration succeeds
|
||||
this->loop_monitored_ = App.register_socket_fd(this->fd_);
|
||||
} else {
|
||||
this->loop_monitored_ = false;
|
||||
}
|
||||
#else
|
||||
// Without select support, ignore monitor_loop parameter
|
||||
(void) monitor_loop;
|
||||
#endif
|
||||
}
|
||||
~BSDSocketImpl() override {
|
||||
if (!this->closed_) {
|
||||
@@ -52,12 +46,10 @@ class BSDSocketImpl final : public Socket {
|
||||
int bind(const struct sockaddr *addr, socklen_t addrlen) override { return ::bind(this->fd_, addr, addrlen); }
|
||||
int close() override {
|
||||
if (!this->closed_) {
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
// Unregister from select() before closing if monitored
|
||||
if (this->loop_monitored_) {
|
||||
App.unregister_socket_fd(this->fd_);
|
||||
}
|
||||
#endif
|
||||
int ret = ::close(this->fd_);
|
||||
this->closed_ = true;
|
||||
return ret;
|
||||
@@ -130,23 +122,6 @@ class BSDSocketImpl final : public Socket {
|
||||
::fcntl(this->fd_, F_SETFL, fl);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int get_fd() const override { return this->fd_; }
|
||||
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
bool ready() const override {
|
||||
if (!this->loop_monitored_)
|
||||
return true;
|
||||
return App.is_socket_ready(this->fd_);
|
||||
}
|
||||
#endif
|
||||
|
||||
protected:
|
||||
int fd_;
|
||||
bool closed_{false};
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
bool loop_monitored_{false};
|
||||
#endif
|
||||
};
|
||||
|
||||
// Helper to create a socket with optional monitoring
|
||||
|
||||
@@ -452,6 +452,8 @@ class LWIPRawImpl : public Socket {
|
||||
errno = ENOSYS;
|
||||
return -1;
|
||||
}
|
||||
bool ready() const override { return this->rx_buf_ != nullptr || this->rx_closed_ || this->pcb_ == nullptr; }
|
||||
|
||||
int setblocking(bool blocking) final {
|
||||
if (pcb_ == nullptr) {
|
||||
errno = ECONNRESET;
|
||||
@@ -576,6 +578,8 @@ class LWIPRawListenImpl final : public LWIPRawImpl {
|
||||
tcp_err(pcb_, LWIPRawImpl::s_err_fn); // Use base class error handler
|
||||
}
|
||||
|
||||
bool ready() const override { return this->accepted_socket_count_ > 0; }
|
||||
|
||||
std::unique_ptr<Socket> accept(struct sockaddr *addr, socklen_t *addrlen) override {
|
||||
if (pcb_ == nullptr) {
|
||||
errno = EBADF;
|
||||
|
||||
@@ -11,19 +11,13 @@ namespace esphome::socket {
|
||||
|
||||
class LwIPSocketImpl final : public Socket {
|
||||
public:
|
||||
LwIPSocketImpl(int fd, bool monitor_loop = false) : fd_(fd) {
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
LwIPSocketImpl(int fd, bool monitor_loop = false) {
|
||||
this->fd_ = fd;
|
||||
// Register new socket with the application for select() if monitoring requested
|
||||
if (monitor_loop && this->fd_ >= 0) {
|
||||
// Only set loop_monitored_ to true if registration succeeds
|
||||
this->loop_monitored_ = App.register_socket_fd(this->fd_);
|
||||
} else {
|
||||
this->loop_monitored_ = false;
|
||||
}
|
||||
#else
|
||||
// Without select support, ignore monitor_loop parameter
|
||||
(void) monitor_loop;
|
||||
#endif
|
||||
}
|
||||
~LwIPSocketImpl() override {
|
||||
if (!this->closed_) {
|
||||
@@ -49,12 +43,10 @@ class LwIPSocketImpl final : public Socket {
|
||||
int bind(const struct sockaddr *addr, socklen_t addrlen) override { return lwip_bind(this->fd_, addr, addrlen); }
|
||||
int close() override {
|
||||
if (!this->closed_) {
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
// Unregister from select() before closing if monitored
|
||||
if (this->loop_monitored_) {
|
||||
App.unregister_socket_fd(this->fd_);
|
||||
}
|
||||
#endif
|
||||
int ret = lwip_close(this->fd_);
|
||||
this->closed_ = true;
|
||||
return ret;
|
||||
@@ -97,23 +89,6 @@ class LwIPSocketImpl final : public Socket {
|
||||
lwip_fcntl(this->fd_, F_SETFL, fl);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int get_fd() const override { return this->fd_; }
|
||||
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
bool ready() const override {
|
||||
if (!this->loop_monitored_)
|
||||
return true;
|
||||
return App.is_socket_ready(this->fd_);
|
||||
}
|
||||
#endif
|
||||
|
||||
protected:
|
||||
int fd_;
|
||||
bool closed_{false};
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
bool loop_monitored_{false};
|
||||
#endif
|
||||
};
|
||||
|
||||
// Helper to create a socket with optional monitoring
|
||||
|
||||
@@ -10,6 +10,10 @@ namespace esphome::socket {
|
||||
|
||||
Socket::~Socket() {}
|
||||
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
bool Socket::ready() const { return !this->loop_monitored_ || App.is_socket_ready_(this->fd_); }
|
||||
#endif
|
||||
|
||||
// Platform-specific inet_ntop wrappers
|
||||
#if defined(USE_SOCKET_IMPL_LWIP_TCP)
|
||||
// LWIP raw TCP (ESP8266) uses inet_ntoa_r which takes struct by value
|
||||
|
||||
@@ -63,13 +63,29 @@ class Socket {
|
||||
virtual int setblocking(bool blocking) = 0;
|
||||
virtual int loop() { return 0; };
|
||||
|
||||
/// Get the underlying file descriptor (returns -1 if not supported)
|
||||
virtual int get_fd() const { return -1; }
|
||||
/// Get the underlying file descriptor (returns -1 if not supported)
|
||||
/// Non-virtual: only one socket implementation is active per build.
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
int get_fd() const { return this->fd_; }
|
||||
#else
|
||||
int get_fd() const { return -1; }
|
||||
#endif
|
||||
|
||||
/// Check if socket has data ready to read
|
||||
/// For loop-monitored sockets, checks with the Application's select() results
|
||||
/// For non-monitored sockets, always returns true (assumes data may be available)
|
||||
/// For select()-based sockets: non-virtual, checks Application's select() results
|
||||
/// For LWIP raw TCP sockets: virtual, checks internal buffer state
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
bool ready() const;
|
||||
#else
|
||||
virtual bool ready() const { return true; }
|
||||
#endif
|
||||
|
||||
protected:
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
int fd_{-1};
|
||||
bool closed_{false};
|
||||
bool loop_monitored_{false};
|
||||
#endif
|
||||
};
|
||||
|
||||
/// Create a socket of the given domain, type and protocol.
|
||||
|
||||
@@ -352,7 +352,26 @@ bool AsyncWebServerRequest::authenticate(const char *username, const char *passw
|
||||
esp_crypto_base64_encode(reinterpret_cast<uint8_t *>(digest), max_digest_len, &out,
|
||||
reinterpret_cast<const uint8_t *>(user_info), user_info_len);
|
||||
|
||||
return strcmp(digest, auth_str + auth_prefix_len) == 0;
|
||||
// Constant-time comparison to avoid timing side channels.
|
||||
// No early return on length mismatch — the length difference is folded
|
||||
// into the accumulator so any mismatch is rejected.
|
||||
const char *provided = auth_str + auth_prefix_len;
|
||||
size_t digest_len = out; // length from esp_crypto_base64_encode
|
||||
// Derive provided_len from the already-sized std::string rather than
|
||||
// rescanning with strlen (avoids attacker-controlled scan length).
|
||||
size_t provided_len = auth.value().size() - auth_prefix_len;
|
||||
// Use full-width XOR so any bit difference in the lengths is preserved
|
||||
// (uint8_t truncation would miss differences in higher bytes, e.g.
|
||||
// digest_len vs digest_len + 256).
|
||||
volatile size_t result = digest_len ^ provided_len;
|
||||
// Iterate over the expected digest length only — the full-width length
|
||||
// XOR above already rejects any length mismatch, and bounding the loop
|
||||
// prevents a long Authorization header from forcing extra work.
|
||||
for (size_t i = 0; i < digest_len; i++) {
|
||||
char provided_ch = (i < provided_len) ? provided[i] : 0;
|
||||
result |= static_cast<uint8_t>(digest[i] ^ provided_ch);
|
||||
}
|
||||
return result == 0;
|
||||
}
|
||||
|
||||
void AsyncWebServerRequest::requestAuthentication(const char *realm) const {
|
||||
|
||||
@@ -639,6 +639,7 @@ CONF_MOVEMENT_COUNTER = "movement_counter"
|
||||
CONF_MOVING_DISTANCE = "moving_distance"
|
||||
CONF_MQTT = "mqtt"
|
||||
CONF_MQTT_ID = "mqtt_id"
|
||||
CONF_MQTT_JSON_STATE_PAYLOAD = "mqtt_json_state_payload"
|
||||
CONF_MULTIPLE = "multiple"
|
||||
CONF_MULTIPLEXER = "multiplexer"
|
||||
CONF_MULTIPLY = "multiply"
|
||||
|
||||
@@ -609,15 +609,6 @@ void Application::unregister_socket_fd(int fd) {
|
||||
}
|
||||
}
|
||||
|
||||
bool Application::is_socket_ready(int fd) const {
|
||||
// This function is thread-safe for reading the result of select()
|
||||
// However, it should only be called after select() has been executed in the main loop
|
||||
// The read_fds_ is only modified by select() in the main loop
|
||||
if (fd < 0 || fd >= FD_SETSIZE)
|
||||
return false;
|
||||
|
||||
return FD_ISSET(fd, &this->read_fds_);
|
||||
}
|
||||
#endif
|
||||
|
||||
void Application::yield_with_select_(uint32_t delay_ms) {
|
||||
|
||||
@@ -101,6 +101,10 @@
|
||||
#include "esphome/components/update/update_entity.h"
|
||||
#endif
|
||||
|
||||
namespace esphome::socket {
|
||||
class Socket;
|
||||
} // namespace esphome::socket
|
||||
|
||||
namespace esphome {
|
||||
|
||||
// Teardown timeout constant (in milliseconds)
|
||||
@@ -491,7 +495,8 @@ class Application {
|
||||
void unregister_socket_fd(int fd);
|
||||
/// Check if there's data available on a socket without blocking
|
||||
/// This function is thread-safe for reading, but should be called after select() has run
|
||||
bool is_socket_ready(int fd) const;
|
||||
/// The read_fds_ is only modified by select() in the main loop
|
||||
bool is_socket_ready(int fd) const { return fd >= 0 && this->is_socket_ready_(fd); }
|
||||
|
||||
#ifdef USE_WAKE_LOOP_THREADSAFE
|
||||
/// Wake the main event loop from a FreeRTOS task
|
||||
@@ -503,6 +508,15 @@ class Application {
|
||||
|
||||
protected:
|
||||
friend Component;
|
||||
friend class socket::Socket;
|
||||
|
||||
#ifdef USE_SOCKET_SELECT_SUPPORT
|
||||
/// Fast path for Socket::ready() via friendship - skips negative fd check.
|
||||
/// Safe because: fd was validated in register_socket_fd() at registration time,
|
||||
/// and Socket::ready() only calls this when loop_monitored_ is true (registration succeeded).
|
||||
/// FD_ISSET may include its own upper bounds check depending on platform.
|
||||
bool is_socket_ready_(int fd) const { return FD_ISSET(fd, &this->read_fds_); }
|
||||
#endif
|
||||
|
||||
void register_component_(Component *comp);
|
||||
|
||||
|
||||
@@ -145,6 +145,7 @@
|
||||
#define USE_MD5
|
||||
#define USE_SHA256
|
||||
#define USE_MQTT
|
||||
#define USE_MQTT_COVER_JSON
|
||||
#define USE_NETWORK
|
||||
#define USE_ONLINE_IMAGE_BMP_SUPPORT
|
||||
#define USE_ONLINE_IMAGE_PNG_SUPPORT
|
||||
|
||||
@@ -219,6 +219,7 @@ cover:
|
||||
name: Template Cover
|
||||
state_topic: some/topic/cover
|
||||
qos: 2
|
||||
mqtt_json_state_payload: true
|
||||
lambda: |-
|
||||
if (id(some_binary_sensor).state) {
|
||||
return COVER_OPEN;
|
||||
@@ -231,6 +232,53 @@ cover:
|
||||
stop_action:
|
||||
- logger.log: stop_action
|
||||
optimistic: true
|
||||
- platform: template
|
||||
name: Template Cover with Position and Tilt
|
||||
state_topic: some/topic/cover_pt
|
||||
position_state_topic: some/topic/cover_pt/position
|
||||
position_command_topic: some/topic/cover_pt/position/set
|
||||
tilt_state_topic: some/topic/cover_pt/tilt
|
||||
tilt_command_topic: some/topic/cover_pt/tilt/set
|
||||
qos: 2
|
||||
has_position: true
|
||||
lambda: |-
|
||||
if (id(some_binary_sensor).state) {
|
||||
return COVER_OPEN;
|
||||
}
|
||||
return COVER_CLOSED;
|
||||
position_action:
|
||||
- logger.log: position_action
|
||||
tilt_action:
|
||||
- logger.log: tilt_action
|
||||
open_action:
|
||||
- logger.log: open_action
|
||||
close_action:
|
||||
- logger.log: close_action
|
||||
stop_action:
|
||||
- logger.log: stop_action
|
||||
optimistic: true
|
||||
- platform: template
|
||||
name: Template Cover with Position and Tilt JSON
|
||||
state_topic: some/topic/cover_pt_json
|
||||
qos: 2
|
||||
mqtt_json_state_payload: true
|
||||
has_position: true
|
||||
lambda: |-
|
||||
if (id(some_binary_sensor).state) {
|
||||
return COVER_OPEN;
|
||||
}
|
||||
return COVER_CLOSED;
|
||||
position_action:
|
||||
- logger.log: position_action
|
||||
tilt_action:
|
||||
- logger.log: tilt_action
|
||||
open_action:
|
||||
- logger.log: open_action
|
||||
close_action:
|
||||
- logger.log: close_action
|
||||
stop_action:
|
||||
- logger.log: stop_action
|
||||
optimistic: true
|
||||
|
||||
datetime:
|
||||
- platform: template
|
||||
|
||||
Reference in New Issue
Block a user