1
0
mirror of https://github.com/esphome/esphome.git synced 2026-02-08 00:31:58 +00:00

[api] Eliminate rx_buf heap churn and release buffers after initial sync (#12133)

This commit is contained in:
J. Nick Koston
2025-11-28 12:13:29 -06:00
committed by GitHub
parent 26e979d3d5
commit fb82362e9c
9 changed files with 44 additions and 22 deletions

View File

@@ -169,8 +169,7 @@ void APIConnection::loop() {
} else {
this->last_traffic_ = now;
// read a packet
this->read_message(buffer.data_len, buffer.type,
buffer.data_len > 0 ? &buffer.container[buffer.data_offset] : nullptr);
this->read_message(buffer.data_len, buffer.type, buffer.data);
if (this->flags_.remove)
return;
}
@@ -195,6 +194,9 @@ void APIConnection::loop() {
}
// Now that everything is sent, enable immediate sending for future state changes
this->flags_.should_try_send_immediately = true;
// Release excess memory from buffers that grew during initial sync
this->deferred_batch_.release_buffer();
this->helper_->release_buffers();
}
}

View File

@@ -554,10 +554,8 @@ class APIConnection final : public APIServerConnection {
std::vector<BatchItem> items;
uint32_t batch_start_time{0};
DeferredBatch() {
// Pre-allocate capacity for typical batch sizes to avoid reallocation
items.reserve(8);
}
// No pre-allocation - log connections never use batching, and for
// connections that do, buffers are released after initial sync anyway
// Add item to the batch
void add_item(EntityBase *entity, MessageCreator creator, uint8_t message_type, uint8_t estimated_size);
@@ -576,6 +574,15 @@ class APIConnection final : public APIServerConnection {
bool empty() const { return items.empty(); }
size_t size() const { return items.size(); }
const BatchItem &operator[](size_t index) const { return items[index]; }
// Release excess capacity - only releases if items already empty
void release_buffer() {
// Safe to call: batch is processed before release_buffer is called,
// and if any items remain (partial processing), we must not clear them.
// Use swap trick since shrink_to_fit() is non-binding and may be ignored.
if (items.empty()) {
std::vector<BatchItem>().swap(items);
}
}
};
// DeferredBatch here (16 bytes, 4-byte aligned)

View File

@@ -35,10 +35,9 @@ struct ClientInfo;
class ProtoWriteBuffer;
struct ReadPacketBuffer {
std::vector<uint8_t> container;
uint16_t type;
uint16_t data_offset;
const uint8_t *data; // Points directly into frame helper's rx_buf_ (valid until next read_packet call)
uint16_t data_len;
uint16_t type;
};
// Packed packet info structure to minimize memory usage
@@ -119,6 +118,22 @@ class APIFrameHelper {
uint8_t frame_footer_size() const { return frame_footer_size_; }
// Check if socket has data ready to read
bool is_socket_ready() const { return socket_ != nullptr && socket_->ready(); }
// Release excess memory from internal buffers after initial sync
void release_buffers() {
// rx_buf_: Safe to clear only if no partial read in progress.
// rx_buf_len_ tracks bytes read so far; if non-zero, we're mid-frame
// and clearing would lose partially received data.
if (this->rx_buf_len_ == 0) {
// Use swap trick since shrink_to_fit() is non-binding and may be ignored
std::vector<uint8_t>().swap(this->rx_buf_);
}
// reusable_iovs_: Safe to release unconditionally.
// Only used within write_protobuf_packets() calls - cleared at start,
// populated with pointers, used for writev(), then function returns.
// The iovecs contain stale pointers after the call (data was either sent
// or copied to tx_buf_), and are cleared on next write_protobuf_packets().
std::vector<struct iovec>().swap(this->reusable_iovs_);
}
protected:
// Buffer containing data to be sent

View File

@@ -407,8 +407,7 @@ APIError APINoiseFrameHelper::read_packet(ReadPacketBuffer *buffer) {
return APIError::BAD_DATA_PACKET;
}
buffer->container = std::move(this->rx_buf_);
buffer->data_offset = 4;
buffer->data = msg_data + 4; // Skip 4-byte header (type + length)
buffer->data_len = data_len;
buffer->type = type;
return APIError::OK;

View File

@@ -210,8 +210,7 @@ APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) {
return aerr;
}
buffer->container = std::move(this->rx_buf_);
buffer->data_offset = 0;
buffer->data = this->rx_buf_.data();
buffer->data_len = this->rx_header_parsed_len_;
buffer->type = this->rx_header_parsed_type_;
return APIError::OK;

View File

@@ -13,7 +13,7 @@ void APIServerConnectionBase::log_send_message_(const char *name, const std::str
}
#endif
void APIServerConnectionBase::read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) {
void APIServerConnectionBase::read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) {
switch (msg_type) {
case HelloRequest::MESSAGE_TYPE: {
HelloRequest msg;
@@ -827,7 +827,7 @@ void APIServerConnection::on_z_wave_proxy_frame(const ZWaveProxyFrame &msg) { th
void APIServerConnection::on_z_wave_proxy_request(const ZWaveProxyRequest &msg) { this->zwave_proxy_request(msg); }
#endif
void APIServerConnection::read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) {
void APIServerConnection::read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) {
// Check authentication/connection requirements for messages
switch (msg_type) {
case HelloRequest::MESSAGE_TYPE: // No setup required

View File

@@ -218,7 +218,7 @@ class APIServerConnectionBase : public ProtoService {
virtual void on_z_wave_proxy_request(const ZWaveProxyRequest &value){};
#endif
protected:
void read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) override;
void read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) override;
};
class APIServerConnection : public APIServerConnectionBase {
@@ -480,7 +480,7 @@ class APIServerConnection : public APIServerConnectionBase {
#ifdef USE_ZWAVE_PROXY
void on_z_wave_proxy_request(const ZWaveProxyRequest &msg) override;
#endif
void read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) override;
void read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) override;
};
} // namespace esphome::api

View File

@@ -846,7 +846,7 @@ class ProtoService {
*/
virtual ProtoWriteBuffer create_buffer(uint32_t reserve_size) = 0;
virtual bool send_buffer(ProtoWriteBuffer buffer, uint8_t message_type) = 0;
virtual void read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) = 0;
virtual void read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) = 0;
// Optimized method that pre-allocates buffer based on message size
bool send_message_(const ProtoMessage &msg, uint8_t message_type) {

View File

@@ -2769,8 +2769,8 @@ static const char *const TAG = "api.service";
cases = list(RECEIVE_CASES.items())
cases.sort()
hpp += " protected:\n"
hpp += " void read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) override;\n"
out = f"void {class_name}::read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) {{\n"
hpp += " void read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) override;\n"
out = f"void {class_name}::read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) {{\n"
out += " switch (msg_type) {\n"
for i, (case, ifdef, message_name) in cases:
if ifdef is not None:
@@ -2878,9 +2878,9 @@ static const char *const TAG = "api.service";
result += "#endif\n"
return result
hpp_protected += " void read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) override;\n"
hpp_protected += " void read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) override;\n"
cpp += f"\nvoid {class_name}::read_message(uint32_t msg_size, uint32_t msg_type, uint8_t *msg_data) {{\n"
cpp += f"\nvoid {class_name}::read_message(uint32_t msg_size, uint32_t msg_type, const uint8_t *msg_data) {{\n"
cpp += " // Check authentication/connection requirements for messages\n"
cpp += " switch (msg_type) {\n"