From 850f18922595650c223cc856887e0639fe94d756 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 4 Jan 2026 13:44:49 -1000 Subject: [PATCH] [api] Fix message batch size mismatch and improve naming consistency (#12940) --- esphome/components/api/api_connection.cpp | 32 +++++++------- esphome/components/api/api_connection.h | 11 ++--- esphome/components/api/api_frame_helper.h | 23 +++++----- .../components/api/api_frame_helper_noise.cpp | 44 +++++++++---------- .../components/api/api_frame_helper_noise.h | 2 +- .../api/api_frame_helper_plaintext.cpp | 37 ++++++++-------- .../api/api_frame_helper_plaintext.h | 2 +- esphome/core/helpers.h | 4 ++ 8 files changed, 76 insertions(+), 79 deletions(-) diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index 3ded5e4408..b173ebc8cb 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -1874,9 +1874,9 @@ bool APIConnection::schedule_batch_() { } void APIConnection::process_batch_() { - // Ensure PacketInfo remains trivially destructible for our placement new approach - static_assert(std::is_trivially_destructible::value, - "PacketInfo must remain trivially destructible with this placement-new approach"); + // Ensure MessageInfo remains trivially destructible for our placement new approach + static_assert(std::is_trivially_destructible::value, + "MessageInfo must remain trivially destructible with this placement-new approach"); if (this->deferred_batch_.empty()) { this->flags_.batch_scheduled = false; @@ -1916,12 +1916,12 @@ void APIConnection::process_batch_() { return; } - size_t packets_to_process = std::min(num_items, MAX_PACKETS_PER_BATCH); + size_t messages_to_process = std::min(num_items, MAX_MESSAGES_PER_BATCH); - // Stack-allocated array for packet info - alignas(PacketInfo) char packet_info_storage[MAX_PACKETS_PER_BATCH * sizeof(PacketInfo)]; - PacketInfo *packet_info = reinterpret_cast(packet_info_storage); - size_t packet_count = 0; + // Stack-allocated array for message info + alignas(MessageInfo) char message_info_storage[MAX_MESSAGES_PER_BATCH * sizeof(MessageInfo)]; + MessageInfo *message_info = reinterpret_cast(message_info_storage); + size_t message_count = 0; // Cache these values to avoid repeated virtual calls const uint8_t header_padding = this->helper_->frame_header_padding(); @@ -1952,7 +1952,7 @@ void APIConnection::process_batch_() { uint32_t current_offset = 0; // Process items and encode directly to buffer (up to our limit) - for (size_t i = 0; i < packets_to_process; i++) { + for (size_t i = 0; i < messages_to_process; i++) { const auto &item = this->deferred_batch_[i]; // Try to encode message // The creator will calculate overhead to determine if the message fits @@ -1966,11 +1966,11 @@ void APIConnection::process_batch_() { // Message was encoded successfully // payload_size is header_padding + actual payload size + footer_size uint16_t proto_payload_size = payload_size - header_padding - footer_size; - // Use placement new to construct PacketInfo in pre-allocated stack array - // This avoids default-constructing all MAX_PACKETS_PER_BATCH elements - // Explicit destruction is not needed because PacketInfo is trivially destructible, + // Use placement new to construct MessageInfo in pre-allocated stack array + // This avoids default-constructing all MAX_MESSAGES_PER_BATCH elements + // Explicit destruction is not needed because MessageInfo is trivially destructible, // as ensured by the static_assert in its definition. - new (&packet_info[packet_count++]) PacketInfo(item.message_type, current_offset, proto_payload_size); + new (&message_info[message_count++]) MessageInfo(item.message_type, current_offset, proto_payload_size); // Update tracking variables items_processed++; @@ -1994,9 +1994,9 @@ void APIConnection::process_batch_() { shared_buf.resize(shared_buf.size() + footer_size); } - // Send all collected packets - APIError err = this->helper_->write_protobuf_packets(ProtoWriteBuffer{&shared_buf}, - std::span(packet_info, packet_count)); + // Send all collected messages + APIError err = this->helper_->write_protobuf_messages(ProtoWriteBuffer{&shared_buf}, + std::span(message_info, message_count)); if (err != APIError::OK && err != APIError::WOULD_BLOCK) { this->fatal_error_with_log_(LOG_STR("Batch write failed"), err); } diff --git a/esphome/components/api/api_connection.h b/esphome/components/api/api_connection.h index ffe3614f20..cffd52bfdb 100644 --- a/esphome/components/api/api_connection.h +++ b/esphome/components/api/api_connection.h @@ -28,14 +28,9 @@ static constexpr uint32_t KEEPALIVE_TIMEOUT_MS = 60000; // TODO: Remove MAX_INITIAL_PER_BATCH_LEGACY before 2026.7.0 - all clients should support API 1.14 by then static constexpr size_t MAX_INITIAL_PER_BATCH_LEGACY = 24; // For clients < API 1.14 (includes object_id) static constexpr size_t MAX_INITIAL_PER_BATCH = 34; // For clients >= API 1.14 (no object_id) -// Maximum number of packets to process in a single batch (platform-dependent) -// This limit exists to prevent stack overflow from the PacketInfo array in process_batch_ -// Each PacketInfo is 8 bytes, so 64 * 8 = 512 bytes, 32 * 8 = 256 bytes -#if defined(USE_ESP32) || defined(USE_HOST) -static constexpr size_t MAX_PACKETS_PER_BATCH = 64; // ESP32 has 8KB+ stack, HOST has plenty -#else -static constexpr size_t MAX_PACKETS_PER_BATCH = 32; // ESP8266/RP2040/etc have smaller stacks -#endif +// Verify MAX_MESSAGES_PER_BATCH (defined in api_frame_helper.h) can hold the initial batch +static_assert(MAX_MESSAGES_PER_BATCH >= MAX_INITIAL_PER_BATCH, + "MAX_MESSAGES_PER_BATCH must be >= MAX_INITIAL_PER_BATCH"); class APIConnection final : public APIServerConnection { public: diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h index b582bcea9a..383e763e6d 100644 --- a/esphome/components/api/api_frame_helper.h +++ b/esphome/components/api/api_frame_helper.h @@ -29,6 +29,10 @@ static constexpr uint16_t MAX_MESSAGE_SIZE = 8192; // 8 KiB for ESP8266 static constexpr uint16_t MAX_MESSAGE_SIZE = 32768; // 32 KiB for ESP32 and other platforms #endif +// Maximum number of messages to batch in a single write operation +// Must be >= MAX_INITIAL_PER_BATCH in api_connection.h (enforced by static_assert there) +static constexpr size_t MAX_MESSAGES_PER_BATCH = 34; + // Forward declaration struct ClientInfo; @@ -40,13 +44,13 @@ struct ReadPacketBuffer { uint16_t type; }; -// Packed packet info structure to minimize memory usage -struct PacketInfo { +// Packed message info structure to minimize memory usage +struct MessageInfo { uint16_t offset; // Offset in buffer where message starts uint16_t payload_size; // Size of the message payload uint8_t message_type; // Message type (0-255) - PacketInfo(uint8_t type, uint16_t off, uint16_t size) : offset(off), payload_size(size), message_type(type) {} + MessageInfo(uint8_t type, uint16_t off, uint16_t size) : offset(off), payload_size(size), message_type(type) {} }; enum class APIError : uint16_t { @@ -108,10 +112,10 @@ class APIFrameHelper { return APIError::OK; } virtual APIError write_protobuf_packet(uint8_t type, ProtoWriteBuffer buffer) = 0; - // Write multiple protobuf packets in a single operation - // packets contains (message_type, offset, length) for each message in the buffer + // Write multiple protobuf messages in a single operation + // messages contains (message_type, offset, length) for each message in the buffer // The buffer contains all messages with appropriate padding before each - virtual APIError write_protobuf_packets(ProtoWriteBuffer buffer, std::span packets) = 0; + virtual APIError write_protobuf_messages(ProtoWriteBuffer buffer, std::span messages) = 0; // Get the frame header padding required by this protocol uint8_t frame_header_padding() const { return frame_header_padding_; } // Get the frame footer size required by this protocol @@ -127,12 +131,6 @@ class APIFrameHelper { // Use swap trick since shrink_to_fit() is non-binding and may be ignored std::vector().swap(this->rx_buf_); } - // reusable_iovs_: Safe to release unconditionally. - // Only used within write_protobuf_packets() calls - cleared at start, - // populated with pointers, used for writev(), then function returns. - // The iovecs contain stale pointers after the call (data was either sent - // or copied to tx_buf_), and are cleared on next write_protobuf_packets(). - std::vector().swap(this->reusable_iovs_); } protected: @@ -186,7 +184,6 @@ class APIFrameHelper { // Containers (size varies, but typically 12+ bytes on 32-bit) std::array, API_MAX_SEND_QUEUE> tx_buf_; - std::vector reusable_iovs_; std::vector rx_buf_; // Pointer to client info (4 bytes on 32-bit) diff --git a/esphome/components/api/api_frame_helper_noise.cpp b/esphome/components/api/api_frame_helper_noise.cpp index 37b497e2a1..be8d93fbf9 100644 --- a/esphome/components/api/api_frame_helper_noise.cpp +++ b/esphome/components/api/api_frame_helper_noise.cpp @@ -429,12 +429,12 @@ APIError APINoiseFrameHelper::read_packet(ReadPacketBuffer *buffer) { APIError APINoiseFrameHelper::write_protobuf_packet(uint8_t type, ProtoWriteBuffer buffer) { // Resize to include MAC space (required for Noise encryption) buffer.get_buffer()->resize(buffer.get_buffer()->size() + frame_footer_size_); - PacketInfo packet{type, 0, - static_cast(buffer.get_buffer()->size() - frame_header_padding_ - frame_footer_size_)}; - return write_protobuf_packets(buffer, std::span(&packet, 1)); + MessageInfo msg{type, 0, + static_cast(buffer.get_buffer()->size() - frame_header_padding_ - frame_footer_size_)}; + return write_protobuf_messages(buffer, std::span(&msg, 1)); } -APIError APINoiseFrameHelper::write_protobuf_packets(ProtoWriteBuffer buffer, std::span packets) { +APIError APINoiseFrameHelper::write_protobuf_messages(ProtoWriteBuffer buffer, std::span messages) { APIError aerr = state_action_(); if (aerr != APIError::OK) { return aerr; @@ -444,20 +444,20 @@ APIError APINoiseFrameHelper::write_protobuf_packets(ProtoWriteBuffer buffer, st return APIError::WOULD_BLOCK; } - if (packets.empty()) { + if (messages.empty()) { return APIError::OK; } uint8_t *buffer_data = buffer.get_buffer()->data(); - this->reusable_iovs_.clear(); - this->reusable_iovs_.reserve(packets.size()); + // Stack-allocated iovec array - no heap allocation + StaticVector iovs; uint16_t total_write_len = 0; - // We need to encrypt each packet in place - for (const auto &packet : packets) { + // We need to encrypt each message in place + for (const auto &msg : messages) { // The buffer already has padding at offset - uint8_t *buf_start = buffer_data + packet.offset; + uint8_t *buf_start = buffer_data + msg.offset; // Write noise header buf_start[0] = 0x01; // indicator @@ -465,10 +465,10 @@ APIError APINoiseFrameHelper::write_protobuf_packets(ProtoWriteBuffer buffer, st // Write message header (to be encrypted) const uint8_t msg_offset = 3; - buf_start[msg_offset] = static_cast(packet.message_type >> 8); // type high byte - buf_start[msg_offset + 1] = static_cast(packet.message_type); // type low byte - buf_start[msg_offset + 2] = static_cast(packet.payload_size >> 8); // data_len high byte - buf_start[msg_offset + 3] = static_cast(packet.payload_size); // data_len low byte + buf_start[msg_offset] = static_cast(msg.message_type >> 8); // type high byte + buf_start[msg_offset + 1] = static_cast(msg.message_type); // type low byte + buf_start[msg_offset + 2] = static_cast(msg.payload_size >> 8); // data_len high byte + buf_start[msg_offset + 3] = static_cast(msg.payload_size); // data_len low byte // payload data is already in the buffer starting at offset + 7 // Make sure we have space for MAC @@ -477,8 +477,8 @@ APIError APINoiseFrameHelper::write_protobuf_packets(ProtoWriteBuffer buffer, st // Encrypt the message in place NoiseBuffer mbuf; noise_buffer_init(mbuf); - noise_buffer_set_inout(mbuf, buf_start + msg_offset, 4 + packet.payload_size, - 4 + packet.payload_size + frame_footer_size_); + noise_buffer_set_inout(mbuf, buf_start + msg_offset, 4 + msg.payload_size, + 4 + msg.payload_size + frame_footer_size_); int err = noise_cipherstate_encrypt(send_cipher_, &mbuf); APIError aerr = @@ -490,14 +490,14 @@ APIError APINoiseFrameHelper::write_protobuf_packets(ProtoWriteBuffer buffer, st buf_start[1] = static_cast(mbuf.size >> 8); buf_start[2] = static_cast(mbuf.size); - // Add iovec for this encrypted packet - size_t packet_len = static_cast(3 + mbuf.size); // indicator + size + encrypted data - this->reusable_iovs_.push_back({buf_start, packet_len}); - total_write_len += packet_len; + // Add iovec for this encrypted message + size_t msg_len = static_cast(3 + mbuf.size); // indicator + size + encrypted data + iovs.push_back({buf_start, msg_len}); + total_write_len += msg_len; } - // Send all encrypted packets in one writev call - return this->write_raw_(this->reusable_iovs_.data(), this->reusable_iovs_.size(), total_write_len); + // Send all encrypted messages in one writev call + return this->write_raw_(iovs.data(), iovs.size(), total_write_len); } APIError APINoiseFrameHelper::write_frame_(const uint8_t *data, uint16_t len) { diff --git a/esphome/components/api/api_frame_helper_noise.h b/esphome/components/api/api_frame_helper_noise.h index 7eb01058db..1268086194 100644 --- a/esphome/components/api/api_frame_helper_noise.h +++ b/esphome/components/api/api_frame_helper_noise.h @@ -23,7 +23,7 @@ class APINoiseFrameHelper final : public APIFrameHelper { APIError loop() override; APIError read_packet(ReadPacketBuffer *buffer) override; APIError write_protobuf_packet(uint8_t type, ProtoWriteBuffer buffer) override; - APIError write_protobuf_packets(ProtoWriteBuffer buffer, std::span packets) override; + APIError write_protobuf_messages(ProtoWriteBuffer buffer, std::span messages) override; protected: APIError state_action_(); diff --git a/esphome/components/api/api_frame_helper_plaintext.cpp b/esphome/components/api/api_frame_helper_plaintext.cpp index 8b7d002d7c..a974a2458e 100644 --- a/esphome/components/api/api_frame_helper_plaintext.cpp +++ b/esphome/components/api/api_frame_helper_plaintext.cpp @@ -230,29 +230,30 @@ APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) { return APIError::OK; } APIError APIPlaintextFrameHelper::write_protobuf_packet(uint8_t type, ProtoWriteBuffer buffer) { - PacketInfo packet{type, 0, static_cast(buffer.get_buffer()->size() - frame_header_padding_)}; - return write_protobuf_packets(buffer, std::span(&packet, 1)); + MessageInfo msg{type, 0, static_cast(buffer.get_buffer()->size() - frame_header_padding_)}; + return write_protobuf_messages(buffer, std::span(&msg, 1)); } -APIError APIPlaintextFrameHelper::write_protobuf_packets(ProtoWriteBuffer buffer, std::span packets) { +APIError APIPlaintextFrameHelper::write_protobuf_messages(ProtoWriteBuffer buffer, + std::span messages) { if (state_ != State::DATA) { return APIError::BAD_STATE; } - if (packets.empty()) { + if (messages.empty()) { return APIError::OK; } uint8_t *buffer_data = buffer.get_buffer()->data(); - this->reusable_iovs_.clear(); - this->reusable_iovs_.reserve(packets.size()); + // Stack-allocated iovec array - no heap allocation + StaticVector iovs; uint16_t total_write_len = 0; - for (const auto &packet : packets) { + for (const auto &msg : messages) { // Calculate varint sizes for header layout - uint8_t size_varint_len = api::ProtoSize::varint(static_cast(packet.payload_size)); - uint8_t type_varint_len = api::ProtoSize::varint(static_cast(packet.message_type)); + uint8_t size_varint_len = api::ProtoSize::varint(static_cast(msg.payload_size)); + uint8_t type_varint_len = api::ProtoSize::varint(static_cast(msg.message_type)); uint8_t total_header_len = 1 + size_varint_len + type_varint_len; // Calculate where to start writing the header @@ -280,25 +281,25 @@ APIError APIPlaintextFrameHelper::write_protobuf_packets(ProtoWriteBuffer buffer // // The message starts at offset + frame_header_padding_ // So we write the header starting at offset + frame_header_padding_ - total_header_len - uint8_t *buf_start = buffer_data + packet.offset; + uint8_t *buf_start = buffer_data + msg.offset; uint32_t header_offset = frame_header_padding_ - total_header_len; // Write the plaintext header buf_start[header_offset] = 0x00; // indicator // Encode varints directly into buffer - ProtoVarInt(packet.payload_size).encode_to_buffer_unchecked(buf_start + header_offset + 1, size_varint_len); - ProtoVarInt(packet.message_type) + ProtoVarInt(msg.payload_size).encode_to_buffer_unchecked(buf_start + header_offset + 1, size_varint_len); + ProtoVarInt(msg.message_type) .encode_to_buffer_unchecked(buf_start + header_offset + 1 + size_varint_len, type_varint_len); - // Add iovec for this packet (header + payload) - size_t packet_len = static_cast(total_header_len + packet.payload_size); - this->reusable_iovs_.push_back({buf_start + header_offset, packet_len}); - total_write_len += packet_len; + // Add iovec for this message (header + payload) + size_t msg_len = static_cast(total_header_len + msg.payload_size); + iovs.push_back({buf_start + header_offset, msg_len}); + total_write_len += msg_len; } - // Send all packets in one writev call - return write_raw_(this->reusable_iovs_.data(), this->reusable_iovs_.size(), total_write_len); + // Send all messages in one writev call + return write_raw_(iovs.data(), iovs.size(), total_write_len); } } // namespace esphome::api diff --git a/esphome/components/api/api_frame_helper_plaintext.h b/esphome/components/api/api_frame_helper_plaintext.h index bba981d26b..7af9fc64b9 100644 --- a/esphome/components/api/api_frame_helper_plaintext.h +++ b/esphome/components/api/api_frame_helper_plaintext.h @@ -21,7 +21,7 @@ class APIPlaintextFrameHelper final : public APIFrameHelper { APIError loop() override; APIError read_packet(ReadPacketBuffer *buffer) override; APIError write_protobuf_packet(uint8_t type, ProtoWriteBuffer buffer) override; - APIError write_protobuf_packets(ProtoWriteBuffer buffer, std::span packets) override; + APIError write_protobuf_messages(ProtoWriteBuffer buffer, std::span messages) override; protected: APIError try_read_frame_(); diff --git a/esphome/core/helpers.h b/esphome/core/helpers.h index f7a14ed2ec..6c338797a9 100644 --- a/esphome/core/helpers.h +++ b/esphome/core/helpers.h @@ -162,6 +162,10 @@ template class StaticVector { size_t size() const { return count_; } bool empty() const { return count_ == 0; } + // Direct access to underlying data + T *data() { return data_.data(); } + const T *data() const { return data_.data(); } + T &operator[](size_t i) { return data_[i]; } const T &operator[](size_t i) const { return data_[i]; }