mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-31 15:12:06 +00:00 
			
		
		
		
	Merge branch 'unbound_queued_script_fix' into ci_impact_analysis_script
This commit is contained in:
		| @@ -45,13 +45,26 @@ def get_script(script_id): | ||||
|  | ||||
|  | ||||
| def check_max_runs(value): | ||||
|     # Set default for queued mode to prevent unbounded queue growth | ||||
|     if CONF_MAX_RUNS not in value and value[CONF_MODE] == CONF_QUEUED: | ||||
|         value[CONF_MAX_RUNS] = 5 | ||||
|  | ||||
|     if CONF_MAX_RUNS not in value: | ||||
|         return value | ||||
|  | ||||
|     if value[CONF_MODE] not in [CONF_QUEUED, CONF_PARALLEL]: | ||||
|         raise cv.Invalid( | ||||
|             "The option 'max_runs' is only valid in 'queue' and 'parallel' mode.", | ||||
|             "The option 'max_runs' is only valid in 'queued' and 'parallel' mode.", | ||||
|             path=[CONF_MAX_RUNS], | ||||
|         ) | ||||
|  | ||||
|     # Queued mode must have bounded queue (min 1), parallel mode can be unlimited (0) | ||||
|     if value[CONF_MODE] == CONF_QUEUED and value[CONF_MAX_RUNS] < 1: | ||||
|         raise cv.Invalid( | ||||
|             "The option 'max_runs' must be at least 1 for queued mode.", | ||||
|             path=[CONF_MAX_RUNS], | ||||
|         ) | ||||
|  | ||||
|     return value | ||||
|  | ||||
|  | ||||
| @@ -106,7 +119,7 @@ CONFIG_SCHEMA = automation.validate_automation( | ||||
|         cv.Optional(CONF_MODE, default=CONF_SINGLE): cv.one_of( | ||||
|             *SCRIPT_MODES, lower=True | ||||
|         ), | ||||
|         cv.Optional(CONF_MAX_RUNS): cv.positive_int, | ||||
|         cv.Optional(CONF_MAX_RUNS): cv.int_range(min=0, max=100), | ||||
|         cv.Optional(CONF_PARAMETERS, default={}): cv.Schema( | ||||
|             { | ||||
|                 validate_parameter_name: validate_parameter_type, | ||||
|   | ||||
| @@ -1,10 +1,11 @@ | ||||
| #pragma once | ||||
|  | ||||
| #include <memory> | ||||
| #include <tuple> | ||||
| #include "esphome/core/automation.h" | ||||
| #include "esphome/core/component.h" | ||||
| #include "esphome/core/helpers.h" | ||||
| #include "esphome/core/log.h" | ||||
|  | ||||
| #include <queue> | ||||
| namespace esphome { | ||||
| namespace script { | ||||
|  | ||||
| @@ -96,14 +97,27 @@ template<typename... Ts> class RestartScript : public Script<Ts...> { | ||||
| /** A script type that queues new instances that are created. | ||||
|  * | ||||
|  * Only one instance of the script can be active at a time. | ||||
|  * | ||||
|  * Ring buffer implementation: | ||||
|  * - num_queued_ tracks the number of queued (waiting) instances, NOT including the currently running one | ||||
|  * - queue_front_ points to the next item to execute (read position) | ||||
|  * - Buffer size is max_runs_ - 1 (max total instances minus the running one) | ||||
|  * - Write position is calculated as: (queue_front_ + num_queued_) % (max_runs_ - 1) | ||||
|  * - When an item finishes, queue_front_ advances: (queue_front_ + 1) % (max_runs_ - 1) | ||||
|  * - First execute() runs immediately without queuing (num_queued_ stays 0) | ||||
|  * - Subsequent executes while running are queued starting at position 0 | ||||
|  * - Maximum total instances = max_runs_ (includes 1 running + (max_runs_ - 1) queued) | ||||
|  */ | ||||
| template<typename... Ts> class QueueingScript : public Script<Ts...>, public Component { | ||||
|  public: | ||||
|   void execute(Ts... x) override { | ||||
|     if (this->is_action_running() || this->num_runs_ > 0) { | ||||
|       // num_runs_ is the number of *queued* instances, so total number of instances is | ||||
|       // num_runs_ + 1 | ||||
|       if (this->max_runs_ != 0 && this->num_runs_ + 1 >= this->max_runs_) { | ||||
|     this->lazy_init_queue_(); | ||||
|  | ||||
|     if (this->is_action_running() || this->num_queued_ > 0) { | ||||
|       // num_queued_ is the number of *queued* instances (waiting, not including currently running) | ||||
|       // max_runs_ is the maximum *total* instances (running + queued) | ||||
|       // So we reject when num_queued_ + 1 >= max_runs_ (queued + running >= max) | ||||
|       if (this->num_queued_ + 1 >= this->max_runs_) { | ||||
|         this->esp_logw_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' maximum number of queued runs exceeded!"), | ||||
|                         LOG_STR_ARG(this->name_)); | ||||
|         return; | ||||
| @@ -111,8 +125,11 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com | ||||
|  | ||||
|       this->esp_logd_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' queueing new instance (mode: queued)"), | ||||
|                       LOG_STR_ARG(this->name_)); | ||||
|       this->num_runs_++; | ||||
|       this->var_queue_.push(std::make_tuple(x...)); | ||||
|       // Ring buffer: write to (queue_front_ + num_queued_) % (max_runs_ - 1) | ||||
|       size_t write_pos = (this->queue_front_ + this->num_queued_) % (this->max_runs_ - 1); | ||||
|       // Use reset() to replace the unique_ptr | ||||
|       this->var_queue_[write_pos].reset(new std::tuple<Ts...>(std::make_tuple(x...))); | ||||
|       this->num_queued_++; | ||||
|       return; | ||||
|     } | ||||
|  | ||||
| @@ -122,29 +139,49 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com | ||||
|   } | ||||
|  | ||||
|   void stop() override { | ||||
|     this->num_runs_ = 0; | ||||
|     // Clear all queued items to free memory immediately | ||||
|     for (int i = 0; i < this->max_runs_ - 1; i++) { | ||||
|       this->var_queue_[i].reset(); | ||||
|     } | ||||
|     this->num_queued_ = 0; | ||||
|     this->queue_front_ = 0; | ||||
|     Script<Ts...>::stop(); | ||||
|   } | ||||
|  | ||||
|   void loop() override { | ||||
|     if (this->num_runs_ != 0 && !this->is_action_running()) { | ||||
|       this->num_runs_--; | ||||
|       auto &vars = this->var_queue_.front(); | ||||
|       this->var_queue_.pop(); | ||||
|       this->trigger_tuple_(vars, typename gens<sizeof...(Ts)>::type()); | ||||
|     if (this->num_queued_ != 0 && !this->is_action_running()) { | ||||
|       // Dequeue: decrement count, move tuple out (frees slot), advance read position | ||||
|       this->num_queued_--; | ||||
|       auto tuple_ptr = std::move(this->var_queue_[this->queue_front_]); | ||||
|       this->queue_front_ = (this->queue_front_ + 1) % (this->max_runs_ - 1); | ||||
|       this->trigger_tuple_(*tuple_ptr, typename gens<sizeof...(Ts)>::type()); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   void set_max_runs(int max_runs) { max_runs_ = max_runs; } | ||||
|  | ||||
|  protected: | ||||
|   // Lazy init queue on first use - avoids setup() ordering issues and saves memory | ||||
|   // if script is never executed during this boot cycle | ||||
|   inline void lazy_init_queue_() { | ||||
|     if (this->var_queue_.capacity() == 0) { | ||||
|       // Allocate max_runs_ - 1 slots for queued items (running item is separate) | ||||
|       this->var_queue_.init(this->max_runs_ - 1); | ||||
|       // Initialize all unique_ptr slots to nullptr | ||||
|       for (int i = 0; i < this->max_runs_ - 1; i++) { | ||||
|         this->var_queue_.push_back(nullptr); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   template<int... S> void trigger_tuple_(const std::tuple<Ts...> &tuple, seq<S...> /*unused*/) { | ||||
|     this->trigger(std::get<S>(tuple)...); | ||||
|   } | ||||
|  | ||||
|   int num_runs_ = 0; | ||||
|   int max_runs_ = 0; | ||||
|   std::queue<std::tuple<Ts...>> var_queue_; | ||||
|   int num_queued_ = 0;      // Number of queued instances (not including currently running) | ||||
|   int max_runs_ = 0;        // Maximum total instances (running + queued) | ||||
|   size_t queue_front_ = 0;  // Ring buffer read position (next item to execute) | ||||
|   FixedVector<std::unique_ptr<std::tuple<Ts...>>> var_queue_;  // Ring buffer of queued parameters | ||||
| }; | ||||
|  | ||||
| /** A script type that executes new instances in parallel. | ||||
|   | ||||
| @@ -298,6 +298,7 @@ template<typename T> class FixedVector { | ||||
|   const T &back() const { return data_[size_ - 1]; } | ||||
|  | ||||
|   size_t size() const { return size_; } | ||||
|   size_t capacity() const { return capacity_; } | ||||
|   bool empty() const { return size_ == 0; } | ||||
|  | ||||
|   /// Access element without bounds checking (matches std::vector behavior) | ||||
|   | ||||
							
								
								
									
										170
									
								
								tests/integration/fixtures/script_queued.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										170
									
								
								tests/integration/fixtures/script_queued.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,170 @@ | ||||
| esphome: | ||||
|   name: test-script-queued | ||||
|  | ||||
| host: | ||||
| api: | ||||
|   actions: | ||||
|     # Test 1: Queue depth with default max_runs=5 | ||||
|     - action: test_queue_depth | ||||
|       then: | ||||
|         - logger.log: "=== TEST 1: Queue depth (max_runs=5 means 5 total, reject 6-7) ===" | ||||
|         - script.execute: | ||||
|             id: queue_depth_script | ||||
|             value: 1 | ||||
|         - script.execute: | ||||
|             id: queue_depth_script | ||||
|             value: 2 | ||||
|         - script.execute: | ||||
|             id: queue_depth_script | ||||
|             value: 3 | ||||
|         - script.execute: | ||||
|             id: queue_depth_script | ||||
|             value: 4 | ||||
|         - script.execute: | ||||
|             id: queue_depth_script | ||||
|             value: 5 | ||||
|         - script.execute: | ||||
|             id: queue_depth_script | ||||
|             value: 6 | ||||
|         - script.execute: | ||||
|             id: queue_depth_script | ||||
|             value: 7 | ||||
|  | ||||
|     # Test 2: Ring buffer wrap test | ||||
|     - action: test_ring_buffer | ||||
|       then: | ||||
|         - logger.log: "=== TEST 2: Ring buffer wrap (should process A, B, C in order) ===" | ||||
|         - script.execute: | ||||
|             id: wrap_script | ||||
|             msg: "A" | ||||
|         - script.execute: | ||||
|             id: wrap_script | ||||
|             msg: "B" | ||||
|         - script.execute: | ||||
|             id: wrap_script | ||||
|             msg: "C" | ||||
|  | ||||
|     # Test 3: Stop clears queue | ||||
|     - action: test_stop_clears | ||||
|       then: | ||||
|         - logger.log: "=== TEST 3: Stop clears queue (should only see 1, then 'STOPPED') ===" | ||||
|         - script.execute: | ||||
|             id: stop_script | ||||
|             num: 1 | ||||
|         - script.execute: | ||||
|             id: stop_script | ||||
|             num: 2 | ||||
|         - script.execute: | ||||
|             id: stop_script | ||||
|             num: 3 | ||||
|         - delay: 50ms | ||||
|         - logger.log: "STOPPING script now" | ||||
|         - script.stop: stop_script | ||||
|  | ||||
|     # Test 4: Verify rejection (max_runs=3) | ||||
|     - action: test_rejection | ||||
|       then: | ||||
|         - logger.log: "=== TEST 4: Verify rejection (max_runs=3 means 3 total, reject 4-8) ===" | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 1 | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 2 | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 3 | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 4 | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 5 | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 6 | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 7 | ||||
|         - script.execute: | ||||
|             id: rejection_script | ||||
|             val: 8 | ||||
|  | ||||
|     # Test 5: No parameters test | ||||
|     - action: test_no_params | ||||
|       then: | ||||
|         - logger.log: "=== TEST 5: No params (should process 3 times) ===" | ||||
|         - script.execute: no_params_script | ||||
|         - script.execute: no_params_script | ||||
|         - script.execute: no_params_script | ||||
|  | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| script: | ||||
|   # Test script 1: Queue depth test (default max_runs=5) | ||||
|   - id: queue_depth_script | ||||
|     mode: queued | ||||
|     parameters: | ||||
|       value: int | ||||
|     then: | ||||
|       - logger.log: | ||||
|           format: "Queue test: START item %d" | ||||
|           args: ['value'] | ||||
|       - delay: 100ms | ||||
|       - logger.log: | ||||
|           format: "Queue test: END item %d" | ||||
|           args: ['value'] | ||||
|  | ||||
|   # Test script 2: Ring buffer wrap test (max_runs=3) | ||||
|   - id: wrap_script | ||||
|     mode: queued | ||||
|     max_runs: 3 | ||||
|     parameters: | ||||
|       msg: string | ||||
|     then: | ||||
|       - logger.log: | ||||
|           format: "Ring buffer: START '%s'" | ||||
|           args: ['msg.c_str()'] | ||||
|       - delay: 50ms | ||||
|       - logger.log: | ||||
|           format: "Ring buffer: END '%s'" | ||||
|           args: ['msg.c_str()'] | ||||
|  | ||||
|   # Test script 3: Stop test | ||||
|   - id: stop_script | ||||
|     mode: queued | ||||
|     max_runs: 5 | ||||
|     parameters: | ||||
|       num: int | ||||
|     then: | ||||
|       - logger.log: | ||||
|           format: "Stop test: START %d" | ||||
|           args: ['num'] | ||||
|       - delay: 100ms | ||||
|       - logger.log: | ||||
|           format: "Stop test: END %d" | ||||
|           args: ['num'] | ||||
|  | ||||
|   # Test script 4: Rejection test (max_runs=3) | ||||
|   - id: rejection_script | ||||
|     mode: queued | ||||
|     max_runs: 3 | ||||
|     parameters: | ||||
|       val: int | ||||
|     then: | ||||
|       - logger.log: | ||||
|           format: "Rejection test: START %d" | ||||
|           args: ['val'] | ||||
|       - delay: 200ms | ||||
|       - logger.log: | ||||
|           format: "Rejection test: END %d" | ||||
|           args: ['val'] | ||||
|  | ||||
|   # Test script 5: No parameters | ||||
|   - id: no_params_script | ||||
|     mode: queued | ||||
|     then: | ||||
|       - logger.log: "No params: START" | ||||
|       - delay: 50ms | ||||
|       - logger.log: "No params: END" | ||||
							
								
								
									
										207
									
								
								tests/integration/test_script_queued.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										207
									
								
								tests/integration/test_script_queued.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,207 @@ | ||||
| """Test ESPHome queued script functionality.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| import asyncio | ||||
| import re | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from .types import APIClientConnectedFactory, RunCompiledFunction | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_script_queued( | ||||
|     yaml_config: str, | ||||
|     run_compiled: RunCompiledFunction, | ||||
|     api_client_connected: APIClientConnectedFactory, | ||||
| ) -> None: | ||||
|     """Test comprehensive queued script functionality.""" | ||||
|     loop = asyncio.get_running_loop() | ||||
|  | ||||
|     # Track all test results | ||||
|     test_results = { | ||||
|         "queue_depth": {"processed": [], "rejections": 0}, | ||||
|         "ring_buffer": {"start_order": [], "end_order": []}, | ||||
|         "stop": {"processed": [], "stop_logged": False}, | ||||
|         "rejection": {"processed": [], "rejections": 0}, | ||||
|         "no_params": {"executions": 0}, | ||||
|     } | ||||
|  | ||||
|     # Patterns for Test 1: Queue depth | ||||
|     queue_start = re.compile(r"Queue test: START item (\d+)") | ||||
|     queue_end = re.compile(r"Queue test: END item (\d+)") | ||||
|     queue_reject = re.compile( | ||||
|         r"Script 'queue_depth_script' maximum number of queued runs exceeded!" | ||||
|     ) | ||||
|  | ||||
|     # Patterns for Test 2: Ring buffer | ||||
|     ring_start = re.compile(r"Ring buffer: START '([A-Z])'") | ||||
|     ring_end = re.compile(r"Ring buffer: END '([A-Z])'") | ||||
|  | ||||
|     # Patterns for Test 3: Stop | ||||
|     stop_start = re.compile(r"Stop test: START (\d+)") | ||||
|     stop_log = re.compile(r"STOPPING script now") | ||||
|  | ||||
|     # Patterns for Test 4: Rejection | ||||
|     reject_start = re.compile(r"Rejection test: START (\d+)") | ||||
|     reject_end = re.compile(r"Rejection test: END (\d+)") | ||||
|     reject_reject = re.compile( | ||||
|         r"Script 'rejection_script' maximum number of queued runs exceeded!" | ||||
|     ) | ||||
|  | ||||
|     # Patterns for Test 5: No params | ||||
|     no_params_end = re.compile(r"No params: END") | ||||
|  | ||||
|     # Test completion futures | ||||
|     test1_complete = loop.create_future() | ||||
|     test2_complete = loop.create_future() | ||||
|     test3_complete = loop.create_future() | ||||
|     test4_complete = loop.create_future() | ||||
|     test5_complete = loop.create_future() | ||||
|  | ||||
|     def check_output(line: str) -> None: | ||||
|         """Check log output for all test messages.""" | ||||
|         # Test 1: Queue depth | ||||
|         if match := queue_start.search(line): | ||||
|             item = int(match.group(1)) | ||||
|             if item not in test_results["queue_depth"]["processed"]: | ||||
|                 test_results["queue_depth"]["processed"].append(item) | ||||
|  | ||||
|         if match := queue_end.search(line): | ||||
|             item = int(match.group(1)) | ||||
|             if item == 5 and not test1_complete.done(): | ||||
|                 test1_complete.set_result(True) | ||||
|  | ||||
|         if queue_reject.search(line): | ||||
|             test_results["queue_depth"]["rejections"] += 1 | ||||
|  | ||||
|         # Test 2: Ring buffer | ||||
|         if match := ring_start.search(line): | ||||
|             msg = match.group(1) | ||||
|             test_results["ring_buffer"]["start_order"].append(msg) | ||||
|  | ||||
|         if match := ring_end.search(line): | ||||
|             msg = match.group(1) | ||||
|             test_results["ring_buffer"]["end_order"].append(msg) | ||||
|             if ( | ||||
|                 len(test_results["ring_buffer"]["end_order"]) == 3 | ||||
|                 and not test2_complete.done() | ||||
|             ): | ||||
|                 test2_complete.set_result(True) | ||||
|  | ||||
|         # Test 3: Stop | ||||
|         if match := stop_start.search(line): | ||||
|             item = int(match.group(1)) | ||||
|             if item not in test_results["stop"]["processed"]: | ||||
|                 test_results["stop"]["processed"].append(item) | ||||
|  | ||||
|         if stop_log.search(line): | ||||
|             test_results["stop"]["stop_logged"] = True | ||||
|             # Give time for any queued items to be cleared | ||||
|             if not test3_complete.done(): | ||||
|                 loop.call_later( | ||||
|                     0.3, | ||||
|                     lambda: test3_complete.set_result(True) | ||||
|                     if not test3_complete.done() | ||||
|                     else None, | ||||
|                 ) | ||||
|  | ||||
|         # Test 4: Rejection | ||||
|         if match := reject_start.search(line): | ||||
|             item = int(match.group(1)) | ||||
|             if item not in test_results["rejection"]["processed"]: | ||||
|                 test_results["rejection"]["processed"].append(item) | ||||
|  | ||||
|         if match := reject_end.search(line): | ||||
|             item = int(match.group(1)) | ||||
|             if item == 3 and not test4_complete.done(): | ||||
|                 test4_complete.set_result(True) | ||||
|  | ||||
|         if reject_reject.search(line): | ||||
|             test_results["rejection"]["rejections"] += 1 | ||||
|  | ||||
|         # Test 5: No params | ||||
|         if no_params_end.search(line): | ||||
|             test_results["no_params"]["executions"] += 1 | ||||
|             if ( | ||||
|                 test_results["no_params"]["executions"] == 3 | ||||
|                 and not test5_complete.done() | ||||
|             ): | ||||
|                 test5_complete.set_result(True) | ||||
|  | ||||
|     async with ( | ||||
|         run_compiled(yaml_config, line_callback=check_output), | ||||
|         api_client_connected() as client, | ||||
|     ): | ||||
|         # Get services | ||||
|         _, services = await client.list_entities_services() | ||||
|  | ||||
|         # Test 1: Queue depth limit | ||||
|         test_service = next((s for s in services if s.name == "test_queue_depth"), None) | ||||
|         assert test_service is not None, "test_queue_depth service not found" | ||||
|         client.execute_service(test_service, {}) | ||||
|         await asyncio.wait_for(test1_complete, timeout=2.0) | ||||
|         await asyncio.sleep(0.1)  # Give time for rejections | ||||
|  | ||||
|         # Verify Test 1 | ||||
|         assert sorted(test_results["queue_depth"]["processed"]) == [1, 2, 3, 4, 5], ( | ||||
|             f"Test 1: Expected to process items 1-5 (max_runs=5 means 5 total), got {sorted(test_results['queue_depth']['processed'])}" | ||||
|         ) | ||||
|         assert test_results["queue_depth"]["rejections"] >= 2, ( | ||||
|             "Test 1: Expected at least 2 rejection warnings (items 6-7 should be rejected)" | ||||
|         ) | ||||
|  | ||||
|         # Test 2: Ring buffer order | ||||
|         test_service = next((s for s in services if s.name == "test_ring_buffer"), None) | ||||
|         assert test_service is not None, "test_ring_buffer service not found" | ||||
|         client.execute_service(test_service, {}) | ||||
|         await asyncio.wait_for(test2_complete, timeout=2.0) | ||||
|  | ||||
|         # Verify Test 2 | ||||
|         assert test_results["ring_buffer"]["start_order"] == ["A", "B", "C"], ( | ||||
|             f"Test 2: Expected start order [A, B, C], got {test_results['ring_buffer']['start_order']}" | ||||
|         ) | ||||
|         assert test_results["ring_buffer"]["end_order"] == ["A", "B", "C"], ( | ||||
|             f"Test 2: Expected end order [A, B, C], got {test_results['ring_buffer']['end_order']}" | ||||
|         ) | ||||
|  | ||||
|         # Test 3: Stop clears queue | ||||
|         test_service = next((s for s in services if s.name == "test_stop_clears"), None) | ||||
|         assert test_service is not None, "test_stop_clears service not found" | ||||
|         client.execute_service(test_service, {}) | ||||
|         await asyncio.wait_for(test3_complete, timeout=2.0) | ||||
|  | ||||
|         # Verify Test 3 | ||||
|         assert test_results["stop"]["stop_logged"], ( | ||||
|             "Test 3: Stop command was not logged" | ||||
|         ) | ||||
|         assert test_results["stop"]["processed"] == [1], ( | ||||
|             f"Test 3: Expected only item 1 to process, got {test_results['stop']['processed']}" | ||||
|         ) | ||||
|  | ||||
|         # Test 4: Rejection enforcement (max_runs=3) | ||||
|         test_service = next((s for s in services if s.name == "test_rejection"), None) | ||||
|         assert test_service is not None, "test_rejection service not found" | ||||
|         client.execute_service(test_service, {}) | ||||
|         await asyncio.wait_for(test4_complete, timeout=2.0) | ||||
|         await asyncio.sleep(0.1)  # Give time for rejections | ||||
|  | ||||
|         # Verify Test 4 | ||||
|         assert sorted(test_results["rejection"]["processed"]) == [1, 2, 3], ( | ||||
|             f"Test 4: Expected to process items 1-3 (max_runs=3 means 3 total), got {sorted(test_results['rejection']['processed'])}" | ||||
|         ) | ||||
|         assert test_results["rejection"]["rejections"] == 5, ( | ||||
|             f"Test 4: Expected 5 rejections (items 4-8), got {test_results['rejection']['rejections']}" | ||||
|         ) | ||||
|  | ||||
|         # Test 5: No parameters | ||||
|         test_service = next((s for s in services if s.name == "test_no_params"), None) | ||||
|         assert test_service is not None, "test_no_params service not found" | ||||
|         client.execute_service(test_service, {}) | ||||
|         await asyncio.wait_for(test5_complete, timeout=2.0) | ||||
|  | ||||
|         # Verify Test 5 | ||||
|         assert test_results["no_params"]["executions"] == 3, ( | ||||
|             f"Test 5: Expected 3 executions, got {test_results['no_params']['executions']}" | ||||
|         ) | ||||
		Reference in New Issue
	
	Block a user