1
0
mirror of https://github.com/esphome/esphome.git synced 2025-11-01 07:31:51 +00:00

Compare commits

...

10 Commits

Author SHA1 Message Date
J. Nick Koston
c18a0f538f preen 2025-10-25 15:05:13 -07:00
J. Nick Koston
7e31149584 readable 2025-10-25 15:02:56 -07:00
J. Nick Koston
2c6b9d3826 no race window 2025-10-25 14:56:59 -07:00
J. Nick Koston
527039211e fix off by one 2025-10-25 14:53:48 -07:00
J. Nick Koston
1ea17607f3 fix race. 2025-10-25 14:44:36 -07:00
J. Nick Koston
6cfca87ca7 safer 2025-10-25 14:39:28 -07:00
J. Nick Koston
8bd640875f touch ups 2025-10-25 14:20:57 -07:00
J. Nick Koston
1e17ed8c1e narrow scope 2025-10-25 13:51:29 -07:00
J. Nick Koston
d3b4b11302 narrow scope 2025-10-25 13:50:16 -07:00
J. Nick Koston
c5ff19d3ab [usb_host] Fix atomic memory ordering in transfer slot allocation 2025-10-25 13:43:53 -07:00
4 changed files with 122 additions and 45 deletions

View File

@@ -82,6 +82,12 @@ struct TransferStatus {
using transfer_cb_t = std::function<void(const TransferStatus &)>;
enum TransferResult : uint8_t {
TRANSFER_OK = 0,
TRANSFER_ERROR_NO_SLOTS,
TRANSFER_ERROR_SUBMIT_FAILED,
};
class USBClient;
// struct used to capture all data needed for a transfer
@@ -134,7 +140,7 @@ class USBClient : public Component {
void on_opened(uint8_t addr);
void on_removed(usb_device_handle_t handle);
void control_transfer_callback(const usb_transfer_t *xfer) const;
void transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length);
TransferResult transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length);
void transfer_out(uint8_t ep_address, const transfer_cb_t &callback, const uint8_t *data, uint16_t length);
void dump_config() override;
void release_trq(TransferRequest *trq);

View File

