mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-31 15:12:06 +00:00 
			
		
		
		
	Make defer FIFO
This commit is contained in:
		| @@ -73,8 +73,6 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type | ||||
|   if (delay == SCHEDULER_DONT_RUN) | ||||
|     return; | ||||
|  | ||||
|   const auto now = this->millis_(); | ||||
|  | ||||
|   // Create and populate the scheduler item | ||||
|   auto item = make_unique<SchedulerItem>(); | ||||
|   item->component = component; | ||||
| @@ -83,6 +81,16 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type | ||||
|   item->callback = std::move(func); | ||||
|   item->remove = false; | ||||
|  | ||||
|   // Special handling for defer() (delay = 0, type = TIMEOUT) | ||||
|   if (delay == 0 && type == SchedulerItem::TIMEOUT) { | ||||
|     // Put in defer queue for guaranteed FIFO execution | ||||
|     LockGuard guard{this->lock_}; | ||||
|     this->defer_queue_.push_back(std::move(item)); | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   const auto now = this->millis_(); | ||||
|  | ||||
|   // Type-specific setup | ||||
|   if (type == SchedulerItem::INTERVAL) { | ||||
|     item->interval = delay; | ||||
| @@ -209,6 +217,28 @@ optional<uint32_t> HOT Scheduler::next_schedule_in() { | ||||
|   return item->next_execution_ - now; | ||||
| } | ||||
| void HOT Scheduler::call() { | ||||
|   // Process defer queue first to guarantee FIFO execution order for deferred items. | ||||
|   // Previously, defer() used the heap which gave undefined order for equal timestamps, | ||||
|   // causing race conditions on multi-core systems (ESP32, RP2040, BK7200). | ||||
|   // With the defer queue: | ||||
|   // - Deferred items (delay=0) go directly to defer_queue_ in set_timer_common_ | ||||
|   // - Items execute in exact order they were deferred (FIFO guarantee) | ||||
|   // - No deferred items exist in to_add_, so processing order doesn't affect correctness | ||||
|   while (!this->defer_queue_.empty()) { | ||||
|     std::unique_ptr<SchedulerItem> item; | ||||
|     { | ||||
|       LockGuard guard{this->lock_}; | ||||
|       if (this->defer_queue_.empty())  // Double-check with lock held | ||||
|         break; | ||||
|       item = std::move(this->defer_queue_.front()); | ||||
|       this->defer_queue_.pop_front(); | ||||
|     } | ||||
|     // Skip if item was marked for removal or component failed | ||||
|     if (!this->should_skip_item_(item.get())) { | ||||
|       this->execute_item_(item.get()); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   const auto now = this->millis_(); | ||||
|   this->process_to_add(); | ||||
|  | ||||
| @@ -294,13 +324,7 @@ void HOT Scheduler::call() { | ||||
|       // Warning: During callback(), a lot of stuff can happen, including: | ||||
|       //  - timeouts/intervals get added, potentially invalidating vector pointers | ||||
|       //  - timeouts/intervals get cancelled | ||||
|       { | ||||
|         uint32_t now_ms = millis(); | ||||
|         WarnIfComponentBlockingGuard guard{item->component, now_ms}; | ||||
|         item->callback(); | ||||
|         // Call finish to ensure blocking time is properly calculated and reported | ||||
|         guard.finish(); | ||||
|       } | ||||
|       this->execute_item_(item.get()); | ||||
|     } | ||||
|  | ||||
|     { | ||||
| @@ -364,6 +388,26 @@ void HOT Scheduler::push_(std::unique_ptr<Scheduler::SchedulerItem> item) { | ||||
|   LockGuard guard{this->lock_}; | ||||
|   this->to_add_.push_back(std::move(item)); | ||||
| } | ||||
| // Helper function to check if item matches criteria for cancellation | ||||
| bool HOT Scheduler::matches_item_(const std::unique_ptr<SchedulerItem> &item, Component *component, | ||||
|                                   const char *name_cstr, SchedulerItem::Type type) { | ||||
|   if (item->component != component || item->type != type || item->remove) { | ||||
|     return false; | ||||
|   } | ||||
|   const char *item_name = item->get_name(); | ||||
|   return item_name != nullptr && strcmp(name_cstr, item_name) == 0; | ||||
| } | ||||
|  | ||||
| // Helper to execute a scheduler item | ||||
| void HOT Scheduler::execute_item_(SchedulerItem *item) { | ||||
|   App.set_current_component(item->component); | ||||
|  | ||||
|   uint32_t now_ms = millis(); | ||||
|   WarnIfComponentBlockingGuard guard{item->component, now_ms}; | ||||
|   item->callback(); | ||||
|   guard.finish(); | ||||
| } | ||||
|  | ||||
| // Common implementation for cancel operations | ||||
| bool HOT Scheduler::cancel_item_common_(Component *component, bool is_static_string, const void *name_ptr, | ||||
|                                         SchedulerItem::Type type) { | ||||
| @@ -379,19 +423,25 @@ bool HOT Scheduler::cancel_item_common_(Component *component, bool is_static_str | ||||
|   LockGuard guard{this->lock_}; | ||||
|   bool ret = false; | ||||
|  | ||||
|   for (auto &it : this->items_) { | ||||
|     const char *item_name = it->get_name(); | ||||
|     if (it->component == component && item_name != nullptr && strcmp(name_cstr, item_name) == 0 && it->type == type && | ||||
|         !it->remove) { | ||||
|       to_remove_++; | ||||
|       it->remove = true; | ||||
|   // Check all containers for matching items | ||||
|   for (auto &item : this->defer_queue_) { | ||||
|     if (this->matches_item_(item, component, name_cstr, type)) { | ||||
|       item->remove = true; | ||||
|       ret = true; | ||||
|     } | ||||
|   } | ||||
|   for (auto &it : this->to_add_) { | ||||
|     const char *item_name = it->get_name(); | ||||
|     if (it->component == component && item_name != nullptr && strcmp(name_cstr, item_name) == 0 && it->type == type) { | ||||
|       it->remove = true; | ||||
|  | ||||
|   for (auto &item : this->items_) { | ||||
|     if (this->matches_item_(item, component, name_cstr, type)) { | ||||
|       item->remove = true; | ||||
|       ret = true; | ||||
|       this->to_remove_++;  // Only track removals for heap items | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   for (auto &item : this->to_add_) { | ||||
|     if (this->matches_item_(item, component, name_cstr, type)) { | ||||
|       item->remove = true; | ||||
|       ret = true; | ||||
|     } | ||||
|   } | ||||
|   | ||||
| @@ -2,6 +2,7 @@ | ||||
|  | ||||
| #include <vector> | ||||
| #include <memory> | ||||
| #include <deque> | ||||
|  | ||||
| #include "esphome/core/component.h" | ||||
| #include "esphome/core/helpers.h" | ||||
| @@ -145,6 +146,18 @@ class Scheduler { | ||||
|   bool cancel_item_(Component *component, const std::string &name, SchedulerItem::Type type); | ||||
|   bool cancel_item_(Component *component, const char *name, SchedulerItem::Type type); | ||||
|  | ||||
|   // Helper functions for cancel operations | ||||
|   bool matches_item_(const std::unique_ptr<SchedulerItem> &item, Component *component, const char *name_cstr, | ||||
|                      SchedulerItem::Type type); | ||||
|  | ||||
|   // Helper to execute a scheduler item | ||||
|   void execute_item_(SchedulerItem *item); | ||||
|  | ||||
|   // Helper to check if item should be skipped | ||||
|   bool should_skip_item_(const SchedulerItem *item) const { | ||||
|     return item->remove || (item->component != nullptr && item->component->is_failed()); | ||||
|   } | ||||
|  | ||||
|   bool empty_() { | ||||
|     this->cleanup_(); | ||||
|     return this->items_.empty(); | ||||
| @@ -153,6 +166,7 @@ class Scheduler { | ||||
|   Mutex lock_; | ||||
|   std::vector<std::unique_ptr<SchedulerItem>> items_; | ||||
|   std::vector<std::unique_ptr<SchedulerItem>> to_add_; | ||||
|   std::deque<std::unique_ptr<SchedulerItem>> defer_queue_;  // FIFO queue for defer() calls | ||||
|   uint32_t last_millis_{0}; | ||||
|   uint16_t millis_major_{0}; | ||||
|   uint32_t to_remove_{0}; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user