mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-31 15:12:06 +00:00 
			
		
		
		
	Merge branch 'scheduler_defer_drains_each_loop' into integration
This commit is contained in:
		| @@ -344,6 +344,12 @@ void HOT Scheduler::call(uint32_t now) { | |||||||
|     std::unique_ptr<SchedulerItem> item; |     std::unique_ptr<SchedulerItem> item; | ||||||
|     { |     { | ||||||
|       LockGuard lock(this->lock_); |       LockGuard lock(this->lock_); | ||||||
|  |       // SAFETY: Moving out the unique_ptr leaves a nullptr in the vector at defer_queue_front_. | ||||||
|  |       // This is intentional and safe because: | ||||||
|  |       // 1. The vector is only cleaned up by cleanup_defer_queue_locked_() at the end of this function | ||||||
|  |       // 2. Any code iterating defer_queue_ MUST check for nullptr items (see mark_matching_items_removed_ | ||||||
|  |       //    and has_cancelled_timeout_in_container_ in scheduler.h) | ||||||
|  |       // 3. The lock protects concurrent access, but the nullptr remains until cleanup | ||||||
|       item = std::move(this->defer_queue_[this->defer_queue_front_]); |       item = std::move(this->defer_queue_[this->defer_queue_front_]); | ||||||
|       this->defer_queue_front_++; |       this->defer_queue_front_++; | ||||||
|     } |     } | ||||||
| @@ -361,29 +367,7 @@ void HOT Scheduler::call(uint32_t now) { | |||||||
|   // Single consumer (main loop), so no lock needed for this check |   // Single consumer (main loop), so no lock needed for this check | ||||||
|   if (this->defer_queue_front_ >= defer_queue_end) { |   if (this->defer_queue_front_ >= defer_queue_end) { | ||||||
|     LockGuard lock(this->lock_); |     LockGuard lock(this->lock_); | ||||||
|     // Check if new items were added by producers during processing |     this->cleanup_defer_queue_locked_(); | ||||||
|     if (this->defer_queue_front_ >= this->defer_queue_.size()) { |  | ||||||
|       // Common case: no new items - clear everything |  | ||||||
|       this->defer_queue_.clear(); |  | ||||||
|     } else { |  | ||||||
|       // Rare case: new items were added during processing - compact the vector |  | ||||||
|       // This only happens when: |  | ||||||
|       // 1. A deferred callback calls defer() again, or |  | ||||||
|       // 2. Another thread calls defer() while we're processing |  | ||||||
|       // |  | ||||||
|       // Move unprocessed items (added during this loop) to the front for next iteration |  | ||||||
|       // |  | ||||||
|       // SAFETY: Compacted items may include cancelled items (marked for removal via |  | ||||||
|       // cancel_item_locked_() during execution). This is safe because should_skip_item_() |  | ||||||
|       // checks is_item_removed_() before executing, so cancelled items will be skipped |  | ||||||
|       // and recycled on the next loop iteration. |  | ||||||
|       size_t remaining = this->defer_queue_.size() - this->defer_queue_front_; |  | ||||||
|       for (size_t i = 0; i < remaining; i++) { |  | ||||||
|         this->defer_queue_[i] = std::move(this->defer_queue_[this->defer_queue_front_ + i]); |  | ||||||
|       } |  | ||||||
|       this->defer_queue_.resize(remaining); |  | ||||||
|     } |  | ||||||
|     this->defer_queue_front_ = 0; |  | ||||||
|   } |   } | ||||||
| #endif /* not ESPHOME_THREAD_SINGLE */ | #endif /* not ESPHOME_THREAD_SINGLE */ | ||||||
|  |  | ||||||
|   | |||||||
| @@ -264,6 +264,36 @@ class Scheduler { | |||||||
|   // Helper to recycle a SchedulerItem |   // Helper to recycle a SchedulerItem | ||||||
|   void recycle_item_(std::unique_ptr<SchedulerItem> item); |   void recycle_item_(std::unique_ptr<SchedulerItem> item); | ||||||
|  |  | ||||||
|  | #ifndef ESPHOME_THREAD_SINGLE | ||||||
|  |   // Helper to cleanup defer_queue_ after processing | ||||||
|  |   // IMPORTANT: Caller must hold the scheduler lock before calling this function. | ||||||
|  |   inline void cleanup_defer_queue_locked_() { | ||||||
|  |     // Check if new items were added by producers during processing | ||||||
|  |     if (this->defer_queue_front_ >= this->defer_queue_.size()) { | ||||||
|  |       // Common case: no new items - clear everything | ||||||
|  |       this->defer_queue_.clear(); | ||||||
|  |     } else { | ||||||
|  |       // Rare case: new items were added during processing - compact the vector | ||||||
|  |       // This only happens when: | ||||||
|  |       // 1. A deferred callback calls defer() again, or | ||||||
|  |       // 2. Another thread calls defer() while we're processing | ||||||
|  |       // | ||||||
|  |       // Move unprocessed items (added during this loop) to the front for next iteration | ||||||
|  |       // | ||||||
|  |       // SAFETY: Compacted items may include cancelled items (marked for removal via | ||||||
|  |       // cancel_item_locked_() during execution). This is safe because should_skip_item_() | ||||||
|  |       // checks is_item_removed_() before executing, so cancelled items will be skipped | ||||||
|  |       // and recycled on the next loop iteration. | ||||||
|  |       size_t remaining = this->defer_queue_.size() - this->defer_queue_front_; | ||||||
|  |       for (size_t i = 0; i < remaining; i++) { | ||||||
|  |         this->defer_queue_[i] = std::move(this->defer_queue_[this->defer_queue_front_ + i]); | ||||||
|  |       } | ||||||
|  |       this->defer_queue_.resize(remaining); | ||||||
|  |     } | ||||||
|  |     this->defer_queue_front_ = 0; | ||||||
|  |   } | ||||||
|  | #endif /* not ESPHOME_THREAD_SINGLE */ | ||||||
|  |  | ||||||
|   // Helper to check if item is marked for removal (platform-specific) |   // Helper to check if item is marked for removal (platform-specific) | ||||||
|   // Returns true if item should be skipped, handles platform-specific synchronization |   // Returns true if item should be skipped, handles platform-specific synchronization | ||||||
|   // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this |   // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this | ||||||
| @@ -282,8 +312,7 @@ class Scheduler { | |||||||
|  |  | ||||||
|   // Helper to mark matching items in a container as removed |   // Helper to mark matching items in a container as removed | ||||||
|   // Returns the number of items marked for removal |   // Returns the number of items marked for removal | ||||||
|   // For ESPHOME_THREAD_MULTI_NO_ATOMICS platforms, the caller must hold the scheduler lock before calling this |   // IMPORTANT: Caller must hold the scheduler lock before calling this function. | ||||||
|   // function. |  | ||||||
|   template<typename Container> |   template<typename Container> | ||||||
|   size_t mark_matching_items_removed_(Container &container, Component *component, const char *name_cstr, |   size_t mark_matching_items_removed_(Container &container, Component *component, const char *name_cstr, | ||||||
|                                       SchedulerItem::Type type, bool match_retry) { |                                       SchedulerItem::Type type, bool match_retry) { | ||||||
| @@ -291,8 +320,8 @@ class Scheduler { | |||||||
|     for (auto &item : container) { |     for (auto &item : container) { | ||||||
|       // Skip nullptr items (can happen in defer_queue_ when items are being processed) |       // Skip nullptr items (can happen in defer_queue_ when items are being processed) | ||||||
|       // The defer_queue_ uses index-based processing: items are std::moved out but left in the |       // The defer_queue_ uses index-based processing: items are std::moved out but left in the | ||||||
|       // vector as nullptr until cleanup. If cancel_item_locked_() is called from a callback during |       // vector as nullptr until cleanup. Even though this function is called with lock held, | ||||||
|       // defer queue processing, it will iterate over these nullptr items. This check prevents crashes. |       // the vector can still contain nullptr items from the processing loop. This check prevents crashes. | ||||||
|       if (!item) |       if (!item) | ||||||
|         continue; |         continue; | ||||||
|       if (this->matches_item_(item, component, name_cstr, type, match_retry)) { |       if (this->matches_item_(item, component, name_cstr, type, match_retry)) { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user