mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-31 23:21:54 +00:00 
			
		
		
		
	Merge branch 'usb_memory_order_retry' into integration
This commit is contained in:
		| @@ -82,6 +82,12 @@ struct TransferStatus { | |||||||
|  |  | ||||||
| using transfer_cb_t = std::function<void(const TransferStatus &)>; | using transfer_cb_t = std::function<void(const TransferStatus &)>; | ||||||
|  |  | ||||||
|  | enum TransferResult : uint8_t { | ||||||
|  |   TRANSFER_OK = 0, | ||||||
|  |   TRANSFER_ERROR_NO_SLOTS, | ||||||
|  |   TRANSFER_ERROR_SUBMIT_FAILED, | ||||||
|  | }; | ||||||
|  |  | ||||||
| class USBClient; | class USBClient; | ||||||
|  |  | ||||||
| // struct used to capture all data needed for a transfer | // struct used to capture all data needed for a transfer | ||||||
| @@ -134,7 +140,7 @@ class USBClient : public Component { | |||||||
|   void on_opened(uint8_t addr); |   void on_opened(uint8_t addr); | ||||||
|   void on_removed(usb_device_handle_t handle); |   void on_removed(usb_device_handle_t handle); | ||||||
|   void control_transfer_callback(const usb_transfer_t *xfer) const; |   void control_transfer_callback(const usb_transfer_t *xfer) const; | ||||||
|   void transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length); |   TransferResult transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length); | ||||||
|   void transfer_out(uint8_t ep_address, const transfer_cb_t &callback, const uint8_t *data, uint16_t length); |   void transfer_out(uint8_t ep_address, const transfer_cb_t &callback, const uint8_t *data, uint16_t length); | ||||||
|   void dump_config() override; |   void dump_config() override; | ||||||
|   void release_trq(TransferRequest *trq); |   void release_trq(TransferRequest *trq); | ||||||
|   | |||||||
| @@ -334,7 +334,7 @@ static void control_callback(const usb_transfer_t *xfer) { | |||||||
| // This multi-threaded access is intentional for performance - USB task can | // This multi-threaded access is intentional for performance - USB task can | ||||||
| // immediately restart transfers without waiting for main loop scheduling. | // immediately restart transfers without waiting for main loop scheduling. | ||||||
| TransferRequest *USBClient::get_trq_() { | TransferRequest *USBClient::get_trq_() { | ||||||
|   trq_bitmask_t mask = this->trq_in_use_.load(std::memory_order_relaxed); |   trq_bitmask_t mask = this->trq_in_use_.load(std::memory_order_acquire); | ||||||
|  |  | ||||||
|   // Find first available slot (bit = 0) and try to claim it atomically |   // Find first available slot (bit = 0) and try to claim it atomically | ||||||
|   // We use a while loop to allow retrying the same slot after CAS failure |   // We use a while loop to allow retrying the same slot after CAS failure | ||||||
| @@ -443,14 +443,15 @@ static void transfer_callback(usb_transfer_t *xfer) { | |||||||
|  * @param ep_address The endpoint address. |  * @param ep_address The endpoint address. | ||||||
|  * @param callback The callback function to be called when the transfer is complete. |  * @param callback The callback function to be called when the transfer is complete. | ||||||
|  * @param length The length of the data to be transferred. |  * @param length The length of the data to be transferred. | ||||||
|  |  * @return TransferResult indicating success or specific failure reason | ||||||
|  * |  * | ||||||
|  * @throws None. |  * @throws None. | ||||||
|  */ |  */ | ||||||
| void USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length) { | TransferResult USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length) { | ||||||
|   auto *trq = this->get_trq_(); |   auto *trq = this->get_trq_(); | ||||||
|   if (trq == nullptr) { |   if (trq == nullptr) { | ||||||
|     ESP_LOGE(TAG, "Too many requests queued"); |     ESP_LOGE(TAG, "Too many requests queued"); | ||||||
|     return; |     return TRANSFER_ERROR_NO_SLOTS; | ||||||
|   } |   } | ||||||
|   trq->callback = callback; |   trq->callback = callback; | ||||||
|   trq->transfer->callback = transfer_callback; |   trq->transfer->callback = transfer_callback; | ||||||
| @@ -460,7 +461,9 @@ void USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, u | |||||||
|   if (err != ESP_OK) { |   if (err != ESP_OK) { | ||||||
|     ESP_LOGE(TAG, "Failed to submit transfer, address=%x, length=%d, err=%x", ep_address, length, err); |     ESP_LOGE(TAG, "Failed to submit transfer, address=%x, length=%d, err=%x", ep_address, length, err); | ||||||
|     this->release_trq(trq); |     this->release_trq(trq); | ||||||
|  |     return TRANSFER_ERROR_SUBMIT_FAILED; | ||||||
|   } |   } | ||||||
|  |   return TRANSFER_OK; | ||||||
| } | } | ||||||
|  |  | ||||||
| /** | /** | ||||||
|   | |||||||
| @@ -169,6 +169,25 @@ bool USBUartChannel::read_array(uint8_t *data, size_t len) { | |||||||
|   this->parent_->start_input(this); |   this->parent_->start_input(this); | ||||||
|   return status; |   return status; | ||||||
| } | } | ||||||
|  | void USBUartComponent::defer_input_retry_(USBUartChannel *channel) { | ||||||
|  |   static constexpr uint8_t MAX_INPUT_RETRIES = 10; | ||||||
|  |  | ||||||
|  |   // Atomically increment and get previous value | ||||||
|  |   uint8_t retry_count = channel->input_retry_count_.fetch_add(1); | ||||||
|  |   if (retry_count >= MAX_INPUT_RETRIES) { | ||||||
|  |     ESP_LOGE(TAG, "Input retry limit reached for channel %d, stopping retries", channel->index_); | ||||||
|  |     channel->input_started_.store(false); | ||||||
|  |     return; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   // Keep input_started_ as true during defer to prevent multiple retries from queueing | ||||||
|  |   // The deferred lambda will clear it before calling start_input() | ||||||
|  |   this->defer([this, channel] { | ||||||
|  |     channel->input_started_.store(false); | ||||||
|  |     this->start_input(channel); | ||||||
|  |   }); | ||||||
|  | } | ||||||
|  |  | ||||||
| void USBUartComponent::setup() { USBClient::setup(); } | void USBUartComponent::setup() { USBClient::setup(); } | ||||||
| void USBUartComponent::loop() { | void USBUartComponent::loop() { | ||||||
|   USBClient::loop(); |   USBClient::loop(); | ||||||
| @@ -214,8 +233,13 @@ void USBUartComponent::dump_config() { | |||||||
|   } |   } | ||||||
| } | } | ||||||
| void USBUartComponent::start_input(USBUartChannel *channel) { | void USBUartComponent::start_input(USBUartChannel *channel) { | ||||||
|   if (!channel->initialised_.load() || channel->input_started_.load()) |   if (!channel->initialised_.load()) | ||||||
|     return; |     return; | ||||||
|  |  | ||||||
|  |   // Atomically check if not started and set to started in one operation | ||||||
|  |   bool expected = false; | ||||||
|  |   if (!channel->input_started_.compare_exchange_strong(expected, true)) | ||||||
|  |     return;  // Already started, another thread won the race | ||||||
|   // THREAD CONTEXT: Called from both USB task and main loop threads |   // THREAD CONTEXT: Called from both USB task and main loop threads | ||||||
|   // - USB task: Immediate restart after successful transfer for continuous data flow |   // - USB task: Immediate restart after successful transfer for continuous data flow | ||||||
|   // - Main loop: Controlled restart after consuming data (backpressure mechanism) |   // - Main loop: Controlled restart after consuming data (backpressure mechanism) | ||||||
| @@ -232,8 +256,8 @@ void USBUartComponent::start_input(USBUartChannel *channel) { | |||||||
|     ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code); |     ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code); | ||||||
|     if (!status.success) { |     if (!status.success) { | ||||||
|       ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code)); |       ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code)); | ||||||
|       // On failure, don't restart - let next read_array() trigger it |       // On failure, defer retry to main loop | ||||||
|       channel->input_started_.store(false); |       this->defer_input_retry_(channel); | ||||||
|       return; |       return; | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -241,10 +265,9 @@ void USBUartComponent::start_input(USBUartChannel *channel) { | |||||||
|       // Allocate a chunk from the pool |       // Allocate a chunk from the pool | ||||||
|       UsbDataChunk *chunk = this->chunk_pool_.allocate(); |       UsbDataChunk *chunk = this->chunk_pool_.allocate(); | ||||||
|       if (chunk == nullptr) { |       if (chunk == nullptr) { | ||||||
|         // No chunks available - queue is full or we're out of memory |         // No chunks available - defer retry to main loop for backpressure | ||||||
|         this->usb_data_queue_.increment_dropped_count(); |         this->usb_data_queue_.increment_dropped_count(); | ||||||
|         // Mark input as not started so we can retry |         this->defer_input_retry_(channel); | ||||||
|         channel->input_started_.store(false); |  | ||||||
|         return; |         return; | ||||||
|       } |       } | ||||||
|  |  | ||||||
| @@ -258,13 +281,22 @@ void USBUartComponent::start_input(USBUartChannel *channel) { | |||||||
|       this->usb_data_queue_.push(chunk); |       this->usb_data_queue_.push(chunk); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     // On success, restart input immediately from USB task for performance |     // On success, reset retry count and restart input immediately from USB task for performance | ||||||
|     // The lock-free queue will handle backpressure |     // The lock-free queue will handle backpressure | ||||||
|  |     channel->input_retry_count_.store(0); | ||||||
|     channel->input_started_.store(false); |     channel->input_started_.store(false); | ||||||
|     this->start_input(channel); |     this->start_input(channel); | ||||||
|   }; |   }; | ||||||
|   channel->input_started_.store(true); |   // input_started_ already set to true by compare_exchange_strong above | ||||||
|   this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize); |   auto result = this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize); | ||||||
|  |   if (result == usb_host::TRANSFER_ERROR_NO_SLOTS) { | ||||||
|  |     // No slots available - defer retry to main loop | ||||||
|  |     this->defer_input_retry_(channel); | ||||||
|  |   } else if (result != usb_host::TRANSFER_OK) { | ||||||
|  |     // Other error (submit failed) - don't retry, just clear flag | ||||||
|  |     // Error already logged by transfer_in() | ||||||
|  |     channel->input_started_.store(false); | ||||||
|  |   } | ||||||
| } | } | ||||||
|  |  | ||||||
| void USBUartComponent::start_output(USBUartChannel *channel) { | void USBUartComponent::start_output(USBUartChannel *channel) { | ||||||
| @@ -370,6 +402,7 @@ void USBUartTypeCdcAcm::enable_channels() { | |||||||
|   for (auto *channel : this->channels_) { |   for (auto *channel : this->channels_) { | ||||||
|     if (!channel->initialised_.load()) |     if (!channel->initialised_.load()) | ||||||
|       continue; |       continue; | ||||||
|  |     channel->input_retry_count_.store(0); | ||||||
|     channel->input_started_.store(false); |     channel->input_started_.store(false); | ||||||
|     channel->output_started_.store(false); |     channel->output_started_.store(false); | ||||||
|     this->start_input(channel); |     this->start_input(channel); | ||||||
|   | |||||||
| @@ -111,10 +111,11 @@ class USBUartChannel : public uart::UARTComponent, public Parented<USBUartCompon | |||||||
|   CdcEps cdc_dev_{}; |   CdcEps cdc_dev_{}; | ||||||
|   // Enum (likely 4 bytes) |   // Enum (likely 4 bytes) | ||||||
|   UARTParityOptions parity_{UART_CONFIG_PARITY_NONE}; |   UARTParityOptions parity_{UART_CONFIG_PARITY_NONE}; | ||||||
|   // Group atomics together (each 1 byte) |   // Group atomics together | ||||||
|   std::atomic<bool> input_started_{true}; |   std::atomic<bool> input_started_{true}; | ||||||
|   std::atomic<bool> output_started_{true}; |   std::atomic<bool> output_started_{true}; | ||||||
|   std::atomic<bool> initialised_{false}; |   std::atomic<bool> initialised_{false}; | ||||||
|  |   std::atomic<uint8_t> input_retry_count_{0}; | ||||||
|   // Group regular bytes together to minimize padding |   // Group regular bytes together to minimize padding | ||||||
|   const uint8_t index_; |   const uint8_t index_; | ||||||
|   bool debug_{}; |   bool debug_{}; | ||||||
| @@ -140,6 +141,7 @@ class USBUartComponent : public usb_host::USBClient { | |||||||
|   EventPool<UsbDataChunk, USB_DATA_QUEUE_SIZE> chunk_pool_; |   EventPool<UsbDataChunk, USB_DATA_QUEUE_SIZE> chunk_pool_; | ||||||
|  |  | ||||||
|  protected: |  protected: | ||||||
|  |   void defer_input_retry_(USBUartChannel *channel); | ||||||
|   std::vector<USBUartChannel *> channels_{}; |   std::vector<USBUartChannel *> channels_{}; | ||||||
| }; | }; | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user