@@ -334,7 +334,7 @@ static void control_callback(const usb_transfer_t *xfer) {
// This multi-threaded access is intentional for performance - USB task can
// immediately restart transfers without waiting for main loop scheduling.
TransferRequest *USBClient::get_trq_() {
trq_bitmask_t mask = this->trq_in_use_.load(std::memory_order_relaxed);
trq_bitmask_t mask = this->trq_in_use_.load(std::memory_order_acquire);
// Find first available slot (bit = 0) and try to claim it atomically
// We use a while loop to allow retrying the same slot after CAS failure
@@ -443,14 +443,15 @@ static void transfer_callback(usb_transfer_t *xfer) {
* @param ep_address The endpoint address.
* @param callback The callback function to be called when the transfer is complete.
* @param length The length of the data to be transferred.
* @return TransferResult indicating success or specific failure reason
*
* @throws None.
*/
void USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length) {
TransferResult USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length) {
auto *trq = this->get_trq_();
if (trq == nullptr) {
ESP_LOGE(TAG, "Too many requests queued");
return;
return TRANSFER_ERROR_NO_SLOTS;
}
trq->callback = callback;
trq->transfer->callback = transfer_callback;
@@ -460,7 +461,9 @@ void USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, u
if (err != ESP_OK) {
ESP_LOGE(TAG, "Failed to submit transfer, address=%x, length=%d, err=%x", ep_address, length, err);
this->release_trq(trq);
return TRANSFER_ERROR_SUBMIT_FAILED;
}
return TRANSFER_OK;
}
/**

View File

@@ -169,6 +169,98 @@ bool USBUartChannel::read_array(uint8_t *data, size_t len) {
this->parent_->start_input(this);
return status;
}
void USBUartComponent::reset_input_state_(USBUartChannel *channel) {
channel->input_retry_count_.store(0);
channel->input_started_.store(false);
}
void USBUartComponent::restart_input_(USBUartChannel *channel) {
// Atomically verify it's still started (true) and keep it started
// This prevents the race window of toggling true->false->true
bool expected = true;
if (channel->input_started_.compare_exchange_strong(expected, true)) {
// Still started - do the actual restart work without toggling the flag
this->do_start_input_(channel);
}
}
void USBUartComponent::input_transfer_callback_(USBUartChannel *channel, const usb_host::TransferStatus &status) {
// CALLBACK CONTEXT: This function is executed in USB task via transfer_callback
ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code);
if (!status.success) {
ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code));
// Transfer failed, slot already released
// Reset state so normal operations can restart later
this->reset_input_state_(channel);
return;
}
if (!channel->dummy_receiver_ && status.data_len > 0) {
// Allocate a chunk from the pool
UsbDataChunk *chunk = this->chunk_pool_.allocate();
if (chunk == nullptr) {
// No chunks available - queue is full, data dropped, slot already released
this->usb_data_queue_.increment_dropped_count();
// Reset state so normal operations can restart later
this->reset_input_state_(channel);
return;
}
// Copy data to chunk (this is fast, happens in USB task)
memcpy(chunk->data, status.data, status.data_len);
chunk->length = status.data_len;
chunk->channel = channel;
// Push to lock-free queue for main loop processing
// Push always succeeds because pool size == queue size
this->usb_data_queue_.push(chunk);
}
// On success, reset retry count and restart input immediately from USB task for performance
// The lock-free queue will handle backpressure
channel->input_retry_count_.store(0);
channel->input_started_.store(false);
this->start_input(channel);
}
void USBUartComponent::do_start_input_(USBUartChannel *channel) {
// This function does the actual work of starting input
// Caller must ensure input_started_ is already set to true
const auto *ep = channel->cdc_dev_.in_ep;
// input_started_ already set to true by caller
auto result = this->transfer_in(
ep->bEndpointAddress,
[this, channel](const usb_host::TransferStatus &status) { this->input_transfer_callback_(channel, status); },
ep->wMaxPacketSize);
if (result == usb_host::TRANSFER_ERROR_NO_SLOTS) {
// No slots available - defer retry to main loop
this->defer_input_retry_(channel);
} else if (result != usb_host::TRANSFER_OK) {
// Other error (submit failed) - don't retry, just reset state
// Error already logged by transfer_in()
this->reset_input_state_(channel);
}
}
void USBUartComponent::defer_input_retry_(USBUartChannel *channel) {
static constexpr uint8_t MAX_INPUT_RETRIES = 10;
// Atomically increment and get the NEW value (previous + 1)
uint8_t new_retry_count = channel->input_retry_count_.fetch_add(1) + 1;
if (new_retry_count > MAX_INPUT_RETRIES) {
ESP_LOGE(TAG, "Input retry limit reached for channel %d, stopping retries", channel->index_);
this->reset_input_state_(channel);
return;
}
// Keep input_started_ as true during defer to prevent multiple retries from queueing
// The deferred lambda will atomically restart
this->defer([this, channel] { this->restart_input_(channel); });
}
void USBUartComponent::setup() { USBClient::setup(); }
void USBUartComponent::loop() {
USBClient::loop();
@@ -214,8 +306,14 @@ void USBUartComponent::dump_config() {
}
}
void USBUartComponent::start_input(USBUartChannel *channel) {
if (!channel->initialised_.load() || channel->input_started_.load())
if (!channel->initialised_.load())
return;
// Atomically check if not started and set to started in one operation
bool expected = false;
if (!channel->input_started_.compare_exchange_strong(expected, true))
return; // Already started - prevents duplicate transfers from concurrent threads
// THREAD CONTEXT: Called from both USB task and main loop threads
// - USB task: Immediate restart after successful transfer for continuous data flow
// - Main loop: Controlled restart after consuming data (backpressure mechanism)
@@ -226,45 +324,9 @@ void USBUartComponent::start_input(USBUartChannel *channel) {
//
// The underlying transfer_in() uses lock-free atomic allocation from the
// TransferRequest pool, making this multi-threaded access safe
const auto *ep = channel->cdc_dev_.in_ep;
// CALLBACK CONTEXT: This lambda is executed in USB task via transfer_callback
auto callback = [this, channel](const usb_host::TransferStatus &status) {
ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code);
if (!status.success) {
ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code));
// On failure, don't restart - let next read_array() trigger it
channel->input_started_.store(false);
return;
}
if (!channel->dummy_receiver_ && status.data_len > 0) {
// Allocate a chunk from the pool
UsbDataChunk *chunk = this->chunk_pool_.allocate();
if (chunk == nullptr) {
// No chunks available - queue is full or we're out of memory
this->usb_data_queue_.increment_dropped_count();
// Mark input as not started so we can retry
channel->input_started_.store(false);
return;
}
// Copy data to chunk (this is fast, happens in USB task)
memcpy(chunk->data, status.data, status.data_len);
chunk->length = status.data_len;
chunk->channel = channel;
// Push to lock-free queue for main loop processing
// Push always succeeds because pool size == queue size
this->usb_data_queue_.push(chunk);
}
// On success, restart input immediately from USB task for performance
// The lock-free queue will handle backpressure
channel->input_started_.store(false);
this->start_input(channel);
};
channel->input_started_.store(true);
this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize);
// Do the actual work (input_started_ already set to true by CAS above)
this->do_start_input_(channel);
}
void USBUartComponent::start_output(USBUartChannel *channel) {
@@ -370,7 +432,7 @@ void USBUartTypeCdcAcm::enable_channels() {
for (auto *channel : this->channels_) {
if (!channel->initialised_.load())
continue;
channel->input_started_.store(false);
this->reset_input_state_(channel);
channel->output_started_.store(false);
this->start_input(channel);
}

View File

@@ -111,10 +111,11 @@ class USBUartChannel : public uart::UARTComponent, public Parented<USBUartCompon
CdcEps cdc_dev_{};
// Enum (likely 4 bytes)
UARTParityOptions parity_{UART_CONFIG_PARITY_NONE};
// Group atomics together (each 1 byte)
// Group atomics together
std::atomic<bool> input_started_{true};
std::atomic<bool> output_started_{true};
std::atomic<bool> initialised_{false};
std::atomic<uint8_t> input_retry_count_{0};
// Group regular bytes together to minimize padding
const uint8_t index_;
bool debug_{};
@@ -140,6 +141,11 @@ class USBUartComponent : public usb_host::USBClient {
EventPool<UsbDataChunk, USB_DATA_QUEUE_SIZE> chunk_pool_;
protected:
void defer_input_retry_(USBUartChannel *channel);
void reset_input_state_(USBUartChannel *channel);
void restart_input_(USBUartChannel *channel);
void do_start_input_(USBUartChannel *channel);
void input_transfer_callback_(USBUartChannel *channel, const usb_host::TransferStatus &status);
std::vector<USBUartChannel *> channels_{};
};