mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-30 22:53:59 +00:00 
			
		
		
		
	[script] Fix parallel mode scripts with delays cancelling each other (#10324)
This commit is contained in:
		| @@ -5,6 +5,8 @@ | ||||
| #include "esphome/core/hal.h" | ||||
| #include "esphome/core/defines.h" | ||||
| #include "esphome/core/preferences.h" | ||||
| #include "esphome/core/scheduler.h" | ||||
| #include "esphome/core/application.h" | ||||
|  | ||||
| #include <vector> | ||||
|  | ||||
| @@ -158,7 +160,16 @@ template<typename... Ts> class DelayAction : public Action<Ts...>, public Compon | ||||
|   void play_complex(Ts... x) override { | ||||
|     auto f = std::bind(&DelayAction<Ts...>::play_next_, this, x...); | ||||
|     this->num_running_++; | ||||
|     this->set_timeout("delay", this->delay_.value(x...), f); | ||||
|  | ||||
|     // If num_running_ > 1, we have multiple instances running in parallel | ||||
|     // In single/restart/queued modes, only one instance runs at a time | ||||
|     // Parallel mode uses skip_cancel=true to allow multiple delays to coexist | ||||
|     // WARNING: This can accumulate delays if scripts are triggered faster than they complete! | ||||
|     // Users should set max_runs on parallel scripts to limit concurrent executions. | ||||
|     // Issue #10264: This is a workaround for parallel script delays interfering with each other. | ||||
|     App.scheduler.set_timer_common_(this, Scheduler::SchedulerItem::TIMEOUT, | ||||
|                                     /* is_static_string= */ true, "delay", this->delay_.value(x...), std::move(f), | ||||
|                                     /* is_retry= */ false, /* skip_cancel= */ this->num_running_ > 1); | ||||
|   } | ||||
|   float get_setup_priority() const override { return setup_priority::HARDWARE; } | ||||
|  | ||||
|   | ||||
| @@ -65,14 +65,17 @@ static void validate_static_string(const char *name) { | ||||
|  | ||||
| // Common implementation for both timeout and interval | ||||
| void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type type, bool is_static_string, | ||||
|                                       const void *name_ptr, uint32_t delay, std::function<void()> func, bool is_retry) { | ||||
|                                       const void *name_ptr, uint32_t delay, std::function<void()> func, bool is_retry, | ||||
|                                       bool skip_cancel) { | ||||
|   // Get the name as const char* | ||||
|   const char *name_cstr = this->get_name_cstr_(is_static_string, name_ptr); | ||||
|  | ||||
|   if (delay == SCHEDULER_DONT_RUN) { | ||||
|     // Still need to cancel existing timer if name is not empty | ||||
|     LockGuard guard{this->lock_}; | ||||
|     this->cancel_item_locked_(component, name_cstr, type); | ||||
|     if (!skip_cancel) { | ||||
|       LockGuard guard{this->lock_}; | ||||
|       this->cancel_item_locked_(component, name_cstr, type); | ||||
|     } | ||||
|     return; | ||||
|   } | ||||
|  | ||||
| @@ -97,7 +100,9 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type | ||||
|   if (delay == 0 && type == SchedulerItem::TIMEOUT) { | ||||
|     // Put in defer queue for guaranteed FIFO execution | ||||
|     LockGuard guard{this->lock_}; | ||||
|     this->cancel_item_locked_(component, name_cstr, type); | ||||
|     if (!skip_cancel) { | ||||
|       this->cancel_item_locked_(component, name_cstr, type); | ||||
|     } | ||||
|     this->defer_queue_.push_back(std::move(item)); | ||||
|     return; | ||||
|   } | ||||
| @@ -150,9 +155,11 @@ void HOT Scheduler::set_timer_common_(Component *component, SchedulerItem::Type | ||||
|     return; | ||||
|   } | ||||
|  | ||||
|   // If name is provided, do atomic cancel-and-add | ||||
|   // If name is provided, do atomic cancel-and-add (unless skip_cancel is true) | ||||
|   // Cancel existing items | ||||
|   this->cancel_item_locked_(component, name_cstr, type); | ||||
|   if (!skip_cancel) { | ||||
|     this->cancel_item_locked_(component, name_cstr, type); | ||||
|   } | ||||
|   // Add new item directly to to_add_ | ||||
|   // since we have the lock held | ||||
|   this->to_add_.push_back(std::move(item)); | ||||
|   | ||||
| @@ -21,8 +21,13 @@ struct RetryArgs; | ||||
| void retry_handler(const std::shared_ptr<RetryArgs> &args); | ||||
|  | ||||
| class Scheduler { | ||||
|   // Allow retry_handler to access protected members | ||||
|   // Allow retry_handler to access protected members for internal retry mechanism | ||||
|   friend void ::esphome::retry_handler(const std::shared_ptr<RetryArgs> &args); | ||||
|   // Allow DelayAction to call set_timer_common_ with skip_cancel=true for parallel script delays. | ||||
|   // This is needed to fix issue #10264 where parallel scripts with delays interfere with each other. | ||||
|   // We use friend instead of a public API because skip_cancel is dangerous - it can cause delays | ||||
|   // to accumulate and overload the scheduler if misused. | ||||
|   template<typename... Ts> friend class DelayAction; | ||||
|  | ||||
|  public: | ||||
|   // Public API - accepts std::string for backward compatibility | ||||
| @@ -184,7 +189,7 @@ class Scheduler { | ||||
|  | ||||
|   // Common implementation for both timeout and interval | ||||
|   void set_timer_common_(Component *component, SchedulerItem::Type type, bool is_static_string, const void *name_ptr, | ||||
|                          uint32_t delay, std::function<void()> func, bool is_retry = false); | ||||
|                          uint32_t delay, std::function<void()> func, bool is_retry = false, bool skip_cancel = false); | ||||
|  | ||||
|   // Common implementation for retry | ||||
|   void set_retry_common_(Component *component, bool is_static_string, const void *name_ptr, uint32_t initial_wait_time, | ||||
|   | ||||
							
								
								
									
										45
									
								
								tests/integration/fixtures/parallel_script_delays.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								tests/integration/fixtures/parallel_script_delays.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,45 @@ | ||||
| esphome: | ||||
|   name: test-parallel-delays | ||||
|  | ||||
| host: | ||||
|  | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| api: | ||||
|   actions: | ||||
|     - action: test_parallel_delays | ||||
|       then: | ||||
|         # Start three parallel script instances with small delays between starts | ||||
|         - globals.set: | ||||
|             id: instance_counter | ||||
|             value: '1' | ||||
|         - script.execute: parallel_delay_script | ||||
|         - delay: 10ms | ||||
|         - globals.set: | ||||
|             id: instance_counter | ||||
|             value: '2' | ||||
|         - script.execute: parallel_delay_script | ||||
|         - delay: 10ms | ||||
|         - globals.set: | ||||
|             id: instance_counter | ||||
|             value: '3' | ||||
|         - script.execute: parallel_delay_script | ||||
|  | ||||
| globals: | ||||
|   - id: instance_counter | ||||
|     type: int | ||||
|     initial_value: '0' | ||||
|  | ||||
| script: | ||||
|   - id: parallel_delay_script | ||||
|     mode: parallel | ||||
|     then: | ||||
|       - lambda: !lambda |- | ||||
|           int instance = id(instance_counter); | ||||
|           ESP_LOGI("TEST", "Parallel script instance %d started", instance); | ||||
|       - delay: 1s | ||||
|       - lambda: !lambda |- | ||||
|           static int completed_counter = 0; | ||||
|           completed_counter++; | ||||
|           ESP_LOGI("TEST", "Parallel script instance %d completed after delay", completed_counter); | ||||
| @@ -89,3 +89,73 @@ async def test_delay_action_cancellation( | ||||
|         assert 0.4 < time_from_second_start < 0.6, ( | ||||
|             f"Delay completed {time_from_second_start:.3f}s after second start, expected ~0.5s" | ||||
|         ) | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_parallel_script_delays( | ||||
|     yaml_config: str, | ||||
|     run_compiled: RunCompiledFunction, | ||||
|     api_client_connected: APIClientConnectedFactory, | ||||
| ) -> None: | ||||
|     """Test that parallel scripts with delays don't interfere with each other.""" | ||||
|     loop = asyncio.get_running_loop() | ||||
|  | ||||
|     # Track script executions | ||||
|     script_starts: list[float] = [] | ||||
|     script_ends: list[float] = [] | ||||
|  | ||||
|     # Patterns to match | ||||
|     start_pattern = re.compile(r"Parallel script instance \d+ started") | ||||
|     end_pattern = re.compile(r"Parallel script instance \d+ completed after delay") | ||||
|  | ||||
|     # Future to track when all scripts have completed | ||||
|     all_scripts_completed = loop.create_future() | ||||
|  | ||||
|     def check_output(line: str) -> None: | ||||
|         """Check log output for parallel script messages.""" | ||||
|         current_time = loop.time() | ||||
|  | ||||
|         if start_pattern.search(line): | ||||
|             script_starts.append(current_time) | ||||
|  | ||||
|         if end_pattern.search(line): | ||||
|             script_ends.append(current_time) | ||||
|             # Check if we have all 3 completions | ||||
|             if len(script_ends) == 3 and not all_scripts_completed.done(): | ||||
|                 all_scripts_completed.set_result(True) | ||||
|  | ||||
|     async with ( | ||||
|         run_compiled(yaml_config, line_callback=check_output), | ||||
|         api_client_connected() as client, | ||||
|     ): | ||||
|         # Get services | ||||
|         entities, services = await client.list_entities_services() | ||||
|  | ||||
|         # Find our test service | ||||
|         test_service = next( | ||||
|             (s for s in services if s.name == "test_parallel_delays"), None | ||||
|         ) | ||||
|         assert test_service is not None, "test_parallel_delays service not found" | ||||
|  | ||||
|         # Execute the test - this will start 3 parallel scripts with 1 second delays | ||||
|         client.execute_service(test_service, {}) | ||||
|  | ||||
|         # Wait for all scripts to complete (should take ~1 second, not 3) | ||||
|         await asyncio.wait_for(all_scripts_completed, timeout=2.0) | ||||
|  | ||||
|         # Verify we had 3 starts and 3 ends | ||||
|         assert len(script_starts) == 3, ( | ||||
|             f"Expected 3 script starts, got {len(script_starts)}" | ||||
|         ) | ||||
|         assert len(script_ends) == 3, f"Expected 3 script ends, got {len(script_ends)}" | ||||
|  | ||||
|         # Verify they ran in parallel - all should complete within ~1.5 seconds | ||||
|         first_start = min(script_starts) | ||||
|         last_end = max(script_ends) | ||||
|         total_time = last_end - first_start | ||||
|  | ||||
|         # If running in parallel, total time should be close to 1 second | ||||
|         # If the delays interfered (cancelled each other or ran sequentially), the wait_for | ||||
|         # above would already have timed out; this bound catches any smaller slowdown | ||||
|         assert total_time < 1.5, ( | ||||
|             f"Parallel scripts took {total_time:.2f}s total, should be ~1s if running in parallel" | ||||
|         ) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user