mirror of
https://github.com/esphome/esphome.git
synced 2025-10-04 02:52:22 +01:00
Allow disabling API batch delay for real-time state updates (#9298)
This commit is contained in:
@@ -90,19 +90,6 @@ APIConnection::~APIConnection() {
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef HAS_PROTO_MESSAGE_DUMP
|
||||
void APIConnection::log_batch_item_(const DeferredBatch::BatchItem &item) {
|
||||
// Set log-only mode
|
||||
this->flags_.log_only_mode = true;
|
||||
|
||||
// Call the creator - it will create the message and log it via encode_message_to_buffer
|
||||
item.creator(item.entity, this, std::numeric_limits<uint16_t>::max(), true, item.message_type);
|
||||
|
||||
// Clear log-only mode
|
||||
this->flags_.log_only_mode = false;
|
||||
}
|
||||
#endif
|
||||
|
||||
void APIConnection::loop() {
|
||||
if (this->flags_.next_close) {
|
||||
// requested a disconnect
|
||||
@@ -154,15 +141,25 @@ void APIConnection::loop() {
|
||||
}
|
||||
}
|
||||
|
||||
// Process deferred batch if scheduled
|
||||
// Process deferred batch if scheduled and timer has expired
|
||||
if (this->flags_.batch_scheduled && now - this->deferred_batch_.batch_start_time >= this->get_batch_delay_ms_()) {
|
||||
this->process_batch_();
|
||||
}
|
||||
|
||||
if (!this->list_entities_iterator_.completed()) {
|
||||
this->list_entities_iterator_.advance();
|
||||
this->process_iterator_batch_(this->list_entities_iterator_);
|
||||
} else if (!this->initial_state_iterator_.completed()) {
|
||||
this->initial_state_iterator_.advance();
|
||||
this->process_iterator_batch_(this->initial_state_iterator_);
|
||||
|
||||
// If we've completed initial states, process any remaining and clear the flag
|
||||
if (this->initial_state_iterator_.completed()) {
|
||||
// Process any remaining batched messages immediately
|
||||
if (!this->deferred_batch_.empty()) {
|
||||
this->process_batch_();
|
||||
}
|
||||
// Now that everything is sent, enable immediate sending for future state changes
|
||||
this->flags_.should_try_send_immediately = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (this->flags_.sent_ping) {
|
||||
@@ -300,8 +297,8 @@ uint16_t APIConnection::encode_message_to_buffer(ProtoMessage &msg, uint16_t mes
|
||||
|
||||
#ifdef USE_BINARY_SENSOR
|
||||
bool APIConnection::send_binary_sensor_state(binary_sensor::BinarySensor *binary_sensor) {
|
||||
return this->schedule_message_(binary_sensor, &APIConnection::try_send_binary_sensor_state,
|
||||
BinarySensorStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(binary_sensor, &APIConnection::try_send_binary_sensor_state,
|
||||
BinarySensorStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_binary_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -328,7 +325,7 @@ uint16_t APIConnection::try_send_binary_sensor_info(EntityBase *entity, APIConne
|
||||
|
||||
#ifdef USE_COVER
|
||||
bool APIConnection::send_cover_state(cover::Cover *cover) {
|
||||
return this->schedule_message_(cover, &APIConnection::try_send_cover_state, CoverStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(cover, &APIConnection::try_send_cover_state, CoverStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_cover_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -389,7 +386,7 @@ void APIConnection::cover_command(const CoverCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_FAN
|
||||
bool APIConnection::send_fan_state(fan::Fan *fan) {
|
||||
return this->schedule_message_(fan, &APIConnection::try_send_fan_state, FanStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(fan, &APIConnection::try_send_fan_state, FanStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_fan_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -448,7 +445,7 @@ void APIConnection::fan_command(const FanCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_LIGHT
|
||||
bool APIConnection::send_light_state(light::LightState *light) {
|
||||
return this->schedule_message_(light, &APIConnection::try_send_light_state, LightStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(light, &APIConnection::try_send_light_state, LightStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_light_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -540,7 +537,7 @@ void APIConnection::light_command(const LightCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_SENSOR
|
||||
bool APIConnection::send_sensor_state(sensor::Sensor *sensor) {
|
||||
return this->schedule_message_(sensor, &APIConnection::try_send_sensor_state, SensorStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(sensor, &APIConnection::try_send_sensor_state, SensorStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -572,7 +569,7 @@ uint16_t APIConnection::try_send_sensor_info(EntityBase *entity, APIConnection *
|
||||
|
||||
#ifdef USE_SWITCH
|
||||
bool APIConnection::send_switch_state(switch_::Switch *a_switch) {
|
||||
return this->schedule_message_(a_switch, &APIConnection::try_send_switch_state, SwitchStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(a_switch, &APIConnection::try_send_switch_state, SwitchStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_switch_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -609,8 +606,8 @@ void APIConnection::switch_command(const SwitchCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_TEXT_SENSOR
|
||||
bool APIConnection::send_text_sensor_state(text_sensor::TextSensor *text_sensor) {
|
||||
return this->schedule_message_(text_sensor, &APIConnection::try_send_text_sensor_state,
|
||||
TextSensorStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(text_sensor, &APIConnection::try_send_text_sensor_state,
|
||||
TextSensorStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_text_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -637,7 +634,7 @@ uint16_t APIConnection::try_send_text_sensor_info(EntityBase *entity, APIConnect
|
||||
|
||||
#ifdef USE_CLIMATE
|
||||
bool APIConnection::send_climate_state(climate::Climate *climate) {
|
||||
return this->schedule_message_(climate, &APIConnection::try_send_climate_state, ClimateStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(climate, &APIConnection::try_send_climate_state, ClimateStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_climate_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -737,7 +734,7 @@ void APIConnection::climate_command(const ClimateCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_NUMBER
|
||||
bool APIConnection::send_number_state(number::Number *number) {
|
||||
return this->schedule_message_(number, &APIConnection::try_send_number_state, NumberStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(number, &APIConnection::try_send_number_state, NumberStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_number_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -777,7 +774,7 @@ void APIConnection::number_command(const NumberCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_DATETIME_DATE
|
||||
bool APIConnection::send_date_state(datetime::DateEntity *date) {
|
||||
return this->schedule_message_(date, &APIConnection::try_send_date_state, DateStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(date, &APIConnection::try_send_date_state, DateStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_date_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -811,7 +808,7 @@ void APIConnection::date_command(const DateCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_DATETIME_TIME
|
||||
bool APIConnection::send_time_state(datetime::TimeEntity *time) {
|
||||
return this->schedule_message_(time, &APIConnection::try_send_time_state, TimeStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(time, &APIConnection::try_send_time_state, TimeStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_time_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -845,8 +842,8 @@ void APIConnection::time_command(const TimeCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_DATETIME_DATETIME
|
||||
bool APIConnection::send_datetime_state(datetime::DateTimeEntity *datetime) {
|
||||
return this->schedule_message_(datetime, &APIConnection::try_send_datetime_state,
|
||||
DateTimeStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(datetime, &APIConnection::try_send_datetime_state,
|
||||
DateTimeStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_datetime_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -881,7 +878,7 @@ void APIConnection::datetime_command(const DateTimeCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_TEXT
|
||||
bool APIConnection::send_text_state(text::Text *text) {
|
||||
return this->schedule_message_(text, &APIConnection::try_send_text_state, TextStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(text, &APIConnection::try_send_text_state, TextStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_text_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -919,7 +916,7 @@ void APIConnection::text_command(const TextCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_SELECT
|
||||
bool APIConnection::send_select_state(select::Select *select) {
|
||||
return this->schedule_message_(select, &APIConnection::try_send_select_state, SelectStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(select, &APIConnection::try_send_select_state, SelectStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_select_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -974,7 +971,7 @@ void esphome::api::APIConnection::button_command(const ButtonCommandRequest &msg
|
||||
|
||||
#ifdef USE_LOCK
|
||||
bool APIConnection::send_lock_state(lock::Lock *a_lock) {
|
||||
return this->schedule_message_(a_lock, &APIConnection::try_send_lock_state, LockStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(a_lock, &APIConnection::try_send_lock_state, LockStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
|
||||
uint16_t APIConnection::try_send_lock_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
@@ -1018,7 +1015,7 @@ void APIConnection::lock_command(const LockCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_VALVE
|
||||
bool APIConnection::send_valve_state(valve::Valve *valve) {
|
||||
return this->schedule_message_(valve, &APIConnection::try_send_valve_state, ValveStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(valve, &APIConnection::try_send_valve_state, ValveStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_valve_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -1058,8 +1055,8 @@ void APIConnection::valve_command(const ValveCommandRequest &msg) {
|
||||
|
||||
#ifdef USE_MEDIA_PLAYER
|
||||
bool APIConnection::send_media_player_state(media_player::MediaPlayer *media_player) {
|
||||
return this->schedule_message_(media_player, &APIConnection::try_send_media_player_state,
|
||||
MediaPlayerStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(media_player, &APIConnection::try_send_media_player_state,
|
||||
MediaPlayerStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_media_player_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -1320,8 +1317,8 @@ void APIConnection::voice_assistant_set_configuration(const VoiceAssistantSetCon
|
||||
|
||||
#ifdef USE_ALARM_CONTROL_PANEL
|
||||
bool APIConnection::send_alarm_control_panel_state(alarm_control_panel::AlarmControlPanel *a_alarm_control_panel) {
|
||||
return this->schedule_message_(a_alarm_control_panel, &APIConnection::try_send_alarm_control_panel_state,
|
||||
AlarmControlPanelStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(a_alarm_control_panel, &APIConnection::try_send_alarm_control_panel_state,
|
||||
AlarmControlPanelStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_alarm_control_panel_state(EntityBase *entity, APIConnection *conn,
|
||||
uint32_t remaining_size, bool is_single) {
|
||||
@@ -1404,7 +1401,7 @@ uint16_t APIConnection::try_send_event_info(EntityBase *entity, APIConnection *c
|
||||
|
||||
#ifdef USE_UPDATE
|
||||
bool APIConnection::send_update_state(update::UpdateEntity *update) {
|
||||
return this->schedule_message_(update, &APIConnection::try_send_update_state, UpdateStateResponse::MESSAGE_TYPE);
|
||||
return this->send_message_smart_(update, &APIConnection::try_send_update_state, UpdateStateResponse::MESSAGE_TYPE);
|
||||
}
|
||||
uint16_t APIConnection::try_send_update_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single) {
|
||||
@@ -1751,11 +1748,16 @@ void APIConnection::process_batch_() {
|
||||
|
||||
if (payload_size > 0 &&
|
||||
this->send_buffer(ProtoWriteBuffer{&this->parent_->get_shared_buffer_ref()}, item.message_type)) {
|
||||
this->deferred_batch_.clear();
|
||||
#ifdef HAS_PROTO_MESSAGE_DUMP
|
||||
// Log messages after send attempt for VV debugging
|
||||
// It's safe to use the buffer for logging at this point regardless of send result
|
||||
this->log_batch_item_(item);
|
||||
#endif
|
||||
this->clear_batch_();
|
||||
} else if (payload_size == 0) {
|
||||
// Message too large
|
||||
ESP_LOGW(TAG, "Message too large to send: type=%u", item.message_type);
|
||||
this->deferred_batch_.clear();
|
||||
this->clear_batch_();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -1864,7 +1866,7 @@ void APIConnection::process_batch_() {
|
||||
this->schedule_batch_();
|
||||
} else {
|
||||
// All items processed
|
||||
this->deferred_batch_.clear();
|
||||
this->clear_batch_();
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -18,6 +18,8 @@ namespace api {
|
||||
|
||||
// Keepalive timeout in milliseconds
|
||||
static constexpr uint32_t KEEPALIVE_TIMEOUT_MS = 60000;
|
||||
// Maximum number of entities to process in a single batch during initial state/info sending
|
||||
static constexpr size_t MAX_INITIAL_PER_BATCH = 20;
|
||||
|
||||
class APIConnection : public APIServerConnection {
|
||||
public:
|
||||
@@ -296,6 +298,20 @@ class APIConnection : public APIServerConnection {
|
||||
static uint16_t encode_message_to_buffer(ProtoMessage &msg, uint16_t message_type, APIConnection *conn,
|
||||
uint32_t remaining_size, bool is_single);
|
||||
|
||||
// Helper method to process multiple entities from an iterator in a batch
|
||||
template<typename Iterator> void process_iterator_batch_(Iterator &iterator) {
|
||||
size_t initial_size = this->deferred_batch_.size();
|
||||
while (!iterator.completed() && (this->deferred_batch_.size() - initial_size) < MAX_INITIAL_PER_BATCH) {
|
||||
iterator.advance();
|
||||
}
|
||||
|
||||
// If the batch is full, process it immediately
|
||||
// Note: iterator.advance() already calls schedule_batch_() via schedule_message_()
|
||||
if (this->deferred_batch_.size() >= MAX_INITIAL_PER_BATCH) {
|
||||
this->process_batch_();
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef USE_BINARY_SENSOR
|
||||
static uint16_t try_send_binary_sensor_state(EntityBase *entity, APIConnection *conn, uint32_t remaining_size,
|
||||
bool is_single);
|
||||
@@ -582,7 +598,8 @@ class APIConnection : public APIServerConnection {
|
||||
uint8_t service_call_subscription : 1;
|
||||
uint8_t next_close : 1;
|
||||
uint8_t batch_scheduled : 1;
|
||||
uint8_t batch_first_message : 1; // For batch buffer allocation
|
||||
uint8_t batch_first_message : 1; // For batch buffer allocation
|
||||
uint8_t should_try_send_immediately : 1; // True after initial states are sent
|
||||
#ifdef HAS_PROTO_MESSAGE_DUMP
|
||||
uint8_t log_only_mode : 1;
|
||||
#endif
|
||||
@@ -609,11 +626,50 @@ class APIConnection : public APIServerConnection {
|
||||
|
||||
bool schedule_batch_();
|
||||
void process_batch_();
|
||||
void clear_batch_() {
|
||||
this->deferred_batch_.clear();
|
||||
this->flags_.batch_scheduled = false;
|
||||
}
|
||||
|
||||
#ifdef HAS_PROTO_MESSAGE_DUMP
|
||||
void log_batch_item_(const DeferredBatch::BatchItem &item);
|
||||
// Helper to log a proto message from a MessageCreator object
|
||||
void log_proto_message_(EntityBase *entity, const MessageCreator &creator, uint16_t message_type) {
|
||||
this->flags_.log_only_mode = true;
|
||||
creator(entity, this, MAX_PACKET_SIZE, true, message_type);
|
||||
this->flags_.log_only_mode = false;
|
||||
}
|
||||
|
||||
void log_batch_item_(const DeferredBatch::BatchItem &item) {
|
||||
// Use the helper to log the message
|
||||
this->log_proto_message_(item.entity, item.creator, item.message_type);
|
||||
}
|
||||
#endif
|
||||
|
||||
// Helper method to send a message either immediately or via batching
|
||||
bool send_message_smart_(EntityBase *entity, MessageCreatorPtr creator, uint16_t message_type) {
|
||||
// Try to send immediately if:
|
||||
// 1. We should try to send immediately (should_try_send_immediately = true)
|
||||
// 2. Batch delay is 0 (user has opted in to immediate sending)
|
||||
// 3. Buffer has space available
|
||||
if (this->flags_.should_try_send_immediately && this->get_batch_delay_ms_() == 0 &&
|
||||
this->helper_->can_write_without_blocking()) {
|
||||
// Now actually encode and send
|
||||
if (creator(entity, this, MAX_PACKET_SIZE, true) &&
|
||||
this->send_buffer(ProtoWriteBuffer{&this->parent_->get_shared_buffer_ref()}, message_type)) {
|
||||
#ifdef HAS_PROTO_MESSAGE_DUMP
|
||||
// Log the message in verbose mode
|
||||
this->log_proto_message_(entity, MessageCreator(creator), message_type);
|
||||
#endif
|
||||
return true;
|
||||
}
|
||||
|
||||
// If immediate send failed, fall through to batching
|
||||
}
|
||||
|
||||
// Fall back to scheduled batching
|
||||
return this->schedule_message_(entity, creator, message_type);
|
||||
}
|
||||
|
||||
// Helper function to schedule a deferred message with known message type
|
||||
bool schedule_message_(EntityBase *entity, MessageCreator creator, uint16_t message_type) {
|
||||
this->deferred_batch_.add_item(entity, std::move(creator), message_type);
|
||||
|
Reference in New Issue
Block a user