1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-31 15:12:06 +00:00

Merge branch 'integration' into memory_api

This commit is contained in:
J. Nick Koston
2025-10-25 13:46:41 -07:00
4 changed files with 58 additions and 14 deletions

View File

@@ -82,6 +82,12 @@ struct TransferStatus {
using transfer_cb_t = std::function<void(const TransferStatus &)>;
enum TransferResult : uint8_t {
TRANSFER_OK = 0,
TRANSFER_ERROR_NO_SLOTS,
TRANSFER_ERROR_SUBMIT_FAILED,
};
class USBClient;
// struct used to capture all data needed for a transfer
@@ -134,7 +140,7 @@ class USBClient : public Component {
void on_opened(uint8_t addr);
void on_removed(usb_device_handle_t handle);
void control_transfer_callback(const usb_transfer_t *xfer) const;
void transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length);
TransferResult transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length);
void transfer_out(uint8_t ep_address, const transfer_cb_t &callback, const uint8_t *data, uint16_t length);
void dump_config() override;
void release_trq(TransferRequest *trq);

View File

@@ -334,7 +334,7 @@ static void control_callback(const usb_transfer_t *xfer) {
// This multi-threaded access is intentional for performance - USB task can
// immediately restart transfers without waiting for main loop scheduling.
TransferRequest *USBClient::get_trq_() {
trq_bitmask_t mask = this->trq_in_use_.load(std::memory_order_relaxed);
trq_bitmask_t mask = this->trq_in_use_.load(std::memory_order_acquire);
// Find first available slot (bit = 0) and try to claim it atomically
// We use a while loop to allow retrying the same slot after CAS failure
@@ -443,14 +443,15 @@ static void transfer_callback(usb_transfer_t *xfer) {
* @param ep_address The endpoint address.
* @param callback The callback function to be called when the transfer is complete.
* @param length The length of the data to be transferred.
* @return TransferResult indicating success or specific failure reason
*
* @throws None.
*/
void USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length) {
TransferResult USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, uint16_t length) {
auto *trq = this->get_trq_();
if (trq == nullptr) {
ESP_LOGE(TAG, "Too many requests queued");
return;
return TRANSFER_ERROR_NO_SLOTS;
}
trq->callback = callback;
trq->transfer->callback = transfer_callback;
@@ -460,7 +461,9 @@ void USBClient::transfer_in(uint8_t ep_address, const transfer_cb_t &callback, u
if (err != ESP_OK) {
ESP_LOGE(TAG, "Failed to submit transfer, address=%x, length=%d, err=%x", ep_address, length, err);
this->release_trq(trq);
return TRANSFER_ERROR_SUBMIT_FAILED;
}
return TRANSFER_OK;
}
/**

View File

@@ -169,6 +169,25 @@ bool USBUartChannel::read_array(uint8_t *data, size_t len) {
this->parent_->start_input(this);
return status;
}
void USBUartComponent::defer_input_retry_(USBUartChannel *channel) {
static constexpr uint8_t MAX_INPUT_RETRIES = 10;
// Atomically increment and get previous value
uint8_t retry_count = channel->input_retry_count_.fetch_add(1);
if (retry_count >= MAX_INPUT_RETRIES) {
ESP_LOGE(TAG, "Input retry limit reached for channel %d, stopping retries", channel->index_);
channel->input_started_.store(false);
return;
}
// Keep input_started_ as true during defer to prevent multiple retries from queueing
// The deferred lambda will clear it before calling start_input()
this->defer([this, channel] {
channel->input_started_.store(false);
this->start_input(channel);
});
}
void USBUartComponent::setup() { USBClient::setup(); }
void USBUartComponent::loop() {
USBClient::loop();
@@ -214,8 +233,13 @@ void USBUartComponent::dump_config() {
}
}
void USBUartComponent::start_input(USBUartChannel *channel) {
if (!channel->initialised_.load() || channel->input_started_.load())
if (!channel->initialised_.load())
return;
// Atomically check if not started and set to started in one operation
bool expected = false;
if (!channel->input_started_.compare_exchange_strong(expected, true))
return; // Already started, another thread won the race
// THREAD CONTEXT: Called from both USB task and main loop threads
// - USB task: Immediate restart after successful transfer for continuous data flow
// - Main loop: Controlled restart after consuming data (backpressure mechanism)
@@ -232,8 +256,8 @@ void USBUartComponent::start_input(USBUartChannel *channel) {
ESP_LOGV(TAG, "Transfer result: length: %u; status %X", status.data_len, status.error_code);
if (!status.success) {
ESP_LOGE(TAG, "Control transfer failed, status=%s", esp_err_to_name(status.error_code));
// On failure, don't restart - let next read_array() trigger it
channel->input_started_.store(false);
// On failure, defer retry to main loop
this->defer_input_retry_(channel);
return;
}
@@ -241,10 +265,9 @@ void USBUartComponent::start_input(USBUartChannel *channel) {
// Allocate a chunk from the pool
UsbDataChunk *chunk = this->chunk_pool_.allocate();
if (chunk == nullptr) {
// No chunks available - queue is full or we're out of memory
// No chunks available - defer retry to main loop for backpressure
this->usb_data_queue_.increment_dropped_count();
// Mark input as not started so we can retry
channel->input_started_.store(false);
this->defer_input_retry_(channel);
return;
}
@@ -258,13 +281,22 @@ void USBUartComponent::start_input(USBUartChannel *channel) {
this->usb_data_queue_.push(chunk);
}
// On success, restart input immediately from USB task for performance
// On success, reset retry count and restart input immediately from USB task for performance
// The lock-free queue will handle backpressure
channel->input_retry_count_.store(0);
channel->input_started_.store(false);
this->start_input(channel);
};
channel->input_started_.store(true);
this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize);
// input_started_ already set to true by compare_exchange_strong above
auto result = this->transfer_in(ep->bEndpointAddress, callback, ep->wMaxPacketSize);
if (result == usb_host::TRANSFER_ERROR_NO_SLOTS) {
// No slots available - defer retry to main loop
this->defer_input_retry_(channel);
} else if (result != usb_host::TRANSFER_OK) {
// Other error (submit failed) - don't retry, just clear flag
// Error already logged by transfer_in()
channel->input_started_.store(false);
}
}
void USBUartComponent::start_output(USBUartChannel *channel) {
@@ -370,6 +402,7 @@ void USBUartTypeCdcAcm::enable_channels() {
for (auto *channel : this->channels_) {
if (!channel->initialised_.load())
continue;
channel->input_retry_count_.store(0);
channel->input_started_.store(false);
channel->output_started_.store(false);
this->start_input(channel);

View File

@@ -111,10 +111,11 @@ class USBUartChannel : public uart::UARTComponent, public Parented<USBUartCompon
CdcEps cdc_dev_{};
// Enum (likely 4 bytes)
UARTParityOptions parity_{UART_CONFIG_PARITY_NONE};
// Group atomics together (each 1 byte)
// Group atomics together
std::atomic<bool> input_started_{true};
std::atomic<bool> output_started_{true};
std::atomic<bool> initialised_{false};
std::atomic<uint8_t> input_retry_count_{0};
// Group regular bytes together to minimize padding
const uint8_t index_;
bool debug_{};
@@ -140,6 +141,7 @@ class USBUartComponent : public usb_host::USBClient {
EventPool<UsbDataChunk, USB_DATA_QUEUE_SIZE> chunk_pool_;
protected:
void defer_input_retry_(USBUartChannel *channel);
std::vector<USBUartChannel *> channels_{};
};