mirror of
https://github.com/esphome/esphome.git
synced 2025-10-21 19:23:45 +01:00
Merge branch 'unbound_queued_script_fix' into integration
This commit is contained in:
@@ -45,13 +45,26 @@ def get_script(script_id):
|
|||||||
|
|
||||||
|
|
||||||
def check_max_runs(value):
|
def check_max_runs(value):
|
||||||
|
# Set default for queued mode to prevent unbounded queue growth
|
||||||
|
if CONF_MAX_RUNS not in value and value[CONF_MODE] == CONF_QUEUED:
|
||||||
|
value[CONF_MAX_RUNS] = 5
|
||||||
|
|
||||||
if CONF_MAX_RUNS not in value:
|
if CONF_MAX_RUNS not in value:
|
||||||
return value
|
return value
|
||||||
|
|
||||||
if value[CONF_MODE] not in [CONF_QUEUED, CONF_PARALLEL]:
|
if value[CONF_MODE] not in [CONF_QUEUED, CONF_PARALLEL]:
|
||||||
raise cv.Invalid(
|
raise cv.Invalid(
|
||||||
"The option 'max_runs' is only valid in 'queue' and 'parallel' mode.",
|
"The option 'max_runs' is only valid in 'queue' and 'parallel' mode.",
|
||||||
path=[CONF_MAX_RUNS],
|
path=[CONF_MAX_RUNS],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Queued mode must have bounded queue (min 1), parallel mode can be unlimited (0)
|
||||||
|
if value[CONF_MODE] == CONF_QUEUED and value[CONF_MAX_RUNS] < 1:
|
||||||
|
raise cv.Invalid(
|
||||||
|
"The option 'max_runs' must be at least 1 for queued mode.",
|
||||||
|
path=[CONF_MAX_RUNS],
|
||||||
|
)
|
||||||
|
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
@@ -106,7 +119,7 @@ CONFIG_SCHEMA = automation.validate_automation(
|
|||||||
cv.Optional(CONF_MODE, default=CONF_SINGLE): cv.one_of(
|
cv.Optional(CONF_MODE, default=CONF_SINGLE): cv.one_of(
|
||||||
*SCRIPT_MODES, lower=True
|
*SCRIPT_MODES, lower=True
|
||||||
),
|
),
|
||||||
cv.Optional(CONF_MAX_RUNS): cv.positive_int,
|
cv.Optional(CONF_MAX_RUNS): cv.int_range(min=0, max=100),
|
||||||
cv.Optional(CONF_PARAMETERS, default={}): cv.Schema(
|
cv.Optional(CONF_PARAMETERS, default={}): cv.Schema(
|
||||||
{
|
{
|
||||||
validate_parameter_name: validate_parameter_type,
|
validate_parameter_name: validate_parameter_type,
|
||||||
|
@@ -2,9 +2,8 @@
|
|||||||
|
|
||||||
#include "esphome/core/automation.h"
|
#include "esphome/core/automation.h"
|
||||||
#include "esphome/core/component.h"
|
#include "esphome/core/component.h"
|
||||||
|
#include "esphome/core/helpers.h"
|
||||||
#include "esphome/core/log.h"
|
#include "esphome/core/log.h"
|
||||||
|
|
||||||
#include <queue>
|
|
||||||
namespace esphome {
|
namespace esphome {
|
||||||
namespace script {
|
namespace script {
|
||||||
|
|
||||||
@@ -96,14 +95,27 @@ template<typename... Ts> class RestartScript : public Script<Ts...> {
|
|||||||
/** A script type that queues new instances that are created.
|
/** A script type that queues new instances that are created.
|
||||||
*
|
*
|
||||||
* Only one instance of the script can be active at a time.
|
* Only one instance of the script can be active at a time.
|
||||||
|
*
|
||||||
|
* Ring buffer implementation:
|
||||||
|
* - num_queued_ tracks the number of queued (waiting) instances, NOT including the currently running one
|
||||||
|
* - queue_front_ points to the next item to execute (read position)
|
||||||
|
* - Buffer size is max_runs_ - 1 (max total instances minus the running one)
|
||||||
|
* - Write position is calculated as: (queue_front_ + num_queued_) % (max_runs_ - 1)
|
||||||
|
* - When an item finishes, queue_front_ advances: (queue_front_ + 1) % (max_runs_ - 1)
|
||||||
|
* - First execute() runs immediately without queuing (num_queued_ stays 0)
|
||||||
|
* - Subsequent executes while running are queued starting at position 0
|
||||||
|
* - Maximum total instances = max_runs_ (includes 1 running + (max_runs_ - 1) queued)
|
||||||
*/
|
*/
|
||||||
template<typename... Ts> class QueueingScript : public Script<Ts...>, public Component {
|
template<typename... Ts> class QueueingScript : public Script<Ts...>, public Component {
|
||||||
public:
|
public:
|
||||||
void execute(Ts... x) override {
|
void execute(Ts... x) override {
|
||||||
if (this->is_action_running() || this->num_runs_ > 0) {
|
this->lazy_init_queue_();
|
||||||
// num_runs_ is the number of *queued* instances, so total number of instances is
|
|
||||||
// num_runs_ + 1
|
if (this->is_action_running() || this->num_queued_ > 0) {
|
||||||
if (this->max_runs_ != 0 && this->num_runs_ + 1 >= this->max_runs_) {
|
// num_queued_ is the number of *queued* instances (waiting, not including currently running)
|
||||||
|
// max_runs_ is the maximum *total* instances (running + queued)
|
||||||
|
// So we reject when num_queued_ + 1 >= max_runs_ (queued + running >= max)
|
||||||
|
if (this->num_queued_ + 1 >= this->max_runs_) {
|
||||||
this->esp_logw_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' maximum number of queued runs exceeded!"),
|
this->esp_logw_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' maximum number of queued runs exceeded!"),
|
||||||
LOG_STR_ARG(this->name_));
|
LOG_STR_ARG(this->name_));
|
||||||
return;
|
return;
|
||||||
@@ -111,8 +123,11 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com
|
|||||||
|
|
||||||
this->esp_logd_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' queueing new instance (mode: queued)"),
|
this->esp_logd_(__LINE__, ESPHOME_LOG_FORMAT("Script '%s' queueing new instance (mode: queued)"),
|
||||||
LOG_STR_ARG(this->name_));
|
LOG_STR_ARG(this->name_));
|
||||||
this->num_runs_++;
|
// Ring buffer: write to (queue_front_ + num_queued_) % (max_runs_ - 1)
|
||||||
this->var_queue_.push(std::make_tuple(x...));
|
size_t write_pos = (this->queue_front_ + this->num_queued_) % (this->max_runs_ - 1);
|
||||||
|
// Use reset() to replace the unique_ptr
|
||||||
|
this->var_queue_[write_pos].reset(new std::tuple<Ts...>(std::make_tuple(x...)));
|
||||||
|
this->num_queued_++;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,15 +137,17 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com
|
|||||||
}
|
}
|
||||||
|
|
||||||
void stop() override {
|
void stop() override {
|
||||||
this->num_runs_ = 0;
|
this->num_queued_ = 0;
|
||||||
|
this->queue_front_ = 0;
|
||||||
Script<Ts...>::stop();
|
Script<Ts...>::stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
void loop() override {
|
void loop() override {
|
||||||
if (this->num_runs_ != 0 && !this->is_action_running()) {
|
if (this->num_queued_ != 0 && !this->is_action_running()) {
|
||||||
this->num_runs_--;
|
// Dequeue: decrement count, read from front, advance read position
|
||||||
auto &vars = this->var_queue_.front();
|
this->num_queued_--;
|
||||||
this->var_queue_.pop();
|
auto &vars = *this->var_queue_[this->queue_front_];
|
||||||
|
this->queue_front_ = (this->queue_front_ + 1) % (this->max_runs_ - 1);
|
||||||
this->trigger_tuple_(vars, typename gens<sizeof...(Ts)>::type());
|
this->trigger_tuple_(vars, typename gens<sizeof...(Ts)>::type());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -138,13 +155,27 @@ template<typename... Ts> class QueueingScript : public Script<Ts...>, public Com
|
|||||||
void set_max_runs(int max_runs) { max_runs_ = max_runs; }
|
void set_max_runs(int max_runs) { max_runs_ = max_runs; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
// Lazy init queue on first use - avoids setup() ordering issues and saves memory
|
||||||
|
// if script is never executed during this boot cycle
|
||||||
|
inline void lazy_init_queue_() {
|
||||||
|
if (this->var_queue_.capacity() == 0) {
|
||||||
|
// Allocate max_runs_ - 1 slots for queued items (running item is separate)
|
||||||
|
this->var_queue_.init(this->max_runs_ - 1);
|
||||||
|
// Initialize all unique_ptr slots to nullptr
|
||||||
|
for (int i = 0; i < this->max_runs_ - 1; i++) {
|
||||||
|
this->var_queue_.push_back(nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
template<int... S> void trigger_tuple_(const std::tuple<Ts...> &tuple, seq<S...> /*unused*/) {
|
template<int... S> void trigger_tuple_(const std::tuple<Ts...> &tuple, seq<S...> /*unused*/) {
|
||||||
this->trigger(std::get<S>(tuple)...);
|
this->trigger(std::get<S>(tuple)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
int num_runs_ = 0;
|
int num_queued_ = 0; // Number of queued instances (not including currently running)
|
||||||
int max_runs_ = 0;
|
int max_runs_ = 0; // Maximum total instances (running + queued)
|
||||||
std::queue<std::tuple<Ts...>> var_queue_;
|
size_t queue_front_ = 0; // Ring buffer read position (next item to execute)
|
||||||
|
FixedVector<std::unique_ptr<std::tuple<Ts...>>> var_queue_; // Ring buffer of queued parameters
|
||||||
};
|
};
|
||||||
|
|
||||||
/** A script type that executes new instances in parallel.
|
/** A script type that executes new instances in parallel.
|
||||||
|
@@ -301,6 +301,7 @@ template<typename T> class FixedVector {
|
|||||||
const T &back() const { return data_[size_ - 1]; }
|
const T &back() const { return data_[size_ - 1]; }
|
||||||
|
|
||||||
size_t size() const { return size_; }
|
size_t size() const { return size_; }
|
||||||
|
size_t capacity() const { return capacity_; }
|
||||||
bool empty() const { return size_ == 0; }
|
bool empty() const { return size_ == 0; }
|
||||||
|
|
||||||
/// Access element without bounds checking (matches std::vector behavior)
|
/// Access element without bounds checking (matches std::vector behavior)
|
||||||
|
170
tests/integration/fixtures/script_queued.yaml
Normal file
170
tests/integration/fixtures/script_queued.yaml
Normal file
@@ -0,0 +1,170 @@
|
|||||||
|
esphome:
|
||||||
|
name: test-script-queued
|
||||||
|
|
||||||
|
host:
|
||||||
|
api:
|
||||||
|
actions:
|
||||||
|
# Test 1: Queue depth with default max_runs=5
|
||||||
|
- action: test_queue_depth
|
||||||
|
then:
|
||||||
|
- logger.log: "=== TEST 1: Queue depth (max_runs=5 means 5 total, reject 6-7) ==="
|
||||||
|
- script.execute:
|
||||||
|
id: queue_depth_script
|
||||||
|
value: 1
|
||||||
|
- script.execute:
|
||||||
|
id: queue_depth_script
|
||||||
|
value: 2
|
||||||
|
- script.execute:
|
||||||
|
id: queue_depth_script
|
||||||
|
value: 3
|
||||||
|
- script.execute:
|
||||||
|
id: queue_depth_script
|
||||||
|
value: 4
|
||||||
|
- script.execute:
|
||||||
|
id: queue_depth_script
|
||||||
|
value: 5
|
||||||
|
- script.execute:
|
||||||
|
id: queue_depth_script
|
||||||
|
value: 6
|
||||||
|
- script.execute:
|
||||||
|
id: queue_depth_script
|
||||||
|
value: 7
|
||||||
|
|
||||||
|
# Test 2: Ring buffer wrap test
|
||||||
|
- action: test_ring_buffer
|
||||||
|
then:
|
||||||
|
- logger.log: "=== TEST 2: Ring buffer wrap (should process A, B, C in order) ==="
|
||||||
|
- script.execute:
|
||||||
|
id: wrap_script
|
||||||
|
msg: "A"
|
||||||
|
- script.execute:
|
||||||
|
id: wrap_script
|
||||||
|
msg: "B"
|
||||||
|
- script.execute:
|
||||||
|
id: wrap_script
|
||||||
|
msg: "C"
|
||||||
|
|
||||||
|
# Test 3: Stop clears queue
|
||||||
|
- action: test_stop_clears
|
||||||
|
then:
|
||||||
|
- logger.log: "=== TEST 3: Stop clears queue (should only see 1, then 'STOPPED') ==="
|
||||||
|
- script.execute:
|
||||||
|
id: stop_script
|
||||||
|
num: 1
|
||||||
|
- script.execute:
|
||||||
|
id: stop_script
|
||||||
|
num: 2
|
||||||
|
- script.execute:
|
||||||
|
id: stop_script
|
||||||
|
num: 3
|
||||||
|
- delay: 50ms
|
||||||
|
- logger.log: "STOPPING script now"
|
||||||
|
- script.stop: stop_script
|
||||||
|
|
||||||
|
# Test 4: Verify rejection (max_runs=3)
|
||||||
|
- action: test_rejection
|
||||||
|
then:
|
||||||
|
- logger.log: "=== TEST 4: Verify rejection (max_runs=3 means 3 total, reject 4-8) ==="
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 1
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 2
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 3
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 4
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 5
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 6
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 7
|
||||||
|
- script.execute:
|
||||||
|
id: rejection_script
|
||||||
|
val: 8
|
||||||
|
|
||||||
|
# Test 5: No parameters test
|
||||||
|
- action: test_no_params
|
||||||
|
then:
|
||||||
|
- logger.log: "=== TEST 5: No params (should process 3 times) ==="
|
||||||
|
- script.execute: no_params_script
|
||||||
|
- script.execute: no_params_script
|
||||||
|
- script.execute: no_params_script
|
||||||
|
|
||||||
|
logger:
|
||||||
|
level: DEBUG
|
||||||
|
|
||||||
|
script:
|
||||||
|
# Test script 1: Queue depth test (default max_runs=5)
|
||||||
|
- id: queue_depth_script
|
||||||
|
mode: queued
|
||||||
|
parameters:
|
||||||
|
value: int
|
||||||
|
then:
|
||||||
|
- logger.log:
|
||||||
|
format: "Queue test: START item %d"
|
||||||
|
args: ['value']
|
||||||
|
- delay: 100ms
|
||||||
|
- logger.log:
|
||||||
|
format: "Queue test: END item %d"
|
||||||
|
args: ['value']
|
||||||
|
|
||||||
|
# Test script 2: Ring buffer wrap test (max_runs=3)
|
||||||
|
- id: wrap_script
|
||||||
|
mode: queued
|
||||||
|
max_runs: 3
|
||||||
|
parameters:
|
||||||
|
msg: string
|
||||||
|
then:
|
||||||
|
- logger.log:
|
||||||
|
format: "Ring buffer: START '%s'"
|
||||||
|
args: ['msg.c_str()']
|
||||||
|
- delay: 50ms
|
||||||
|
- logger.log:
|
||||||
|
format: "Ring buffer: END '%s'"
|
||||||
|
args: ['msg.c_str()']
|
||||||
|
|
||||||
|
# Test script 3: Stop test
|
||||||
|
- id: stop_script
|
||||||
|
mode: queued
|
||||||
|
max_runs: 5
|
||||||
|
parameters:
|
||||||
|
num: int
|
||||||
|
then:
|
||||||
|
- logger.log:
|
||||||
|
format: "Stop test: START %d"
|
||||||
|
args: ['num']
|
||||||
|
- delay: 100ms
|
||||||
|
- logger.log:
|
||||||
|
format: "Stop test: END %d"
|
||||||
|
args: ['num']
|
||||||
|
|
||||||
|
# Test script 4: Rejection test (max_runs=3)
|
||||||
|
- id: rejection_script
|
||||||
|
mode: queued
|
||||||
|
max_runs: 3
|
||||||
|
parameters:
|
||||||
|
val: int
|
||||||
|
then:
|
||||||
|
- logger.log:
|
||||||
|
format: "Rejection test: START %d"
|
||||||
|
args: ['val']
|
||||||
|
- delay: 200ms
|
||||||
|
- logger.log:
|
||||||
|
format: "Rejection test: END %d"
|
||||||
|
args: ['val']
|
||||||
|
|
||||||
|
# Test script 5: No parameters
|
||||||
|
- id: no_params_script
|
||||||
|
mode: queued
|
||||||
|
then:
|
||||||
|
- logger.log: "No params: START"
|
||||||
|
- delay: 50ms
|
||||||
|
- logger.log: "No params: END"
|
207
tests/integration/test_script_queued.py
Normal file
207
tests/integration/test_script_queued.py
Normal file
@@ -0,0 +1,207 @@
|
|||||||
|
"""Test ESPHome queued script functionality."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import re
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_script_queued(
|
||||||
|
yaml_config: str,
|
||||||
|
run_compiled: RunCompiledFunction,
|
||||||
|
api_client_connected: APIClientConnectedFactory,
|
||||||
|
) -> None:
|
||||||
|
"""Test comprehensive queued script functionality."""
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
# Track all test results
|
||||||
|
test_results = {
|
||||||
|
"queue_depth": {"processed": [], "rejections": 0},
|
||||||
|
"ring_buffer": {"start_order": [], "end_order": []},
|
||||||
|
"stop": {"processed": [], "stop_logged": False},
|
||||||
|
"rejection": {"processed": [], "rejections": 0},
|
||||||
|
"no_params": {"executions": 0},
|
||||||
|
}
|
||||||
|
|
||||||
|
# Patterns for Test 1: Queue depth
|
||||||
|
queue_start = re.compile(r"Queue test: START item (\d+)")
|
||||||
|
queue_end = re.compile(r"Queue test: END item (\d+)")
|
||||||
|
queue_reject = re.compile(
|
||||||
|
r"Script 'queue_depth_script' maximum number of queued runs exceeded!"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Patterns for Test 2: Ring buffer
|
||||||
|
ring_start = re.compile(r"Ring buffer: START '([A-Z])'")
|
||||||
|
ring_end = re.compile(r"Ring buffer: END '([A-Z])'")
|
||||||
|
|
||||||
|
# Patterns for Test 3: Stop
|
||||||
|
stop_start = re.compile(r"Stop test: START (\d+)")
|
||||||
|
stop_log = re.compile(r"STOPPING script now")
|
||||||
|
|
||||||
|
# Patterns for Test 4: Rejection
|
||||||
|
reject_start = re.compile(r"Rejection test: START (\d+)")
|
||||||
|
reject_end = re.compile(r"Rejection test: END (\d+)")
|
||||||
|
reject_reject = re.compile(
|
||||||
|
r"Script 'rejection_script' maximum number of queued runs exceeded!"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Patterns for Test 5: No params
|
||||||
|
no_params_end = re.compile(r"No params: END")
|
||||||
|
|
||||||
|
# Test completion futures
|
||||||
|
test1_complete = loop.create_future()
|
||||||
|
test2_complete = loop.create_future()
|
||||||
|
test3_complete = loop.create_future()
|
||||||
|
test4_complete = loop.create_future()
|
||||||
|
test5_complete = loop.create_future()
|
||||||
|
|
||||||
|
def check_output(line: str) -> None:
|
||||||
|
"""Check log output for all test messages."""
|
||||||
|
# Test 1: Queue depth
|
||||||
|
if match := queue_start.search(line):
|
||||||
|
item = int(match.group(1))
|
||||||
|
if item not in test_results["queue_depth"]["processed"]:
|
||||||
|
test_results["queue_depth"]["processed"].append(item)
|
||||||
|
|
||||||
|
if match := queue_end.search(line):
|
||||||
|
item = int(match.group(1))
|
||||||
|
if item == 5 and not test1_complete.done():
|
||||||
|
test1_complete.set_result(True)
|
||||||
|
|
||||||
|
if queue_reject.search(line):
|
||||||
|
test_results["queue_depth"]["rejections"] += 1
|
||||||
|
|
||||||
|
# Test 2: Ring buffer
|
||||||
|
if match := ring_start.search(line):
|
||||||
|
msg = match.group(1)
|
||||||
|
test_results["ring_buffer"]["start_order"].append(msg)
|
||||||
|
|
||||||
|
if match := ring_end.search(line):
|
||||||
|
msg = match.group(1)
|
||||||
|
test_results["ring_buffer"]["end_order"].append(msg)
|
||||||
|
if (
|
||||||
|
len(test_results["ring_buffer"]["end_order"]) == 3
|
||||||
|
and not test2_complete.done()
|
||||||
|
):
|
||||||
|
test2_complete.set_result(True)
|
||||||
|
|
||||||
|
# Test 3: Stop
|
||||||
|
if match := stop_start.search(line):
|
||||||
|
item = int(match.group(1))
|
||||||
|
if item not in test_results["stop"]["processed"]:
|
||||||
|
test_results["stop"]["processed"].append(item)
|
||||||
|
|
||||||
|
if stop_log.search(line):
|
||||||
|
test_results["stop"]["stop_logged"] = True
|
||||||
|
# Give time for any queued items to be cleared
|
||||||
|
if not test3_complete.done():
|
||||||
|
loop.call_later(
|
||||||
|
0.3,
|
||||||
|
lambda: test3_complete.set_result(True)
|
||||||
|
if not test3_complete.done()
|
||||||
|
else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test 4: Rejection
|
||||||
|
if match := reject_start.search(line):
|
||||||
|
item = int(match.group(1))
|
||||||
|
if item not in test_results["rejection"]["processed"]:
|
||||||
|
test_results["rejection"]["processed"].append(item)
|
||||||
|
|
||||||
|
if match := reject_end.search(line):
|
||||||
|
item = int(match.group(1))
|
||||||
|
if item == 3 and not test4_complete.done():
|
||||||
|
test4_complete.set_result(True)
|
||||||
|
|
||||||
|
if reject_reject.search(line):
|
||||||
|
test_results["rejection"]["rejections"] += 1
|
||||||
|
|
||||||
|
# Test 5: No params
|
||||||
|
if no_params_end.search(line):
|
||||||
|
test_results["no_params"]["executions"] += 1
|
||||||
|
if (
|
||||||
|
test_results["no_params"]["executions"] == 3
|
||||||
|
and not test5_complete.done()
|
||||||
|
):
|
||||||
|
test5_complete.set_result(True)
|
||||||
|
|
||||||
|
async with (
|
||||||
|
run_compiled(yaml_config, line_callback=check_output),
|
||||||
|
api_client_connected() as client,
|
||||||
|
):
|
||||||
|
# Get services
|
||||||
|
entities, services = await client.list_entities_services()
|
||||||
|
|
||||||
|
# Test 1: Queue depth limit
|
||||||
|
test_service = next((s for s in services if s.name == "test_queue_depth"), None)
|
||||||
|
assert test_service is not None, "test_queue_depth service not found"
|
||||||
|
client.execute_service(test_service, {})
|
||||||
|
await asyncio.wait_for(test1_complete, timeout=2.0)
|
||||||
|
await asyncio.sleep(0.1) # Give time for rejections
|
||||||
|
|
||||||
|
# Verify Test 1
|
||||||
|
assert sorted(test_results["queue_depth"]["processed"]) == [1, 2, 3, 4, 5], (
|
||||||
|
f"Test 1: Expected to process items 1-5 (max_runs=5 means 5 total), got {sorted(test_results['queue_depth']['processed'])}"
|
||||||
|
)
|
||||||
|
assert test_results["queue_depth"]["rejections"] >= 2, (
|
||||||
|
"Test 1: Expected at least 2 rejection warnings (items 6-7 should be rejected)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test 2: Ring buffer order
|
||||||
|
test_service = next((s for s in services if s.name == "test_ring_buffer"), None)
|
||||||
|
assert test_service is not None, "test_ring_buffer service not found"
|
||||||
|
client.execute_service(test_service, {})
|
||||||
|
await asyncio.wait_for(test2_complete, timeout=2.0)
|
||||||
|
|
||||||
|
# Verify Test 2
|
||||||
|
assert test_results["ring_buffer"]["start_order"] == ["A", "B", "C"], (
|
||||||
|
f"Test 2: Expected start order [A, B, C], got {test_results['ring_buffer']['start_order']}"
|
||||||
|
)
|
||||||
|
assert test_results["ring_buffer"]["end_order"] == ["A", "B", "C"], (
|
||||||
|
f"Test 2: Expected end order [A, B, C], got {test_results['ring_buffer']['end_order']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test 3: Stop clears queue
|
||||||
|
test_service = next((s for s in services if s.name == "test_stop_clears"), None)
|
||||||
|
assert test_service is not None, "test_stop_clears service not found"
|
||||||
|
client.execute_service(test_service, {})
|
||||||
|
await asyncio.wait_for(test3_complete, timeout=2.0)
|
||||||
|
|
||||||
|
# Verify Test 3
|
||||||
|
assert test_results["stop"]["stop_logged"], (
|
||||||
|
"Test 3: Stop command was not logged"
|
||||||
|
)
|
||||||
|
assert test_results["stop"]["processed"] == [1], (
|
||||||
|
f"Test 3: Expected only item 1 to process, got {test_results['stop']['processed']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test 4: Rejection enforcement (max_runs=3)
|
||||||
|
test_service = next((s for s in services if s.name == "test_rejection"), None)
|
||||||
|
assert test_service is not None, "test_rejection service not found"
|
||||||
|
client.execute_service(test_service, {})
|
||||||
|
await asyncio.wait_for(test4_complete, timeout=2.0)
|
||||||
|
await asyncio.sleep(0.1) # Give time for rejections
|
||||||
|
|
||||||
|
# Verify Test 4
|
||||||
|
assert sorted(test_results["rejection"]["processed"]) == [1, 2, 3], (
|
||||||
|
f"Test 4: Expected to process items 1-3 (max_runs=3 means 3 total), got {sorted(test_results['rejection']['processed'])}"
|
||||||
|
)
|
||||||
|
assert test_results["rejection"]["rejections"] == 5, (
|
||||||
|
f"Test 4: Expected 5 rejections (items 4-8), got {test_results['rejection']['rejections']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test 5: No parameters
|
||||||
|
test_service = next((s for s in services if s.name == "test_no_params"), None)
|
||||||
|
assert test_service is not None, "test_no_params service not found"
|
||||||
|
client.execute_service(test_service, {})
|
||||||
|
await asyncio.wait_for(test5_complete, timeout=2.0)
|
||||||
|
|
||||||
|
# Verify Test 5
|
||||||
|
assert test_results["no_params"]["executions"] == 3, (
|
||||||
|
f"Test 5: Expected 3 executions, got {test_results['no_params']['executions']}"
|
||||||
|
)
|
Reference in New Issue
Block a user