diff --git a/esphome/components/esp32_ble/ble.cpp b/esphome/components/esp32_ble/ble.cpp index 62a6f8b91a..8adef79d2f 100644 --- a/esphome/components/esp32_ble/ble.cpp +++ b/esphome/components/esp32_ble/ble.cpp @@ -366,21 +366,29 @@ void ESP32BLE::loop() { } template void enqueue_ble_event(Args... args) { - // Check if buffer is full before allocating - if (global_ble->ble_events_.size() >= (MAX_BLE_QUEUE_SIZE - 1)) { - // Buffer is full, push will fail and increment dropped count internally + // Check if queue is full before allocating + if (global_ble->ble_events_.full()) { + // Queue is full, drop the event + global_ble->ble_events_.increment_dropped_count(); return; } BLEEvent *new_event = EVENT_ALLOCATOR.allocate(1); if (new_event == nullptr) { // Memory too fragmented to allocate new event. Can only drop it until memory comes back + global_ble->ble_events_.increment_dropped_count(); return; } new (new_event) BLEEvent(args...); - // With atomic size, this should never fail due to the size check above - global_ble->ble_events_.push(new_event); + // Push the event - since we're the only producer and we checked full() above, + // this should always succeed unless we have a bug + if (!global_ble->ble_events_.push(new_event)) { + // This should not happen in SPSC queue with single producer + ESP_LOGE(TAG, "BLE queue push failed unexpectedly"); + new_event->~BLEEvent(); + EVENT_ALLOCATOR.deallocate(new_event, 1); + } } // NOLINT(clang-analyzer-unix.Malloc) // Explicit template instantiations for the friend function diff --git a/esphome/components/esp32_ble/ble.h b/esphome/components/esp32_ble/ble.h index 364a5f7608..58c064a2ef 100644 --- a/esphome/components/esp32_ble/ble.h +++ b/esphome/components/esp32_ble/ble.h @@ -30,8 +30,8 @@ static constexpr uint8_t SCAN_RESULT_BUFFER_SIZE = 32; static constexpr uint8_t SCAN_RESULT_BUFFER_SIZE = 20; #endif -// Maximum size of the BLE event queue -static constexpr size_t MAX_BLE_QUEUE_SIZE = SCAN_RESULT_BUFFER_SIZE * 2; +// Maximum size of the BLE event queue - must be power of 2 for lock-free queue +static constexpr size_t MAX_BLE_QUEUE_SIZE = 64; uint64_t ble_addr_to_uint64(const esp_bd_addr_t address); diff --git a/esphome/components/esp32_ble/queue.h b/esphome/components/esp32_ble/queue.h index 09bc7c886c..ce6acd1c96 100644 --- a/esphome/components/esp32_ble/queue.h +++ b/esphome/components/esp32_ble/queue.h @@ -11,8 +11,8 @@ * task to enqueue events without blocking. The main loop() then processes * these events at a safer time. * - * The queue uses atomic operations to ensure thread safety without locks. - * This prevents blocking the time-sensitive BLE stack callbacks. + * This is a Single-Producer Single-Consumer (SPSC) lock-free ring buffer. + * The BLE task is the only producer, and the main loop() is the only consumer. */ namespace esphome { @@ -20,61 +20,63 @@ namespace esp32_ble { template class LockFreeQueue { public: - LockFreeQueue() : write_index_(0), read_index_(0), size_(0), dropped_count_(0) {} + LockFreeQueue() : head_(0), tail_(0), dropped_count_(0) {} bool push(T *element) { if (element == nullptr) return false; - size_t current_size = size_.load(std::memory_order_acquire); - if (current_size >= SIZE - 1) { - // Buffer full, track dropped event + size_t current_tail = tail_.load(std::memory_order_relaxed); + size_t next_tail = (current_tail + 1) % SIZE; + + if (next_tail == head_.load(std::memory_order_acquire)) { + // Buffer full dropped_count_.fetch_add(1, std::memory_order_relaxed); return false; } - size_t write_idx = write_index_.load(std::memory_order_relaxed); - size_t next_write_idx = (write_idx + 1) % SIZE; - - // Store element in buffer - buffer_[write_idx] = element; - write_index_.store(next_write_idx, std::memory_order_release); - size_.fetch_add(1, std::memory_order_release); + buffer_[current_tail] = element; + tail_.store(next_tail, std::memory_order_release); return true; } T *pop() { - size_t current_size = size_.load(std::memory_order_acquire); - if (current_size == 0) { - return nullptr; + size_t current_head = head_.load(std::memory_order_relaxed); + + if (current_head == tail_.load(std::memory_order_acquire)) { + return nullptr; // Empty } - size_t read_idx = read_index_.load(std::memory_order_relaxed); - - // Get element from buffer - T *element = buffer_[read_idx]; - read_index_.store((read_idx + 1) % SIZE, std::memory_order_release); - size_.fetch_sub(1, std::memory_order_release); + T *element = buffer_[current_head]; + head_.store((current_head + 1) % SIZE, std::memory_order_release); return element; } - size_t size() const { return size_.load(std::memory_order_acquire); } + size_t size() const { + size_t tail = tail_.load(std::memory_order_acquire); + size_t head = head_.load(std::memory_order_acquire); + return (tail - head + SIZE) % SIZE; + } size_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); } void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); } - bool empty() const { return size_.load(std::memory_order_acquire) == 0; } + bool empty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); } + + bool full() const { + size_t next_tail = (tail_.load(std::memory_order_relaxed) + 1) % SIZE; + return next_tail == head_.load(std::memory_order_acquire); + } protected: T *buffer_[SIZE]; - std::atomic write_index_; - std::atomic read_index_; - std::atomic size_; + std::atomic head_; + std::atomic tail_; std::atomic dropped_count_; }; } // namespace esp32_ble } // namespace esphome -#endif +#endif \ No newline at end of file