mirror of
https://github.com/esphome/esphome.git
synced 2025-09-15 01:32:19 +01:00
simplify
This commit is contained in:
@@ -366,21 +366,29 @@ void ESP32BLE::loop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
template<typename... Args> void enqueue_ble_event(Args... args) {
|
template<typename... Args> void enqueue_ble_event(Args... args) {
|
||||||
// Check if buffer is full before allocating
|
// Check if queue is full before allocating
|
||||||
if (global_ble->ble_events_.size() >= (MAX_BLE_QUEUE_SIZE - 1)) {
|
if (global_ble->ble_events_.full()) {
|
||||||
// Buffer is full, push will fail and increment dropped count internally
|
// Queue is full, drop the event
|
||||||
|
global_ble->ble_events_.increment_dropped_count();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
BLEEvent *new_event = EVENT_ALLOCATOR.allocate(1);
|
BLEEvent *new_event = EVENT_ALLOCATOR.allocate(1);
|
||||||
if (new_event == nullptr) {
|
if (new_event == nullptr) {
|
||||||
// Memory too fragmented to allocate new event. Can only drop it until memory comes back
|
// Memory too fragmented to allocate new event. Can only drop it until memory comes back
|
||||||
|
global_ble->ble_events_.increment_dropped_count();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
new (new_event) BLEEvent(args...);
|
new (new_event) BLEEvent(args...);
|
||||||
|
|
||||||
// With atomic size, this should never fail due to the size check above
|
// Push the event - since we're the only producer and we checked full() above,
|
||||||
global_ble->ble_events_.push(new_event);
|
// this should always succeed unless we have a bug
|
||||||
|
if (!global_ble->ble_events_.push(new_event)) {
|
||||||
|
// This should not happen in SPSC queue with single producer
|
||||||
|
ESP_LOGE(TAG, "BLE queue push failed unexpectedly");
|
||||||
|
new_event->~BLEEvent();
|
||||||
|
EVENT_ALLOCATOR.deallocate(new_event, 1);
|
||||||
|
}
|
||||||
} // NOLINT(clang-analyzer-unix.Malloc)
|
} // NOLINT(clang-analyzer-unix.Malloc)
|
||||||
|
|
||||||
// Explicit template instantiations for the friend function
|
// Explicit template instantiations for the friend function
|
||||||
|
@@ -30,8 +30,8 @@ static constexpr uint8_t SCAN_RESULT_BUFFER_SIZE = 32;
|
|||||||
static constexpr uint8_t SCAN_RESULT_BUFFER_SIZE = 20;
|
static constexpr uint8_t SCAN_RESULT_BUFFER_SIZE = 20;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Maximum size of the BLE event queue
|
// Maximum size of the BLE event queue - must be power of 2 for lock-free queue
|
||||||
static constexpr size_t MAX_BLE_QUEUE_SIZE = SCAN_RESULT_BUFFER_SIZE * 2;
|
static constexpr size_t MAX_BLE_QUEUE_SIZE = 64;
|
||||||
|
|
||||||
uint64_t ble_addr_to_uint64(const esp_bd_addr_t address);
|
uint64_t ble_addr_to_uint64(const esp_bd_addr_t address);
|
||||||
|
|
||||||
|
@@ -11,8 +11,8 @@
|
|||||||
* task to enqueue events without blocking. The main loop() then processes
|
* task to enqueue events without blocking. The main loop() then processes
|
||||||
* these events at a safer time.
|
* these events at a safer time.
|
||||||
*
|
*
|
||||||
* The queue uses atomic operations to ensure thread safety without locks.
|
* This is a Single-Producer Single-Consumer (SPSC) lock-free ring buffer.
|
||||||
* This prevents blocking the time-sensitive BLE stack callbacks.
|
* The BLE task is the only producer, and the main loop() is the only consumer.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
namespace esphome {
|
namespace esphome {
|
||||||
@@ -20,61 +20,63 @@ namespace esp32_ble {
|
|||||||
|
|
||||||
template<class T, size_t SIZE> class LockFreeQueue {
|
template<class T, size_t SIZE> class LockFreeQueue {
|
||||||
public:
|
public:
|
||||||
LockFreeQueue() : write_index_(0), read_index_(0), size_(0), dropped_count_(0) {}
|
LockFreeQueue() : head_(0), tail_(0), dropped_count_(0) {}
|
||||||
|
|
||||||
bool push(T *element) {
|
bool push(T *element) {
|
||||||
if (element == nullptr)
|
if (element == nullptr)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
size_t current_size = size_.load(std::memory_order_acquire);
|
size_t current_tail = tail_.load(std::memory_order_relaxed);
|
||||||
if (current_size >= SIZE - 1) {
|
size_t next_tail = (current_tail + 1) % SIZE;
|
||||||
// Buffer full, track dropped event
|
|
||||||
|
if (next_tail == head_.load(std::memory_order_acquire)) {
|
||||||
|
// Buffer full
|
||||||
dropped_count_.fetch_add(1, std::memory_order_relaxed);
|
dropped_count_.fetch_add(1, std::memory_order_relaxed);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t write_idx = write_index_.load(std::memory_order_relaxed);
|
buffer_[current_tail] = element;
|
||||||
size_t next_write_idx = (write_idx + 1) % SIZE;
|
tail_.store(next_tail, std::memory_order_release);
|
||||||
|
|
||||||
// Store element in buffer
|
|
||||||
buffer_[write_idx] = element;
|
|
||||||
write_index_.store(next_write_idx, std::memory_order_release);
|
|
||||||
size_.fetch_add(1, std::memory_order_release);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
T *pop() {
|
T *pop() {
|
||||||
size_t current_size = size_.load(std::memory_order_acquire);
|
size_t current_head = head_.load(std::memory_order_relaxed);
|
||||||
if (current_size == 0) {
|
|
||||||
return nullptr;
|
if (current_head == tail_.load(std::memory_order_acquire)) {
|
||||||
|
return nullptr; // Empty
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t read_idx = read_index_.load(std::memory_order_relaxed);
|
T *element = buffer_[current_head];
|
||||||
|
head_.store((current_head + 1) % SIZE, std::memory_order_release);
|
||||||
// Get element from buffer
|
|
||||||
T *element = buffer_[read_idx];
|
|
||||||
read_index_.store((read_idx + 1) % SIZE, std::memory_order_release);
|
|
||||||
size_.fetch_sub(1, std::memory_order_release);
|
|
||||||
return element;
|
return element;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t size() const { return size_.load(std::memory_order_acquire); }
|
size_t size() const {
|
||||||
|
size_t tail = tail_.load(std::memory_order_acquire);
|
||||||
|
size_t head = head_.load(std::memory_order_acquire);
|
||||||
|
return (tail - head + SIZE) % SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
size_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); }
|
size_t get_and_reset_dropped_count() { return dropped_count_.exchange(0, std::memory_order_relaxed); }
|
||||||
|
|
||||||
void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }
|
void increment_dropped_count() { dropped_count_.fetch_add(1, std::memory_order_relaxed); }
|
||||||
|
|
||||||
bool empty() const { return size_.load(std::memory_order_acquire) == 0; }
|
bool empty() const { return head_.load(std::memory_order_acquire) == tail_.load(std::memory_order_acquire); }
|
||||||
|
|
||||||
|
bool full() const {
|
||||||
|
size_t next_tail = (tail_.load(std::memory_order_relaxed) + 1) % SIZE;
|
||||||
|
return next_tail == head_.load(std::memory_order_acquire);
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
T *buffer_[SIZE];
|
T *buffer_[SIZE];
|
||||||
std::atomic<size_t> write_index_;
|
std::atomic<size_t> head_;
|
||||||
std::atomic<size_t> read_index_;
|
std::atomic<size_t> tail_;
|
||||||
std::atomic<size_t> size_;
|
|
||||||
std::atomic<size_t> dropped_count_;
|
std::atomic<size_t> dropped_count_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace esp32_ble
|
} // namespace esp32_ble
|
||||||
} // namespace esphome
|
} // namespace esphome
|
||||||
|
|
||||||
#endif
|
#endif
|
Reference in New Issue
Block a user