From 527039211e914a09a7c7b434f57f8d52096c3cab Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 25 Oct 2025 14:53:38 -0700 Subject: [PATCH 1/2] fix off by one --- esphome/components/usb_uart/usb_uart.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/esphome/components/usb_uart/usb_uart.cpp b/esphome/components/usb_uart/usb_uart.cpp index a97db9cefd..f379106a53 100644 --- a/esphome/components/usb_uart/usb_uart.cpp +++ b/esphome/components/usb_uart/usb_uart.cpp @@ -186,9 +186,9 @@ void USBUartComponent::restart_input_(USBUartChannel *channel) { void USBUartComponent::defer_input_retry_(USBUartChannel *channel) { static constexpr uint8_t MAX_INPUT_RETRIES = 10; - // Atomically increment and get previous value - uint8_t retry_count = channel->input_retry_count_.fetch_add(1); - if (retry_count >= MAX_INPUT_RETRIES) { + // Atomically increment and get the NEW value (previous + 1) + uint8_t new_retry_count = channel->input_retry_count_.fetch_add(1) + 1; + if (new_retry_count > MAX_INPUT_RETRIES) { ESP_LOGE(TAG, "Input retry limit reached for channel %d, stopping retries", channel->index_); this->reset_input_state_(channel); return; @@ -250,7 +250,7 @@ void USBUartComponent::start_input(USBUartChannel *channel) { // Atomically check if not started and set to started in one operation bool expected = false; if (!channel->input_started_.compare_exchange_strong(expected, true)) - return; // Already started, another thread won the race + return; // Already started - prevents duplicate transfers from concurrent threads // THREAD CONTEXT: Called from both USB task and main loop threads // - USB task: Immediate restart after successful transfer for continuous data flow // - Main loop: Controlled restart after consuming data (backpressure mechanism) From 2c6b9d38261b1f79b42723004b41c9a85a688045 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 25 Oct 2025 14:56:59 -0700 Subject: [PATCH 2/2] no race window --- esphome/components/usb_uart/usb_uart.cpp | 112 ++++++++++++----------- esphome/components/usb_uart/usb_uart.h | 1 + 2 files changed, 62 insertions(+), 51 deletions(-) diff --git a/esphome/components/usb_uart/usb_uart.cpp b/esphome/components/usb_uart/usb_uart.cpp index f379106a53..661901d0a8 100644 --- a/esphome/components/usb_uart/usb_uart.cpp +++ b/esphome/components/usb_uart/usb_uart.cpp @@ -175,11 +175,66 @@ void USBUartComponent::reset_input_state_(USBUartChannel *channel) { } void USBUartComponent::restart_input_(USBUartChannel *channel) { - // Atomically check if still started and clear it before calling start_input - // This prevents race with concurrent restart attempts from different threads + // Atomically verify it's still started (true) and keep it started + // This prevents the race window of toggling true->false->true bool expected = true; - if (channel->input_started_.compare_exchange_strong(expected, false)) { + if (channel->input_started_.compare_exchange_strong(expected, true)) { + // Still started - do the actual restart work without toggling the flag + this->do_start_input_(channel); + } +} + +void USBUartComponent::do_start_input_(USBUartChannel *channel) { + // This function does the actual work of starting input + // Caller must ensure input_started_ is already set to true + const auto *ep = channel->cdc_dev_.in_ep; + // CALLBACK CONTEXT: This lambda is executed in USB task via transfer_callback + auto callback = [this, channel](const usb_host::TransferStatus &status) { + ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code); + if (!status.success) { + ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code)); + // Transfer failed, slot already released + // Reset state so normal operations can restart later + this->reset_input_state_(channel); + return; + } + + if (!channel->dummy_receiver_ && status.data_len > 0) { + // Allocate a chunk from the pool + UsbDataChunk *chunk = this->chunk_pool_.allocate(); + if (chunk == nullptr) { + // No chunks available - queue is full, data dropped, slot already released + this->usb_data_queue_.increment_dropped_count(); + // Reset state so normal operations can restart later + this->reset_input_state_(channel); + return; + } + + // Copy data to chunk (this is fast, happens in USB task) + memcpy(chunk->data, status.data, status.data_len); + chunk->length = status.data_len; + chunk->channel = channel; + + // Push to lock-free queue for main loop processing + // Push always succeeds because pool size == queue size + this->usb_data_queue_.push(chunk); + } + + // On success, reset retry count and restart input immediately from USB task for performance + // The lock-free queue will handle backpressure + channel->input_retry_count_.store(0); + channel->input_started_.store(false); this->start_input(channel); + }; + // input_started_ already set to true by caller + auto result = this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize); + if (result == usb_host::TRANSFER_ERROR_NO_SLOTS) { + // No slots available - defer retry to main loop + this->defer_input_retry_(channel); + } else if (result != usb_host::TRANSFER_OK) { + // Other error (submit failed) - don't retry, just reset state + // Error already logged by transfer_in() + this->reset_input_state_(channel); } } @@ -251,6 +306,7 @@ void USBUartComponent::start_input(USBUartChannel *channel) { bool expected = false; if (!channel->input_started_.compare_exchange_strong(expected, true)) return; // Already started - prevents duplicate transfers from concurrent threads + // THREAD CONTEXT: Called from both USB task and main loop threads // - USB task: Immediate restart after successful transfer for continuous data flow // - Main loop: Controlled restart after consuming data (backpressure mechanism) @@ -261,55 +317,9 @@ void USBUartComponent::start_input(USBUartChannel *channel) { // // The underlying transfer_in() uses lock-free atomic allocation from the // TransferRequest pool, making this multi-threaded access safe - const auto *ep = channel->cdc_dev_.in_ep; - // CALLBACK CONTEXT: This lambda is executed in USB task via transfer_callback - auto callback = [this, channel](const usb_host::TransferStatus &status) { - ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code); - if (!status.success) { - ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code)); - // Transfer failed, slot already released - // Reset state so normal operations can restart later - this->reset_input_state_(channel); - return; - } - if (!channel->dummy_receiver_ && status.data_len > 0) { - // Allocate a chunk from the pool - UsbDataChunk *chunk = this->chunk_pool_.allocate(); - if (chunk == nullptr) { - // No chunks available - queue is full, data dropped, slot already released - this->usb_data_queue_.increment_dropped_count(); - // Reset state so normal operations can restart later - this->reset_input_state_(channel); - return; - } - - // Copy data to chunk (this is fast, happens in USB task) - memcpy(chunk->data, status.data, status.data_len); - chunk->length = status.data_len; - chunk->channel = channel; - - // Push to lock-free queue for main loop processing - // Push always succeeds because pool size == queue size - this->usb_data_queue_.push(chunk); - } - - // On success, reset retry count and restart input immediately from USB task for performance - // The lock-free queue will handle backpressure - channel->input_retry_count_.store(0); - channel->input_started_.store(false); - this->start_input(channel); - }; - // input_started_ already set to true by compare_exchange_strong above - auto result = this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize); - if (result == usb_host::TRANSFER_ERROR_NO_SLOTS) { - // No slots available - defer retry to main loop - this->defer_input_retry_(channel); - } else if (result != usb_host::TRANSFER_OK) { - // Other error (submit failed) - don't retry, just reset state - // Error already logged by transfer_in() - this->reset_input_state_(channel); - } + // Do the actual work (input_started_ already set to true by CAS above) + this->do_start_input_(channel); } void USBUartComponent::start_output(USBUartChannel *channel) { diff --git a/esphome/components/usb_uart/usb_uart.h b/esphome/components/usb_uart/usb_uart.h index 62b96b7faa..330cb119bf 100644 --- a/esphome/components/usb_uart/usb_uart.h +++ b/esphome/components/usb_uart/usb_uart.h @@ -144,6 +144,7 @@ class USBUartComponent : public usb_host::USBClient { void defer_input_retry_(USBUartChannel *channel); void reset_input_state_(USBUartChannel *channel); void restart_input_(USBUartChannel *channel); + void do_start_input_(USBUartChannel *channel); std::vector channels_{}; };