mirror of
https://github.com/esphome/esphome.git
synced 2025-09-14 17:22:20 +01:00
Merge branch 'ble_queue_lock_free' into integration
This commit is contained in:
@@ -360,11 +360,18 @@ void ESP32BLE::loop() {
|
||||
if (this->advertising_ != nullptr) {
|
||||
this->advertising_->loop();
|
||||
}
|
||||
|
||||
// Log dropped events periodically
|
||||
size_t dropped = this->ble_events_.get_and_reset_dropped_count();
|
||||
if (dropped > 0) {
|
||||
ESP_LOGW(TAG, "Dropped %zu BLE events due to buffer overflow", dropped);
|
||||
}
|
||||
}
|
||||
|
||||
template<typename... Args> void enqueue_ble_event(Args... args) {
|
||||
if (global_ble->ble_events_.size() >= MAX_BLE_QUEUE_SIZE) {
|
||||
ESP_LOGD(TAG, "BLE event queue full (%zu), dropping event", MAX_BLE_QUEUE_SIZE);
|
||||
// Check if buffer is full before allocating
|
||||
if (global_ble->ble_events_.size() >= (SCAN_RESULT_BUFFER_SIZE * 2 - 1)) {
|
||||
// Buffer is full, push will fail and increment dropped count internally
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -374,6 +381,8 @@ template<typename... Args> void enqueue_ble_event(Args... args) {
|
||||
return;
|
||||
}
|
||||
new (new_event) BLEEvent(args...);
|
||||
|
||||
// With atomic size, this should never fail due to the size check above
|
||||
global_ble->ble_events_.push(new_event);
|
||||
} // NOLINT(clang-analyzer-unix.Malloc)
|
||||
|
||||
|
@@ -144,7 +144,7 @@ class ESP32BLE : public Component {
|
||||
std::vector<BLEStatusEventHandler *> ble_status_event_handlers_;
|
||||
BLEComponentState state_{BLE_COMPONENT_STATE_OFF};
|
||||
|
||||
Queue<BLEEvent> ble_events_;
|
||||
LockFreeQueue<BLEEvent, SCAN_RESULT_BUFFER_SIZE * 2> ble_events_;
|
||||
BLEAdvertising *advertising_{};
|
||||
esp_ble_io_cap_t io_cap_{ESP_IO_CAP_NONE};
|
||||
uint32_t advertising_cycle_time_{};
|
||||
|
@@ -2,63 +2,76 @@
|
||||
|
||||
#ifdef USE_ESP32
|
||||
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
#include <freertos/FreeRTOS.h>
|
||||
#include <freertos/semphr.h>
|
||||
#include <atomic>
|
||||
#include <cstddef>
|
||||
|
||||
/*
|
||||
* BLE events come in from a separate Task (thread) in the ESP32 stack. Rather
|
||||
* than trying to deal with various locking strategies, all incoming GAP and GATT
|
||||
* events will simply be placed on a semaphore guarded queue. The next time the
|
||||
* component runs loop(), these events are popped off the queue and handed at
|
||||
* this safer time.
|
||||
* than using mutex-based locking, this lock-free queue allows the BLE
|
||||
* task to enqueue events without blocking. The main loop() then processes
|
||||
* these events at a safer time.
|
||||
*
|
||||
* The queue uses atomic operations to ensure thread safety without locks.
|
||||
* This prevents blocking the time-sensitive BLE stack callbacks.
|
||||
*/
|
||||
|
||||
namespace esphome {
|
||||
namespace esp32_ble {
|
||||
|
||||
template<class T> class Queue {
|
||||
template<class T, size_t SIZE> class LockFreeQueue {
|
||||
public:
|
||||
Queue() { m_ = xSemaphoreCreateMutex(); }
|
||||
LockFreeQueue() : write_index_(0), read_index_(0), size_(0), dropped_count_(0) {}
|
||||
|
||||
void push(T *element) {
|
||||
bool push(T *element) {
|
||||
if (element == nullptr)
|
||||
return;
|
||||
// It is not called from main loop. Thus it won't block main thread.
|
||||
xSemaphoreTake(m_, portMAX_DELAY);
|
||||
q_.push(element);
|
||||
xSemaphoreGive(m_);
|
||||
return false;
|
||||
|
||||
size_t current_size = size_.load(std::memory_order_acquire);
|
||||
if (current_size >= SIZE - 1) {
|
||||
// Buffer full, track dropped event
|
||||
dropped_count_.fetch_add(1, std::memory_order_relaxed);
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t write_idx = write_index_.load(std::memory_order_relaxed);
|
||||
size_t next_write_idx = (write_idx + 1) % SIZE;
|
||||
|
||||
// Store element in buffer
|
||||
buffer_[write_idx] = element;
|
||||
write_index_.store(next_write_idx, std::memory_order_release);
|
||||
size_.fetch_add(1, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
T *pop() {
|
||||
T *element = nullptr;
|
||||
size_t current_size = size_.load(std::memory_order_acquire);
|
||||
if (current_size == 0) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (xSemaphoreTake(m_, 5L / portTICK_PERIOD_MS)) {
|
||||
if (!q_.empty()) {
|
||||
element = q_.front();
|
||||
q_.pop();
|
||||
}
|
||||
xSemaphoreGive(m_);
|
||||
}
|
||||
size_t read_idx = read_index_.load(std::memory_order_relaxed);
|
||||
|
||||
// Get element from buffer
|
||||
T *element = buffer_[read_idx];
|
||||
read_index_.store((read_idx + 1) % SIZE, std::memory_order_release);
|
||||
size_.fetch_sub(1, std::memory_order_release);
|
||||
return element;
|
||||
}
|
||||
|
||||
size_t size() const {
|
||||
// Lock-free size check. While std::queue::size() is not thread-safe, we intentionally
|
||||
// avoid locking here to prevent blocking the BLE callback thread. The size is only
|
||||
// used to decide whether to drop incoming events when the queue is near capacity.
|
||||
// With a queue limit of 40-64 events and normal processing, dropping events should
|
||||
// be extremely rare. When it does approach capacity, being off by 1-2 events is
|
||||
// acceptable to avoid blocking the BLE stack's time-sensitive callbacks.
|
||||
// Trade-off: We prefer occasional dropped events over potential BLE stack delays.
|
||||
return q_.size();
|
||||
}
|
||||
size_t size() const { return size_.load(std::memory_order_acquire); }
|
||||
|
||||
size_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); }
|
||||
|
||||
void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }
|
||||
|
||||
bool empty() const { return size_.load(std::memory_order_acquire) == 0; }
|
||||
|
||||
protected:
|
||||
std::queue<T *> q_;
|
||||
SemaphoreHandle_t m_;
|
||||
T *buffer_[SIZE];
|
||||
std::atomic<size_t> write_index_;
|
||||
std::atomic<size_t> read_index_;
|
||||
std::atomic<size_t> size_;
|
||||
std::atomic<size_t> dropped_count_;
|
||||
};
|
||||
|
||||
} // namespace esp32_ble
|
||||
|
Reference in New Issue
Block a user