mirror of
https://github.com/esphome/esphome.git
synced 2025-10-24 04:33:49 +01:00
zero copy cleanup
This commit is contained in:
@@ -731,7 +731,7 @@ message SubscribeLogsResponse {
|
||||
option (no_delay) = false;
|
||||
|
||||
LogLevel level = 1;
|
||||
bytes message = 3;
|
||||
bytes message = 3 [(zero_copy) = true];
|
||||
bool send_failed = 4;
|
||||
}
|
||||
|
||||
@@ -888,7 +888,7 @@ message CameraImageResponse {
|
||||
option (ifdef) = "USE_CAMERA";
|
||||
|
||||
fixed32 key = 1;
|
||||
bytes data = 2;
|
||||
bytes data = 2 [(zero_copy) = true];
|
||||
bool done = 3;
|
||||
uint32 device_id = 4 [(field_ifdef) = "USE_DEVICES"];
|
||||
}
|
||||
|
@@ -225,22 +225,13 @@ void APIConnection::loop() {
|
||||
if (this->image_reader_ && this->image_reader_->available() && this->helper_->can_write_without_blocking()) {
|
||||
uint32_t to_send = std::min((size_t) MAX_BATCH_PACKET_SIZE, this->image_reader_->available());
|
||||
bool done = this->image_reader_->available() == to_send;
|
||||
uint32_t msg_size = 0;
|
||||
ProtoSize::add_fixed_field<4>(msg_size, 1, true);
|
||||
// partial message size calculated manually since its a special case
|
||||
// 1 for the data field, varint for the data size, and the data itself
|
||||
msg_size += 1 + ProtoSize::varint(to_send) + to_send;
|
||||
ProtoSize::add_bool_field(msg_size, 1, done);
|
||||
|
||||
auto buffer = this->create_buffer(msg_size);
|
||||
// fixed32 key = 1;
|
||||
buffer.encode_fixed32(1, camera::Camera::instance()->get_object_id_hash());
|
||||
// bytes data = 2;
|
||||
buffer.encode_bytes(2, this->image_reader_->peek_data_buffer(), to_send);
|
||||
// bool done = 3;
|
||||
buffer.encode_bool(3, done);
|
||||
CameraImageResponse msg;
|
||||
msg.key = camera::Camera::instance()->get_object_id_hash();
|
||||
msg.set_data(this->image_reader_->peek_data_buffer(), to_send);
|
||||
msg.done = done;
|
||||
|
||||
bool success = this->send_buffer(buffer, CameraImageResponse::MESSAGE_TYPE);
|
||||
bool success = this->send_message_(msg, CameraImageResponse::MESSAGE_TYPE);
|
||||
|
||||
if (success) {
|
||||
this->image_reader_->consume_data(to_send);
|
||||
@@ -1350,26 +1341,12 @@ void APIConnection::update_command(const UpdateCommandRequest &msg) {
|
||||
#endif
|
||||
|
||||
bool APIConnection::try_send_log_message(int level, const char *tag, const char *line, size_t message_len) {
|
||||
// Pre-calculate message size to avoid reallocations
|
||||
uint32_t msg_size = 0;
|
||||
SubscribeLogsResponse msg;
|
||||
msg.level = static_cast<enums::LogLevel>(level);
|
||||
msg.set_message(reinterpret_cast<const uint8_t *>(line), message_len);
|
||||
msg.send_failed = false;
|
||||
|
||||
// Add size for level field (field ID 1, varint type)
|
||||
// 1 byte for field tag + size of the level varint
|
||||
msg_size += 1 + api::ProtoSize::varint(static_cast<uint32_t>(level));
|
||||
|
||||
// Add size for string field (field ID 3, string type)
|
||||
// 1 byte for field tag + size of length varint + string length
|
||||
msg_size += 1 + api::ProtoSize::varint(static_cast<uint32_t>(message_len)) + message_len;
|
||||
|
||||
// Create a pre-sized buffer
|
||||
auto buffer = this->create_buffer(msg_size);
|
||||
|
||||
// Encode the message (SubscribeLogsResponse)
|
||||
buffer.encode_uint32(1, static_cast<uint32_t>(level)); // LogLevel level = 1
|
||||
buffer.encode_string(3, line, message_len); // string message = 3
|
||||
|
||||
// SubscribeLogsResponse - 29
|
||||
return this->send_buffer(buffer, SubscribeLogsResponse::MESSAGE_TYPE);
|
||||
return this->send_message_(msg, SubscribeLogsResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
void APIConnection::complete_authentication_() {
|
||||
|
@@ -27,4 +27,5 @@ extend google.protobuf.MessageOptions {
|
||||
extend google.protobuf.FieldOptions {
|
||||
optional string field_ifdef = 1042;
|
||||
optional uint32 fixed_array_size = 50007;
|
||||
optional bool zero_copy = 1043 [default=false];
|
||||
}
|
||||
|
@@ -818,12 +818,12 @@ bool SubscribeLogsRequest::decode_varint(uint32_t field_id, ProtoVarInt value) {
|
||||
}
|
||||
void SubscribeLogsResponse::encode(ProtoWriteBuffer buffer) const {
|
||||
buffer.encode_uint32(1, static_cast<uint32_t>(this->level));
|
||||
buffer.encode_bytes(3, reinterpret_cast<const uint8_t *>(this->message.data()), this->message.size());
|
||||
buffer.encode_bytes(3, this->message_ptr_, this->message_len_);
|
||||
buffer.encode_bool(4, this->send_failed);
|
||||
}
|
||||
void SubscribeLogsResponse::calculate_size(uint32_t &total_size) const {
|
||||
ProtoSize::add_enum_field(total_size, 1, static_cast<uint32_t>(this->level));
|
||||
ProtoSize::add_string_field(total_size, 1, this->message);
|
||||
ProtoSize::add_bytes_field(total_size, 1, this->message_len_);
|
||||
ProtoSize::add_bool_field(total_size, 1, this->send_failed);
|
||||
}
|
||||
#ifdef USE_API_NOISE
|
||||
@@ -1030,7 +1030,7 @@ void ListEntitiesCameraResponse::calculate_size(uint32_t &total_size) const {
|
||||
}
|
||||
void CameraImageResponse::encode(ProtoWriteBuffer buffer) const {
|
||||
buffer.encode_fixed32(1, this->key);
|
||||
buffer.encode_bytes(2, reinterpret_cast<const uint8_t *>(this->data.data()), this->data.size());
|
||||
buffer.encode_bytes(2, this->data_ptr_, this->data_len_);
|
||||
buffer.encode_bool(3, this->done);
|
||||
#ifdef USE_DEVICES
|
||||
buffer.encode_uint32(4, this->device_id);
|
||||
@@ -1038,7 +1038,7 @@ void CameraImageResponse::encode(ProtoWriteBuffer buffer) const {
|
||||
}
|
||||
void CameraImageResponse::calculate_size(uint32_t &total_size) const {
|
||||
ProtoSize::add_fixed32_field(total_size, 1, this->key);
|
||||
ProtoSize::add_string_field(total_size, 1, this->data);
|
||||
ProtoSize::add_bytes_field(total_size, 1, this->data_len_);
|
||||
ProtoSize::add_bool_field(total_size, 1, this->done);
|
||||
#ifdef USE_DEVICES
|
||||
ProtoSize::add_uint32_field(total_size, 1, this->device_id);
|
||||
|
@@ -968,7 +968,12 @@ class SubscribeLogsResponse : public ProtoMessage {
|
||||
const char *message_name() const override { return "subscribe_logs_response"; }
|
||||
#endif
|
||||
enums::LogLevel level{};
|
||||
std::string message{};
|
||||
const uint8_t *message_ptr_{nullptr};
|
||||
size_t message_len_{0};
|
||||
void set_message(const uint8_t *data, size_t len) {
|
||||
this->message_ptr_ = data;
|
||||
this->message_len_ = len;
|
||||
}
|
||||
bool send_failed{false};
|
||||
void encode(ProtoWriteBuffer buffer) const override;
|
||||
void calculate_size(uint32_t &total_size) const override;
|
||||
@@ -1226,7 +1231,12 @@ class CameraImageResponse : public StateResponseProtoMessage {
|
||||
#ifdef HAS_PROTO_MESSAGE_DUMP
|
||||
const char *message_name() const override { return "camera_image_response"; }
|
||||
#endif
|
||||
std::string data{};
|
||||
const uint8_t *data_ptr_{nullptr};
|
||||
size_t data_len_{0};
|
||||
void set_data(const uint8_t *data, size_t len) {
|
||||
this->data_ptr_ = data;
|
||||
this->data_len_ = len;
|
||||
}
|
||||
bool done{false};
|
||||
void encode(ProtoWriteBuffer buffer) const override;
|
||||
void calculate_size(uint32_t &total_size) const override;
|
||||
|
@@ -1666,7 +1666,7 @@ void SubscribeLogsResponse::dump_to(std::string &out) const {
|
||||
out.append("\n");
|
||||
|
||||
out.append(" message: ");
|
||||
out.append(format_hex_pretty(this->message));
|
||||
out.append(format_hex_pretty(this->message_ptr_, this->message_len_));
|
||||
out.append("\n");
|
||||
|
||||
out.append(" send_failed: ");
|
||||
@@ -1932,7 +1932,7 @@ void CameraImageResponse::dump_to(std::string &out) const {
|
||||
out.append("\n");
|
||||
|
||||
out.append(" data: ");
|
||||
out.append(format_hex_pretty(this->data));
|
||||
out.append(format_hex_pretty(this->data_ptr_, this->data_len_));
|
||||
out.append("\n");
|
||||
|
||||
out.append(" done: ");
|
||||
|
@@ -527,25 +527,6 @@ class ProtoSize {
|
||||
total_size += field_id_size + 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Calculates and adds the size of a fixed field to the total message size
|
||||
*
|
||||
* Fixed fields always take exactly N bytes (4 for fixed32/float, 8 for fixed64/double).
|
||||
*
|
||||
* @tparam NumBytes The number of bytes for this fixed field (4 or 8)
|
||||
* @param is_nonzero Whether the value is non-zero
|
||||
*/
|
||||
template<uint32_t NumBytes>
|
||||
static inline void add_fixed_field(uint32_t &total_size, uint32_t field_id_size, bool is_nonzero) {
|
||||
// Skip calculation if value is zero
|
||||
if (!is_nonzero) {
|
||||
return; // No need to update total_size
|
||||
}
|
||||
|
||||
// Fixed fields always take exactly NumBytes
|
||||
total_size += field_id_size + NumBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Calculates and adds the size of a float field to the total message size
|
||||
*/
|
||||
@@ -704,6 +685,19 @@ class ProtoSize {
|
||||
total_size += field_id_size + varint(str_size) + str_size;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Calculates and adds the size of a bytes field to the total message size
|
||||
*/
|
||||
static inline void add_bytes_field(uint32_t &total_size, uint32_t field_id_size, size_t len) {
|
||||
// Skip calculation if bytes is empty
|
||||
if (len == 0) {
|
||||
return; // No need to update total_size
|
||||
}
|
||||
|
||||
// Field ID + length varint + data bytes
|
||||
total_size += field_id_size + varint(static_cast<uint32_t>(len)) + static_cast<uint32_t>(len);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Calculates and adds the size of a nested message field to the total message size
|
||||
*
|
||||
|
@@ -325,6 +325,10 @@ def create_field_type_info(field: descriptor.FieldDescriptorProto) -> TypeInfo:
|
||||
):
|
||||
return FixedArrayBytesType(field, fixed_size)
|
||||
|
||||
# Check for zero_copy option on bytes fields
|
||||
if field.type == 12 and get_field_opt(field, pb.zero_copy, default=False):
|
||||
return ZeroCopyBytesType(field)
|
||||
|
||||
validate_field_type(field.type, field.name)
|
||||
return TYPE_INFO[field.type](field)
|
||||
|
||||
@@ -608,6 +612,45 @@ class BytesType(TypeInfo):
|
||||
return self.calculate_field_id_size() + 8 # field ID + 8 bytes typical bytes
|
||||
|
||||
|
||||
class ZeroCopyBytesType(TypeInfo):
|
||||
"""Special type for zero-copy bytes fields that only accepts const uint8_t* data."""
|
||||
|
||||
cpp_type = "std::string" # Still store as string for compatibility
|
||||
default_value = ""
|
||||
reference_type = "std::string &"
|
||||
const_reference_type = "const std::string &"
|
||||
encode_func = "encode_bytes"
|
||||
wire_type = WireType.LENGTH_DELIMITED
|
||||
decode_length = "value.as_string()"
|
||||
|
||||
@property
|
||||
def public_content(self) -> list[str]:
|
||||
# Store both pointer and length for zero-copy encoding, plus setter method
|
||||
return [
|
||||
f"const uint8_t* {self.field_name}_ptr_{{nullptr}};",
|
||||
f"size_t {self.field_name}_len_{{0}};",
|
||||
f"void set_{self.field_name}(const uint8_t* data, size_t len) {{",
|
||||
f" this->{self.field_name}_ptr_ = data;",
|
||||
f" this->{self.field_name}_len_ = len;",
|
||||
"}",
|
||||
]
|
||||
|
||||
@property
|
||||
def encode_content(self) -> str:
|
||||
# Encode directly from pointer without nullptr check (like original)
|
||||
return f"buffer.encode_bytes({self.number}, this->{self.field_name}_ptr_, this->{self.field_name}_len_);"
|
||||
|
||||
def dump(self, name: str) -> str:
|
||||
return f"out.append(format_hex_pretty(this->{self.field_name}_ptr_, this->{self.field_name}_len_));"
|
||||
|
||||
def get_size_calculation(self, name: str, force: bool = False) -> str:
|
||||
# Use the new add_bytes_field helper
|
||||
return f"ProtoSize::add_bytes_field(total_size, {self.calculate_field_id_size()}, this->{self.field_name}_len_);"
|
||||
|
||||
def get_estimated_size(self) -> int:
|
||||
return self.calculate_field_id_size() + 8 # field ID + 8 bytes typical bytes
|
||||
|
||||
|
||||
class FixedArrayBytesType(TypeInfo):
|
||||
"""Special type for fixed-size byte arrays."""
|
||||
|
||||
|
Reference in New Issue
Block a user