1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-01 10:52:19 +01:00

[bluetooth_proxy] Eliminate heap allocations in connection state reporting (#10010)

This commit is contained in:
J. Nick Koston
2025-08-01 20:26:00 -10:00
committed by GitHub
parent f1877ca084
commit 00d9baed11
13 changed files with 104 additions and 40 deletions

View File

@@ -342,6 +342,11 @@ def create_field_type_info(
# Check if this repeated field has fixed_array_size option
if (fixed_size := get_field_opt(field, pb.fixed_array_size)) is not None:
return FixedArrayRepeatedType(field, fixed_size)
# Check if this repeated field has fixed_array_size_define option
if (
size_define := get_field_opt(field, pb.fixed_array_size_define)
) is not None:
return FixedArrayRepeatedType(field, size_define)
return RepeatedTypeInfo(field)
# Check for fixed_array_size option on bytes fields
@@ -1066,9 +1071,10 @@ class FixedArrayRepeatedType(TypeInfo):
control how many items we receive when decoding.
"""
def __init__(self, field: descriptor.FieldDescriptorProto, size: int) -> None:
def __init__(self, field: descriptor.FieldDescriptorProto, size: int | str) -> None:
super().__init__(field)
self.array_size = size
self.is_define = isinstance(size, str)
# Check if we should skip encoding when all elements are zero
# Use getattr to handle older versions of api_options_pb2
self.skip_zero = get_field_opt(
@@ -1113,6 +1119,14 @@ class FixedArrayRepeatedType(TypeInfo):
# If skip_zero is enabled, wrap encoding in a zero check
if self.skip_zero:
if self.is_define:
# When using a define, we need to use a loop-based approach
o = f"for (const auto &it : this->{self.field_name}) {{\n"
o += " if (it != 0) {\n"
o += f" {encode_element('it')}\n"
o += " }\n"
o += "}"
return o
# Build the condition to check if at least one element is non-zero
non_zero_checks = " || ".join(
[f"this->{self.field_name}[{i}] != 0" for i in range(self.array_size)]
@@ -1123,6 +1137,13 @@ class FixedArrayRepeatedType(TypeInfo):
]
return f"if ({non_zero_checks}) {{\n" + "\n".join(encode_lines) + "\n}"
# When using a define, always use loop-based approach
if self.is_define:
o = f"for (const auto &it : this->{self.field_name}) {{\n"
o += f" {encode_element('it')}\n"
o += "}"
return o
# Unroll small arrays for efficiency
if self.array_size == 1:
return encode_element(f"this->{self.field_name}[0]")
@@ -1153,6 +1174,14 @@ class FixedArrayRepeatedType(TypeInfo):
def get_size_calculation(self, name: str, force: bool = False) -> str:
# If skip_zero is enabled, wrap size calculation in a zero check
if self.skip_zero:
if self.is_define:
# When using a define, we need to use a loop-based approach
o = f"for (const auto &it : {name}) {{\n"
o += " if (it != 0) {\n"
o += f" {self._ti.get_size_calculation('it', True)}\n"
o += " }\n"
o += "}"
return o
# Build the condition to check if at least one element is non-zero
non_zero_checks = " || ".join(
[f"{name}[{i}] != 0" for i in range(self.array_size)]
@@ -1163,6 +1192,13 @@ class FixedArrayRepeatedType(TypeInfo):
]
return f"if ({non_zero_checks}) {{\n" + "\n".join(size_lines) + "\n}"
# When using a define, always use loop-based approach
if self.is_define:
o = f"for (const auto &it : {name}) {{\n"
o += f" {self._ti.get_size_calculation('it', True)}\n"
o += "}"
return o
# For fixed arrays, we always encode all elements
# Special case for single-element arrays - no loop needed
@@ -1186,6 +1222,11 @@ class FixedArrayRepeatedType(TypeInfo):
def get_estimated_size(self) -> int:
# For fixed arrays, estimate underlying type size * array size
underlying_size = self._ti.get_estimated_size()
if self.is_define:
# When using a define, we don't know the actual size so just guess 3
# This is only used for documentation and never actually used since
# fixed arrays are only for SOURCE_SERVER (encode-only) messages
return underlying_size * 3
return underlying_size * self.array_size