diff --git a/esphome/core/scheduler.cpp b/esphome/core/scheduler.cpp
index 0d4715f621..11d59c2499 100644
--- a/esphome/core/scheduler.cpp
+++ b/esphome/core/scheduler.cpp
@@ -316,59 +316,37 @@ optional<uint32_t> HOT Scheduler::next_schedule_in(uint32_t now) {
     return 0;
   return next_exec - now_64;
 }
+
+void Scheduler::full_cleanup_removed_items_() {
+  // We hold the lock for the entire cleanup operation because:
+  // 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
+  // 2. Other threads must see either the old state or the new state, not intermediate states
+  // 3. The operation is already expensive (O(n)), so lock overhead is negligible
+  // 4. No operations inside can block or take other locks, so no deadlock risk
+  LockGuard guard{this->lock_};
+
+  std::vector<std::unique_ptr<SchedulerItem>> valid_items;
+
+  // Move all non-removed items to valid_items, recycle removed ones
+  for (auto &item : this->items_) {
+    if (!is_item_removed_(item.get())) {
+      valid_items.push_back(std::move(item));
+    } else {
+      // Recycle removed items
+      this->recycle_item_(std::move(item));
+    }
+  }
+
+  // Replace items_ with the filtered list
+  this->items_ = std::move(valid_items);
+  // Rebuild the heap structure since items are no longer in heap order
+  std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
+  this->to_remove_ = 0;
+}
+
 void HOT Scheduler::call(uint32_t now) {
 #ifndef ESPHOME_THREAD_SINGLE
-  // Process defer queue first to guarantee FIFO execution order for deferred items.
-  // Previously, defer() used the heap which gave undefined order for equal timestamps,
-  // causing race conditions on multi-core systems (ESP32, BK7200).
-  // With the defer queue:
-  // - Deferred items (delay=0) go directly to defer_queue_ in set_timer_common_
-  // - Items execute in exact order they were deferred (FIFO guarantee)
-  // - No deferred items exist in to_add_, so processing order doesn't affect correctness
-  // Single-core platforms don't use this queue and fall back to the heap-based approach.
-  //
-  // Note: Items cancelled via cancel_item_locked_() are marked with remove=true but still
-  // processed here. They are skipped during execution by should_skip_item_().
-  // This is intentional - no memory leak occurs.
-  //
-  // We use an index (defer_queue_front_) to track the read position instead of calling
-  // erase() on every pop, which would be O(n). The queue is processed once per loop -
-  // any items added during processing are left for the next loop iteration.
-
-  // Snapshot the queue end point - only process items that existed at loop start
-  // Items added during processing (by callbacks or other threads) run next loop
-  // No lock needed: single consumer (main loop), stale read just means we process less this iteration
-  size_t defer_queue_end = this->defer_queue_.size();
-
-  while (this->defer_queue_front_ < defer_queue_end) {
-    std::unique_ptr<SchedulerItem> item;
-    {
-      LockGuard lock(this->lock_);
-      // SAFETY: Moving out the unique_ptr leaves a nullptr in the vector at defer_queue_front_.
-      // This is intentional and safe because:
-      // 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function
-      // 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_
-      //    and has_cancelled_timeout_in_container_ in scheduler.h)
-      // 3. The lock protects concurrent access, but the nullptr remains until cleanup
-      item = std::move(this->defer_queue_[this->defer_queue_front_]);
-      this->defer_queue_front_++;
-    }
-
-    // Execute callback without holding lock to prevent deadlocks
-    // if the callback tries to call defer() again
-    if (!this->should_skip_item_(item.get())) {
-      now = this->execute_item_(item.get(), now);
-    }
-    // Recycle the defer item after execution
-    this->recycle_item_(std::move(item));
-  }
-
-  // If we've consumed all items up to the snapshot point, clean up the dead space
-  // Single consumer (main loop), so no lock needed for this check
-  if (this->defer_queue_front_ >= defer_queue_end) {
-    LockGuard lock(this->lock_);
-    this->cleanup_defer_queue_locked_();
-  }
+  this->process_defer_queue_(now);
 #endif /* not ESPHOME_THREAD_SINGLE */
 
   // Convert the fresh timestamp from main loop to 64-bit for scheduler operations
@@ -429,30 +407,7 @@ void HOT Scheduler::call(uint32_t now) {
   // If we still have too many cancelled items, do a full cleanup
   // This only happens if cancelled items are stuck in the middle/bottom of the heap
   if (this->to_remove_ >= MAX_LOGICALLY_DELETED_ITEMS) {
-    // We hold the lock for the entire cleanup operation because:
-    // 1. We're rebuilding the entire items_ list, so we need exclusive access throughout
-    // 2. Other threads must see either the old state or the new state, not intermediate states
-    // 3. The operation is already expensive (O(n)), so lock overhead is negligible
-    // 4. No operations inside can block or take other locks, so no deadlock risk
-    LockGuard guard{this->lock_};
-
-    std::vector<std::unique_ptr<SchedulerItem>> valid_items;
-
-    // Move all non-removed items to valid_items, recycle removed ones
-    for (auto &item : this->items_) {
-      if (!is_item_removed_(item.get())) {
-        valid_items.push_back(std::move(item));
-      } else {
-        // Recycle removed items
-        this->recycle_item_(std::move(item));
-      }
-    }
-
-    // Replace items_ with the filtered list
-    this->items_ = std::move(valid_items);
-    // Rebuild the heap structure since items are no longer in heap order
-    std::make_heap(this->items_.begin(), this->items_.end(), SchedulerItem::cmp);
-    this->to_remove_ = 0;
+    this->full_cleanup_removed_items_();
   }
   while (!this->items_.empty()) {
     // Don't copy-by value yet
diff --git a/esphome/core/scheduler.h b/esphome/core/scheduler.h
index df0be0e4ce..f6ec07294d 100644
--- a/esphome/core/scheduler.h
+++ b/esphome/core/scheduler.h
@@ -263,7 +263,65 @@ class Scheduler {
   // Helper to recycle a SchedulerItem
   void recycle_item_(std::unique_ptr<SchedulerItem> item);
 
+  // Helper to perform full cleanup when too many items are cancelled
+  void full_cleanup_removed_items_();
+
 #ifndef ESPHOME_THREAD_SINGLE
+  // Helper to process defer queue - inline for performance in hot path
+  inline void process_defer_queue_(uint32_t &now) {
+    // Process defer queue first to guarantee FIFO execution order for deferred items.
+    // Previously, defer() used the heap which gave undefined order for equal timestamps,
+    // causing race conditions on multi-core systems (ESP32, BK7200).
+    // With the defer queue:
+    // - Deferred items (delay=0) go directly to defer_queue_ in set_timer_common_
+    // - Items execute in exact order they were deferred (FIFO guarantee)
+    // - No deferred items exist in to_add_, so processing order doesn't affect correctness
+    // Single-core platforms don't use this queue and fall back to the heap-based approach.
+    //
+    // Note: Items cancelled via cancel_item_locked_() are marked with remove=true but still
+    // processed here. They are skipped during execution by should_skip_item_().
+    // This is intentional - no memory leak occurs.
+    //
+    // We use an index (defer_queue_front_) to track the read position instead of calling
+    // erase() on every pop, which would be O(n). The queue is processed once per loop -
+    // any items added during processing are left for the next loop iteration.
+
+    // Snapshot the queue end point - only process items that existed at loop start
+    // Items added during processing (by callbacks or other threads) run next loop
+    // No lock needed: single consumer (main loop), stale read just means we process less this iteration
+    size_t defer_queue_end = this->defer_queue_.size();
+
+    while (this->defer_queue_front_ < defer_queue_end) {
+      std::unique_ptr<SchedulerItem> item;
+      {
+        LockGuard lock(this->lock_);
+        // SAFETY: Moving out the unique_ptr leaves a nullptr in the vector at defer_queue_front_.
+        // This is intentional and safe because:
+        // 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function
+        // 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_
+        //    and has_cancelled_timeout_in_container_ in scheduler.h)
+        // 3. The lock protects concurrent access, but the nullptr remains until cleanup
+        item = std::move(this->defer_queue_[this->defer_queue_front_]);
+        this->defer_queue_front_++;
+      }
+
+      // Execute callback without holding lock to prevent deadlocks
+      // if the callback tries to call defer() again
+      if (!this->should_skip_item_(item.get())) {
+        now = this->execute_item_(item.get(), now);
+      }
+      // Recycle the defer item after execution
+      this->recycle_item_(std::move(item));
+    }
+
+    // If we've consumed all items up to the snapshot point, clean up the dead space
+    // Single consumer (main loop), so no lock needed for this check
+    if (this->defer_queue_front_ >= defer_queue_end) {
+      LockGuard lock(this->lock_);
+      this->cleanup_defer_queue_locked_();
+    }
+  }
+
   // Helper to cleanup defer_queue_ after processing
   // IMPORTANT: Caller must hold the scheduler lock before calling this function.
   inline void cleanup_defer_queue_locked_() {
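
Note (illustrative sketch, not part of the patch): the drain logic that moves into
process_defer_queue_() follows a general single-consumer pattern - snapshot the
queue end, pop one slot at a time under the lock, run the callback with the lock
released, then compact the consumed prefix. A minimal standalone C++ sketch of
that pattern is below; the names (DeferQueue, Task, drain) are hypothetical,
std::mutex/std::lock_guard stand in for ESPHome's Mutex/LockGuard, and it erases
the consumed prefix rather than recycling items through a pool as the patch does.

    // Hypothetical, self-contained illustration of the snapshot-and-drain
    // pattern; not ESPHome code.
    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <vector>

    struct Task {
      std::function<void()> callback;
    };

    class DeferQueue {
     public:
      // Producer side: may be called from any thread, including from inside
      // a callback that is currently being drained.
      void defer(std::function<void()> cb) {
        std::lock_guard<std::mutex> lock(this->mutex_);
        this->queue_.push_back(std::make_unique<Task>(Task{std::move(cb)}));
      }

      // Consumer side: called from a single thread (the main loop).
      void drain() {
        // Snapshot the end point; items deferred while draining run next pass.
        // A stale size is harmless - we just process fewer items this time.
        std::size_t end = this->queue_.size();

        while (this->front_ < end) {
          std::unique_ptr<Task> task;
          {
            // Hold the lock only for the O(1) pop; the moved-from slot stays
            // nullptr until the compaction below.
            std::lock_guard<std::mutex> lock(this->mutex_);
            task = std::move(this->queue_[this->front_]);
            this->front_++;
          }
          // Run the callback unlocked so it can call defer() without deadlock.
          task->callback();
        }

        // Compact the consumed nullptr prefix in a single O(n) erase.
        std::lock_guard<std::mutex> lock(this->mutex_);
        this->queue_.erase(this->queue_.begin(),
                           this->queue_.begin() + static_cast<std::ptrdiff_t>(this->front_));
        this->front_ = 0;
      }

     private:
      std::mutex mutex_;
      std::vector<std::unique_ptr<Task>> queue_;  // moved-from slots are nullptr
      std::size_t front_ = 0;                     // read index, consumer-only
    };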