mirror of
https://github.com/esphome/esphome.git
synced 2025-10-03 10:32:21 +01:00
[api] Add configurable send queue limit to prevent OOM crashes
This commit is contained in:
@@ -61,6 +61,7 @@ CONF_HOMEASSISTANT_SERVICES = "homeassistant_services"
|
|||||||
CONF_HOMEASSISTANT_STATES = "homeassistant_states"
|
CONF_HOMEASSISTANT_STATES = "homeassistant_states"
|
||||||
CONF_LISTEN_BACKLOG = "listen_backlog"
|
CONF_LISTEN_BACKLOG = "listen_backlog"
|
||||||
CONF_MAX_CONNECTIONS = "max_connections"
|
CONF_MAX_CONNECTIONS = "max_connections"
|
||||||
|
CONF_MAX_SEND_QUEUE = "max_send_queue"
|
||||||
|
|
||||||
|
|
||||||
def validate_encryption_key(value):
|
def validate_encryption_key(value):
|
||||||
@@ -183,6 +184,19 @@ CONFIG_SCHEMA = cv.All(
|
|||||||
host=8, # Abundant resources
|
host=8, # Abundant resources
|
||||||
ln882x=8, # Moderate RAM
|
ln882x=8, # Moderate RAM
|
||||||
): cv.int_range(min=1, max=20),
|
): cv.int_range(min=1, max=20),
|
||||||
|
# Maximum queued send buffers per connection before dropping connection
|
||||||
|
# Each buffer uses ~8-12 bytes overhead plus actual message size
|
||||||
|
# Platform defaults based on available RAM and typical message rates:
|
||||||
|
cv.SplitDefault(
|
||||||
|
CONF_MAX_SEND_QUEUE,
|
||||||
|
esp8266=5, # Limited RAM, need to fail fast
|
||||||
|
esp32=8, # More RAM, can buffer more
|
||||||
|
rp2040=5, # Limited RAM
|
||||||
|
bk72xx=8, # Moderate RAM
|
||||||
|
rtl87xx=8, # Moderate RAM
|
||||||
|
host=16, # Abundant resources
|
||||||
|
ln882x=8, # Moderate RAM
|
||||||
|
): cv.int_range(min=1, max=32),
|
||||||
}
|
}
|
||||||
).extend(cv.COMPONENT_SCHEMA),
|
).extend(cv.COMPONENT_SCHEMA),
|
||||||
cv.rename_key(CONF_SERVICES, CONF_ACTIONS),
|
cv.rename_key(CONF_SERVICES, CONF_ACTIONS),
|
||||||
@@ -205,6 +219,7 @@ async def to_code(config):
|
|||||||
cg.add(var.set_listen_backlog(config[CONF_LISTEN_BACKLOG]))
|
cg.add(var.set_listen_backlog(config[CONF_LISTEN_BACKLOG]))
|
||||||
if CONF_MAX_CONNECTIONS in config:
|
if CONF_MAX_CONNECTIONS in config:
|
||||||
cg.add(var.set_max_connections(config[CONF_MAX_CONNECTIONS]))
|
cg.add(var.set_max_connections(config[CONF_MAX_CONNECTIONS]))
|
||||||
|
cg.add_define("API_MAX_SEND_QUEUE", config.get(CONF_MAX_SEND_QUEUE, 5))
|
||||||
|
|
||||||
# Set USE_API_SERVICES if any services are enabled
|
# Set USE_API_SERVICES if any services are enabled
|
||||||
if config.get(CONF_ACTIONS) or config[CONF_CUSTOM_SERVICES]:
|
if config.get(CONF_ACTIONS) or config[CONF_CUSTOM_SERVICES]:
|
||||||
|
@@ -80,7 +80,7 @@ const LogString *api_error_to_logstr(APIError err) {
|
|||||||
|
|
||||||
// Default implementation for loop - handles sending buffered data
|
// Default implementation for loop - handles sending buffered data
|
||||||
APIError APIFrameHelper::loop() {
|
APIError APIFrameHelper::loop() {
|
||||||
if (!this->tx_buf_.empty()) {
|
if (this->tx_buf_count_ > 0) {
|
||||||
APIError err = try_send_tx_buf_();
|
APIError err = try_send_tx_buf_();
|
||||||
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
|
if (err != APIError::OK && err != APIError::WOULD_BLOCK) {
|
||||||
return err;
|
return err;
|
||||||
@@ -102,9 +102,16 @@ APIError APIFrameHelper::handle_socket_write_error_() {
|
|||||||
// Helper method to buffer data from IOVs
|
// Helper method to buffer data from IOVs
|
||||||
void APIFrameHelper::buffer_data_from_iov_(const struct iovec *iov, int iovcnt, uint16_t total_write_len,
|
void APIFrameHelper::buffer_data_from_iov_(const struct iovec *iov, int iovcnt, uint16_t total_write_len,
|
||||||
uint16_t offset) {
|
uint16_t offset) {
|
||||||
SendBuffer buffer;
|
// Check if queue is full
|
||||||
buffer.size = total_write_len - offset;
|
if (this->tx_buf_count_ >= API_MAX_SEND_QUEUE) {
|
||||||
buffer.data = std::make_unique<uint8_t[]>(buffer.size);
|
HELPER_LOG("Send queue full (%u buffers), dropping connection", this->tx_buf_count_);
|
||||||
|
this->state_ = State::FAILED;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto buffer = std::make_unique<SendBuffer>();
|
||||||
|
buffer->size = total_write_len - offset;
|
||||||
|
buffer->data = std::make_unique<uint8_t[]>(buffer->size);
|
||||||
|
|
||||||
uint16_t to_skip = offset;
|
uint16_t to_skip = offset;
|
||||||
uint16_t write_pos = 0;
|
uint16_t write_pos = 0;
|
||||||
@@ -117,12 +124,16 @@ void APIFrameHelper::buffer_data_from_iov_(const struct iovec *iov, int iovcnt,
|
|||||||
// Include this segment (partially or fully)
|
// Include this segment (partially or fully)
|
||||||
const uint8_t *src = reinterpret_cast<uint8_t *>(iov[i].iov_base) + to_skip;
|
const uint8_t *src = reinterpret_cast<uint8_t *>(iov[i].iov_base) + to_skip;
|
||||||
uint16_t len = static_cast<uint16_t>(iov[i].iov_len) - to_skip;
|
uint16_t len = static_cast<uint16_t>(iov[i].iov_len) - to_skip;
|
||||||
std::memcpy(buffer.data.get() + write_pos, src, len);
|
std::memcpy(buffer->data.get() + write_pos, src, len);
|
||||||
write_pos += len;
|
write_pos += len;
|
||||||
to_skip = 0;
|
to_skip = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this->tx_buf_.push_back(std::move(buffer));
|
|
||||||
|
// Add to circular buffer
|
||||||
|
this->tx_buf_[this->tx_buf_tail_] = std::move(buffer);
|
||||||
|
this->tx_buf_tail_ = (this->tx_buf_tail_ + 1) % API_MAX_SEND_QUEUE;
|
||||||
|
this->tx_buf_count_++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// This method writes data to socket or buffers it
|
// This method writes data to socket or buffers it
|
||||||
@@ -140,7 +151,7 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt, uint16_
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Try to send any existing buffered data first if there is any
|
// Try to send any existing buffered data first if there is any
|
||||||
if (!this->tx_buf_.empty()) {
|
if (this->tx_buf_count_ > 0) {
|
||||||
APIError send_result = try_send_tx_buf_();
|
APIError send_result = try_send_tx_buf_();
|
||||||
// If real error occurred (not just WOULD_BLOCK), return it
|
// If real error occurred (not just WOULD_BLOCK), return it
|
||||||
if (send_result != APIError::OK && send_result != APIError::WOULD_BLOCK) {
|
if (send_result != APIError::OK && send_result != APIError::WOULD_BLOCK) {
|
||||||
@@ -149,7 +160,7 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt, uint16_
|
|||||||
|
|
||||||
// If there is still data in the buffer, we can't send, buffer
|
// If there is still data in the buffer, we can't send, buffer
|
||||||
// the new data and return
|
// the new data and return
|
||||||
if (!this->tx_buf_.empty()) {
|
if (this->tx_buf_count_ > 0) {
|
||||||
this->buffer_data_from_iov_(iov, iovcnt, total_write_len, 0);
|
this->buffer_data_from_iov_(iov, iovcnt, total_write_len, 0);
|
||||||
return APIError::OK; // Success, data buffered
|
return APIError::OK; // Success, data buffered
|
||||||
}
|
}
|
||||||
@@ -177,32 +188,31 @@ APIError APIFrameHelper::write_raw_(const struct iovec *iov, int iovcnt, uint16_
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Common implementation for trying to send buffered data
|
// Common implementation for trying to send buffered data
|
||||||
// IMPORTANT: Caller MUST ensure tx_buf_ is not empty before calling this method
|
// IMPORTANT: Caller MUST ensure tx_buf_count_ > 0 before calling this method
|
||||||
APIError APIFrameHelper::try_send_tx_buf_() {
|
APIError APIFrameHelper::try_send_tx_buf_() {
|
||||||
// Try to send from tx_buf - we assume it's not empty as it's the caller's responsibility to check
|
// Try to send from tx_buf - we assume it's not empty as it's the caller's responsibility to check
|
||||||
bool tx_buf_empty = false;
|
while (this->tx_buf_count_ > 0) {
|
||||||
while (!tx_buf_empty) {
|
|
||||||
// Get the first buffer in the queue
|
// Get the first buffer in the queue
|
||||||
SendBuffer &front_buffer = this->tx_buf_.front();
|
SendBuffer *front_buffer = this->tx_buf_[this->tx_buf_head_].get();
|
||||||
|
|
||||||
// Try to send the remaining data in this buffer
|
// Try to send the remaining data in this buffer
|
||||||
ssize_t sent = this->socket_->write(front_buffer.current_data(), front_buffer.remaining());
|
ssize_t sent = this->socket_->write(front_buffer->current_data(), front_buffer->remaining());
|
||||||
|
|
||||||
if (sent == -1) {
|
if (sent == -1) {
|
||||||
return this->handle_socket_write_error_();
|
return this->handle_socket_write_error_();
|
||||||
} else if (sent == 0) {
|
} else if (sent == 0) {
|
||||||
// Nothing sent but not an error
|
// Nothing sent but not an error
|
||||||
return APIError::WOULD_BLOCK;
|
return APIError::WOULD_BLOCK;
|
||||||
} else if (static_cast<uint16_t>(sent) < front_buffer.remaining()) {
|
} else if (static_cast<uint16_t>(sent) < front_buffer->remaining()) {
|
||||||
// Partially sent, update offset
|
// Partially sent, update offset
|
||||||
// Cast to ensure no overflow issues with uint16_t
|
// Cast to ensure no overflow issues with uint16_t
|
||||||
front_buffer.offset += static_cast<uint16_t>(sent);
|
front_buffer->offset += static_cast<uint16_t>(sent);
|
||||||
return APIError::WOULD_BLOCK; // Stop processing more buffers if we couldn't send a complete buffer
|
return APIError::WOULD_BLOCK; // Stop processing more buffers if we couldn't send a complete buffer
|
||||||
} else {
|
} else {
|
||||||
// Buffer completely sent, remove it from the queue
|
// Buffer completely sent, remove it from the queue
|
||||||
this->tx_buf_.pop_front();
|
this->tx_buf_[this->tx_buf_head_].reset();
|
||||||
// Update empty status for the loop condition
|
this->tx_buf_head_ = (this->tx_buf_head_ + 1) % API_MAX_SEND_QUEUE;
|
||||||
tx_buf_empty = this->tx_buf_.empty();
|
this->tx_buf_count_--;
|
||||||
// Continue loop to try sending the next buffer
|
// Continue loop to try sending the next buffer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,7 +1,8 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
#include <array>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <deque>
|
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
#include <memory>
|
||||||
#include <span>
|
#include <span>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
@@ -79,7 +80,7 @@ class APIFrameHelper {
|
|||||||
virtual APIError init() = 0;
|
virtual APIError init() = 0;
|
||||||
virtual APIError loop();
|
virtual APIError loop();
|
||||||
virtual APIError read_packet(ReadPacketBuffer *buffer) = 0;
|
virtual APIError read_packet(ReadPacketBuffer *buffer) = 0;
|
||||||
bool can_write_without_blocking() { return state_ == State::DATA && tx_buf_.empty(); }
|
bool can_write_without_blocking() { return state_ == State::DATA && tx_buf_count_ == 0; }
|
||||||
std::string getpeername() { return socket_->getpeername(); }
|
std::string getpeername() { return socket_->getpeername(); }
|
||||||
int getpeername(struct sockaddr *addr, socklen_t *addrlen) { return socket_->getpeername(addr, addrlen); }
|
int getpeername(struct sockaddr *addr, socklen_t *addrlen) { return socket_->getpeername(addr, addrlen); }
|
||||||
APIError close() {
|
APIError close() {
|
||||||
@@ -161,7 +162,7 @@ class APIFrameHelper {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Containers (size varies, but typically 12+ bytes on 32-bit)
|
// Containers (size varies, but typically 12+ bytes on 32-bit)
|
||||||
std::deque<SendBuffer> tx_buf_;
|
std::array<std::unique_ptr<SendBuffer>, API_MAX_SEND_QUEUE> tx_buf_;
|
||||||
std::vector<struct iovec> reusable_iovs_;
|
std::vector<struct iovec> reusable_iovs_;
|
||||||
std::vector<uint8_t> rx_buf_;
|
std::vector<uint8_t> rx_buf_;
|
||||||
|
|
||||||
@@ -174,7 +175,10 @@ class APIFrameHelper {
|
|||||||
State state_{State::INITIALIZE};
|
State state_{State::INITIALIZE};
|
||||||
uint8_t frame_header_padding_{0};
|
uint8_t frame_header_padding_{0};
|
||||||
uint8_t frame_footer_size_{0};
|
uint8_t frame_footer_size_{0};
|
||||||
// 5 bytes total, 3 bytes padding
|
uint8_t tx_buf_head_{0};
|
||||||
|
uint8_t tx_buf_tail_{0};
|
||||||
|
uint8_t tx_buf_count_{0};
|
||||||
|
// 8 bytes total, 0 bytes padding
|
||||||
|
|
||||||
// Common initialization for both plaintext and noise protocols
|
// Common initialization for both plaintext and noise protocols
|
||||||
APIError init_common_();
|
APIError init_common_();
|
||||||
|
@@ -115,6 +115,7 @@
|
|||||||
#define USE_API_NOISE
|
#define USE_API_NOISE
|
||||||
#define USE_API_PLAINTEXT
|
#define USE_API_PLAINTEXT
|
||||||
#define USE_API_SERVICES
|
#define USE_API_SERVICES
|
||||||
|
#define API_MAX_SEND_QUEUE 8
|
||||||
#define USE_MD5
|
#define USE_MD5
|
||||||
#define USE_SHA256
|
#define USE_SHA256
|
||||||
#define USE_MQTT
|
#define USE_MQTT
|
||||||
|
Reference in New Issue
Block a user