mirror of
https://github.com/esphome/esphome.git
synced 2025-09-13 16:52:18 +01:00
Merge branch 'dev' into idf_webserver_ota
This commit is contained in:
71
tests/integration/fixtures/api_conditional_memory.yaml
Normal file
71
tests/integration/fixtures/api_conditional_memory.yaml
Normal file
@@ -0,0 +1,71 @@
|
||||
esphome:
|
||||
name: api-conditional-memory-test
|
||||
host:
|
||||
api:
|
||||
actions:
|
||||
- action: test_simple_service
|
||||
then:
|
||||
- logger.log: "Simple service called"
|
||||
- binary_sensor.template.publish:
|
||||
id: service_called_sensor
|
||||
state: ON
|
||||
- action: test_service_with_args
|
||||
variables:
|
||||
arg_string: string
|
||||
arg_int: int
|
||||
arg_bool: bool
|
||||
arg_float: float
|
||||
then:
|
||||
- logger.log:
|
||||
format: "Service called with: %s, %d, %d, %.2f"
|
||||
args: [arg_string.c_str(), arg_int, arg_bool, arg_float]
|
||||
- sensor.template.publish:
|
||||
id: service_arg_sensor
|
||||
state: !lambda 'return arg_float;'
|
||||
on_client_connected:
|
||||
- logger.log:
|
||||
format: "Client %s connected from %s"
|
||||
args: [client_info.c_str(), client_address.c_str()]
|
||||
- binary_sensor.template.publish:
|
||||
id: client_connected
|
||||
state: ON
|
||||
- text_sensor.template.publish:
|
||||
id: last_client_info
|
||||
state: !lambda 'return client_info;'
|
||||
on_client_disconnected:
|
||||
- logger.log:
|
||||
format: "Client %s disconnected from %s"
|
||||
args: [client_info.c_str(), client_address.c_str()]
|
||||
- binary_sensor.template.publish:
|
||||
id: client_connected
|
||||
state: OFF
|
||||
- binary_sensor.template.publish:
|
||||
id: client_disconnected_event
|
||||
state: ON
|
||||
|
||||
logger:
|
||||
level: DEBUG
|
||||
|
||||
binary_sensor:
|
||||
- platform: template
|
||||
name: "Client Connected"
|
||||
id: client_connected
|
||||
device_class: connectivity
|
||||
- platform: template
|
||||
name: "Client Disconnected Event"
|
||||
id: client_disconnected_event
|
||||
- platform: template
|
||||
name: "Service Called"
|
||||
id: service_called_sensor
|
||||
|
||||
sensor:
|
||||
- platform: template
|
||||
name: "Service Argument Value"
|
||||
id: service_arg_sensor
|
||||
unit_of_measurement: ""
|
||||
accuracy_decimals: 2
|
||||
|
||||
text_sensor:
|
||||
- platform: template
|
||||
name: "Last Client Info"
|
||||
id: last_client_info
|
164
tests/integration/fixtures/scheduler_string_test.yaml
Normal file
164
tests/integration/fixtures/scheduler_string_test.yaml
Normal file
@@ -0,0 +1,164 @@
|
||||
esphome:
|
||||
name: scheduler-string-test
|
||||
on_boot:
|
||||
priority: -100
|
||||
then:
|
||||
- logger.log: "Starting scheduler string tests"
|
||||
platformio_options:
|
||||
build_flags:
|
||||
- "-DESPHOME_DEBUG_SCHEDULER" # Enable scheduler debug logging
|
||||
|
||||
host:
|
||||
api:
|
||||
logger:
|
||||
level: VERBOSE
|
||||
|
||||
globals:
|
||||
- id: timeout_counter
|
||||
type: int
|
||||
initial_value: '0'
|
||||
- id: interval_counter
|
||||
type: int
|
||||
initial_value: '0'
|
||||
- id: dynamic_counter
|
||||
type: int
|
||||
initial_value: '0'
|
||||
- id: static_tests_done
|
||||
type: bool
|
||||
initial_value: 'false'
|
||||
- id: dynamic_tests_done
|
||||
type: bool
|
||||
initial_value: 'false'
|
||||
- id: results_reported
|
||||
type: bool
|
||||
initial_value: 'false'
|
||||
|
||||
script:
|
||||
- id: test_static_strings
|
||||
then:
|
||||
- logger.log: "Testing static string timeouts and intervals"
|
||||
- lambda: |-
|
||||
auto *component1 = id(test_sensor1);
|
||||
// Test 1: Static string literals with set_timeout
|
||||
App.scheduler.set_timeout(component1, "static_timeout_1", 50, []() {
|
||||
ESP_LOGI("test", "Static timeout 1 fired");
|
||||
id(timeout_counter) += 1;
|
||||
});
|
||||
|
||||
// Test 2: Static const char* with set_timeout
|
||||
static const char* TIMEOUT_NAME = "static_timeout_2";
|
||||
App.scheduler.set_timeout(component1, TIMEOUT_NAME, 100, []() {
|
||||
ESP_LOGI("test", "Static timeout 2 fired");
|
||||
id(timeout_counter) += 1;
|
||||
});
|
||||
|
||||
// Test 3: Static string literal with set_interval
|
||||
App.scheduler.set_interval(component1, "static_interval_1", 200, []() {
|
||||
ESP_LOGI("test", "Static interval 1 fired, count: %d", id(interval_counter));
|
||||
id(interval_counter) += 1;
|
||||
if (id(interval_counter) >= 3) {
|
||||
App.scheduler.cancel_interval(id(test_sensor1), "static_interval_1");
|
||||
ESP_LOGI("test", "Cancelled static interval 1");
|
||||
}
|
||||
});
|
||||
|
||||
// Test 4: Empty string (should be handled safely)
|
||||
App.scheduler.set_timeout(component1, "", 150, []() {
|
||||
ESP_LOGI("test", "Empty string timeout fired");
|
||||
});
|
||||
|
||||
// Test 5: Cancel timeout with const char* literal
|
||||
App.scheduler.set_timeout(component1, "cancel_static_timeout", 5000, []() {
|
||||
ESP_LOGI("test", "This static timeout should be cancelled");
|
||||
});
|
||||
// Cancel using const char* directly
|
||||
App.scheduler.cancel_timeout(component1, "cancel_static_timeout");
|
||||
ESP_LOGI("test", "Cancelled static timeout using const char*");
|
||||
|
||||
- id: test_dynamic_strings
|
||||
then:
|
||||
- logger.log: "Testing dynamic string timeouts and intervals"
|
||||
- lambda: |-
|
||||
auto *component2 = id(test_sensor2);
|
||||
|
||||
// Test 6: Dynamic string with set_timeout (std::string)
|
||||
std::string dynamic_name = "dynamic_timeout_" + std::to_string(id(dynamic_counter)++);
|
||||
App.scheduler.set_timeout(component2, dynamic_name, 100, []() {
|
||||
ESP_LOGI("test", "Dynamic timeout fired");
|
||||
id(timeout_counter) += 1;
|
||||
});
|
||||
|
||||
// Test 7: Dynamic string with set_interval
|
||||
std::string interval_name = "dynamic_interval_" + std::to_string(id(dynamic_counter)++);
|
||||
App.scheduler.set_interval(component2, interval_name, 250, [interval_name]() {
|
||||
ESP_LOGI("test", "Dynamic interval fired: %s", interval_name.c_str());
|
||||
id(interval_counter) += 1;
|
||||
if (id(interval_counter) >= 6) {
|
||||
App.scheduler.cancel_interval(id(test_sensor2), interval_name);
|
||||
ESP_LOGI("test", "Cancelled dynamic interval");
|
||||
}
|
||||
});
|
||||
|
||||
// Test 8: Cancel with different string object but same content
|
||||
std::string cancel_name = "cancel_test";
|
||||
App.scheduler.set_timeout(component2, cancel_name, 2000, []() {
|
||||
ESP_LOGI("test", "This should be cancelled");
|
||||
});
|
||||
|
||||
// Cancel using a different string object
|
||||
std::string cancel_name_2 = "cancel_test";
|
||||
App.scheduler.cancel_timeout(component2, cancel_name_2);
|
||||
ESP_LOGI("test", "Cancelled timeout using different string object");
|
||||
|
||||
- id: report_results
|
||||
then:
|
||||
- lambda: |-
|
||||
ESP_LOGI("test", "Final results - Timeouts: %d, Intervals: %d",
|
||||
id(timeout_counter), id(interval_counter));
|
||||
|
||||
sensor:
|
||||
- platform: template
|
||||
name: Test Sensor 1
|
||||
id: test_sensor1
|
||||
lambda: return 1.0;
|
||||
update_interval: never
|
||||
|
||||
- platform: template
|
||||
name: Test Sensor 2
|
||||
id: test_sensor2
|
||||
lambda: return 2.0;
|
||||
update_interval: never
|
||||
|
||||
interval:
|
||||
# Run static string tests after boot - using script to run once
|
||||
- interval: 0.1s
|
||||
then:
|
||||
- if:
|
||||
condition:
|
||||
lambda: 'return id(static_tests_done) == false;'
|
||||
then:
|
||||
- lambda: 'id(static_tests_done) = true;'
|
||||
- script.execute: test_static_strings
|
||||
- logger.log: "Started static string tests"
|
||||
|
||||
# Run dynamic string tests after static tests
|
||||
- interval: 0.2s
|
||||
then:
|
||||
- if:
|
||||
condition:
|
||||
lambda: 'return id(static_tests_done) && !id(dynamic_tests_done);'
|
||||
then:
|
||||
- lambda: 'id(dynamic_tests_done) = true;'
|
||||
- delay: 0.2s
|
||||
- script.execute: test_dynamic_strings
|
||||
|
||||
# Report results after all tests
|
||||
- interval: 0.2s
|
||||
then:
|
||||
- if:
|
||||
condition:
|
||||
lambda: 'return id(dynamic_tests_done) && !id(results_reported);'
|
||||
then:
|
||||
- lambda: 'id(results_reported) = true;'
|
||||
- delay: 1s
|
||||
- script.execute: report_results
|
205
tests/integration/test_api_conditional_memory.py
Normal file
205
tests/integration/test_api_conditional_memory.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Integration test for API conditional memory optimization with triggers and services."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
from aioesphomeapi import (
|
||||
BinarySensorInfo,
|
||||
EntityState,
|
||||
SensorInfo,
|
||||
TextSensorInfo,
|
||||
UserService,
|
||||
UserServiceArgType,
|
||||
)
|
||||
import pytest
|
||||
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_conditional_memory(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test API triggers and services work correctly with conditional compilation."""
|
||||
loop = asyncio.get_running_loop()
|
||||
# Keep ESPHome process running throughout the test
|
||||
async with run_compiled(yaml_config):
|
||||
# First connection
|
||||
async with api_client_connected() as client:
|
||||
# Verify device info
|
||||
device_info = await client.device_info()
|
||||
assert device_info is not None
|
||||
assert device_info.name == "api-conditional-memory-test"
|
||||
|
||||
# List entities and services
|
||||
entity_info, services = await asyncio.wait_for(
|
||||
client.list_entities_services(), timeout=5.0
|
||||
)
|
||||
|
||||
# Find our entities
|
||||
client_connected: BinarySensorInfo | None = None
|
||||
client_disconnected_event: BinarySensorInfo | None = None
|
||||
service_called_sensor: BinarySensorInfo | None = None
|
||||
service_arg_sensor: SensorInfo | None = None
|
||||
last_client_info: TextSensorInfo | None = None
|
||||
|
||||
for entity in entity_info:
|
||||
if isinstance(entity, BinarySensorInfo):
|
||||
if entity.object_id == "client_connected":
|
||||
client_connected = entity
|
||||
elif entity.object_id == "client_disconnected_event":
|
||||
client_disconnected_event = entity
|
||||
elif entity.object_id == "service_called":
|
||||
service_called_sensor = entity
|
||||
elif isinstance(entity, SensorInfo):
|
||||
if entity.object_id == "service_argument_value":
|
||||
service_arg_sensor = entity
|
||||
elif isinstance(entity, TextSensorInfo):
|
||||
if entity.object_id == "last_client_info":
|
||||
last_client_info = entity
|
||||
|
||||
# Verify all entities exist
|
||||
assert client_connected is not None, "client_connected sensor not found"
|
||||
assert client_disconnected_event is not None, (
|
||||
"client_disconnected_event sensor not found"
|
||||
)
|
||||
assert service_called_sensor is not None, "service_called sensor not found"
|
||||
assert service_arg_sensor is not None, "service_arg_sensor not found"
|
||||
assert last_client_info is not None, "last_client_info sensor not found"
|
||||
|
||||
# Verify services exist
|
||||
assert len(services) == 2, f"Expected 2 services, found {len(services)}"
|
||||
|
||||
# Find our services
|
||||
simple_service: UserService | None = None
|
||||
service_with_args: UserService | None = None
|
||||
|
||||
for service in services:
|
||||
if service.name == "test_simple_service":
|
||||
simple_service = service
|
||||
elif service.name == "test_service_with_args":
|
||||
service_with_args = service
|
||||
|
||||
assert simple_service is not None, "test_simple_service not found"
|
||||
assert service_with_args is not None, "test_service_with_args not found"
|
||||
|
||||
# Verify service arguments
|
||||
assert len(service_with_args.args) == 4, (
|
||||
f"Expected 4 args, found {len(service_with_args.args)}"
|
||||
)
|
||||
|
||||
# Check arg types
|
||||
arg_types = {arg.name: arg.type for arg in service_with_args.args}
|
||||
assert arg_types["arg_string"] == UserServiceArgType.STRING
|
||||
assert arg_types["arg_int"] == UserServiceArgType.INT
|
||||
assert arg_types["arg_bool"] == UserServiceArgType.BOOL
|
||||
assert arg_types["arg_float"] == UserServiceArgType.FLOAT
|
||||
|
||||
# Track state changes
|
||||
states: dict[int, EntityState] = {}
|
||||
states_future: asyncio.Future[None] = loop.create_future()
|
||||
|
||||
def on_state(state: EntityState) -> None:
|
||||
states[state.key] = state
|
||||
# Check if we have initial states for connection sensors
|
||||
if (
|
||||
client_connected.key in states
|
||||
and last_client_info.key in states
|
||||
and not states_future.done()
|
||||
):
|
||||
states_future.set_result(None)
|
||||
|
||||
client.subscribe_states(on_state)
|
||||
|
||||
# Wait for initial states
|
||||
await asyncio.wait_for(states_future, timeout=5.0)
|
||||
|
||||
# Verify on_client_connected trigger fired
|
||||
connected_state = states.get(client_connected.key)
|
||||
assert connected_state is not None
|
||||
assert connected_state.state is True, "Client should be connected"
|
||||
|
||||
# Verify client info was captured
|
||||
client_info_state = states.get(last_client_info.key)
|
||||
assert client_info_state is not None
|
||||
assert isinstance(client_info_state.state, str)
|
||||
assert len(client_info_state.state) > 0, "Client info should not be empty"
|
||||
|
||||
# Test simple service
|
||||
service_future: asyncio.Future[None] = loop.create_future()
|
||||
|
||||
def check_service_called(state: EntityState) -> None:
|
||||
if state.key == service_called_sensor.key and state.state is True:
|
||||
if not service_future.done():
|
||||
service_future.set_result(None)
|
||||
|
||||
# Update callback to check for service execution
|
||||
client.subscribe_states(check_service_called)
|
||||
|
||||
# Call simple service
|
||||
client.execute_service(simple_service, {})
|
||||
|
||||
# Wait for service to execute
|
||||
await asyncio.wait_for(service_future, timeout=5.0)
|
||||
|
||||
# Test service with arguments
|
||||
arg_future: asyncio.Future[None] = loop.create_future()
|
||||
expected_float = 42.5
|
||||
|
||||
def check_arg_sensor(state: EntityState) -> None:
|
||||
if (
|
||||
state.key == service_arg_sensor.key
|
||||
and abs(state.state - expected_float) < 0.01
|
||||
):
|
||||
if not arg_future.done():
|
||||
arg_future.set_result(None)
|
||||
|
||||
client.subscribe_states(check_arg_sensor)
|
||||
|
||||
# Call service with arguments
|
||||
client.execute_service(
|
||||
service_with_args,
|
||||
{
|
||||
"arg_string": "test_string",
|
||||
"arg_int": 123,
|
||||
"arg_bool": True,
|
||||
"arg_float": expected_float,
|
||||
},
|
||||
)
|
||||
|
||||
# Wait for service with args to execute
|
||||
await asyncio.wait_for(arg_future, timeout=5.0)
|
||||
|
||||
# After disconnecting first client, reconnect and verify triggers work
|
||||
async with api_client_connected() as client2:
|
||||
# Subscribe to states with new client
|
||||
states2: dict[int, EntityState] = {}
|
||||
connected_future: asyncio.Future[None] = loop.create_future()
|
||||
|
||||
def on_state2(state: EntityState) -> None:
|
||||
states2[state.key] = state
|
||||
# Check for reconnection
|
||||
if state.key == client_connected.key and state.state is True:
|
||||
if not connected_future.done():
|
||||
connected_future.set_result(None)
|
||||
|
||||
client2.subscribe_states(on_state2)
|
||||
|
||||
# Wait for connected state
|
||||
await asyncio.wait_for(connected_future, timeout=5.0)
|
||||
|
||||
# Verify client is connected again (on_client_connected fired)
|
||||
assert states2[client_connected.key].state is True, (
|
||||
"Client should be reconnected"
|
||||
)
|
||||
|
||||
# The client_disconnected_event should be ON from when we disconnected
|
||||
# (it was set ON by on_client_disconnected trigger)
|
||||
disconnected_state = states2.get(client_disconnected_event.key)
|
||||
assert disconnected_state is not None
|
||||
assert disconnected_state.state is True, (
|
||||
"Disconnect event should be ON from previous disconnect"
|
||||
)
|
166
tests/integration/test_scheduler_string_test.py
Normal file
166
tests/integration/test_scheduler_string_test.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""Test scheduler string optimization with static and dynamic strings."""
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_scheduler_string_test(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test that scheduler handles both static and dynamic strings correctly."""
|
||||
# Track counts
|
||||
timeout_count = 0
|
||||
interval_count = 0
|
||||
|
||||
# Events for each test completion
|
||||
static_timeout_1_fired = asyncio.Event()
|
||||
static_timeout_2_fired = asyncio.Event()
|
||||
static_interval_fired = asyncio.Event()
|
||||
static_interval_cancelled = asyncio.Event()
|
||||
empty_string_timeout_fired = asyncio.Event()
|
||||
static_timeout_cancelled = asyncio.Event()
|
||||
dynamic_timeout_fired = asyncio.Event()
|
||||
dynamic_interval_fired = asyncio.Event()
|
||||
cancel_test_done = asyncio.Event()
|
||||
final_results_logged = asyncio.Event()
|
||||
|
||||
# Track interval counts
|
||||
static_interval_count = 0
|
||||
dynamic_interval_count = 0
|
||||
|
||||
def on_log_line(line: str) -> None:
|
||||
nonlocal \
|
||||
timeout_count, \
|
||||
interval_count, \
|
||||
static_interval_count, \
|
||||
dynamic_interval_count
|
||||
|
||||
# Strip ANSI color codes
|
||||
clean_line = re.sub(r"\x1b\[[0-9;]*m", "", line)
|
||||
|
||||
# Check for static timeout completions
|
||||
if "Static timeout 1 fired" in clean_line:
|
||||
static_timeout_1_fired.set()
|
||||
timeout_count += 1
|
||||
|
||||
elif "Static timeout 2 fired" in clean_line:
|
||||
static_timeout_2_fired.set()
|
||||
timeout_count += 1
|
||||
|
||||
# Check for static interval
|
||||
elif "Static interval 1 fired" in clean_line:
|
||||
match = re.search(r"count: (\d+)", clean_line)
|
||||
if match:
|
||||
static_interval_count = int(match.group(1))
|
||||
static_interval_fired.set()
|
||||
|
||||
elif "Cancelled static interval 1" in clean_line:
|
||||
static_interval_cancelled.set()
|
||||
|
||||
# Check for empty string timeout
|
||||
elif "Empty string timeout fired" in clean_line:
|
||||
empty_string_timeout_fired.set()
|
||||
|
||||
# Check for static timeout cancellation
|
||||
elif "Cancelled static timeout using const char*" in clean_line:
|
||||
static_timeout_cancelled.set()
|
||||
|
||||
# Check for dynamic string tests
|
||||
elif "Dynamic timeout fired" in clean_line:
|
||||
dynamic_timeout_fired.set()
|
||||
timeout_count += 1
|
||||
|
||||
elif "Dynamic interval fired" in clean_line:
|
||||
dynamic_interval_count += 1
|
||||
dynamic_interval_fired.set()
|
||||
|
||||
# Check for cancel test
|
||||
elif "Cancelled timeout using different string object" in clean_line:
|
||||
cancel_test_done.set()
|
||||
|
||||
# Check for final results
|
||||
elif "Final results" in clean_line:
|
||||
match = re.search(r"Timeouts: (\d+), Intervals: (\d+)", clean_line)
|
||||
if match:
|
||||
timeout_count = int(match.group(1))
|
||||
interval_count = int(match.group(2))
|
||||
final_results_logged.set()
|
||||
|
||||
async with (
|
||||
run_compiled(yaml_config, line_callback=on_log_line),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
# Verify we can connect
|
||||
device_info = await client.device_info()
|
||||
assert device_info is not None
|
||||
assert device_info.name == "scheduler-string-test"
|
||||
|
||||
# Wait for static string tests
|
||||
try:
|
||||
await asyncio.wait_for(static_timeout_1_fired.wait(), timeout=0.5)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Static timeout 1 did not fire within 0.5 seconds")
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(static_timeout_2_fired.wait(), timeout=0.5)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Static timeout 2 did not fire within 0.5 seconds")
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(static_interval_fired.wait(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Static interval did not fire within 1 second")
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(static_interval_cancelled.wait(), timeout=2.0)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Static interval was not cancelled within 2 seconds")
|
||||
|
||||
# Verify static interval ran at least 3 times
|
||||
assert static_interval_count >= 2, (
|
||||
f"Expected static interval to run at least 3 times, got {static_interval_count + 1}"
|
||||
)
|
||||
|
||||
# Verify static timeout was cancelled
|
||||
assert static_timeout_cancelled.is_set(), (
|
||||
"Static timeout should have been cancelled"
|
||||
)
|
||||
|
||||
# Wait for dynamic string tests
|
||||
try:
|
||||
await asyncio.wait_for(dynamic_timeout_fired.wait(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Dynamic timeout did not fire within 1 second")
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(dynamic_interval_fired.wait(), timeout=1.5)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Dynamic interval did not fire within 1.5 seconds")
|
||||
|
||||
# Wait for cancel test
|
||||
try:
|
||||
await asyncio.wait_for(cancel_test_done.wait(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Cancel test did not complete within 1 second")
|
||||
|
||||
# Wait for final results
|
||||
try:
|
||||
await asyncio.wait_for(final_results_logged.wait(), timeout=4.0)
|
||||
except asyncio.TimeoutError:
|
||||
pytest.fail("Final results were not logged within 4 seconds")
|
||||
|
||||
# Verify results
|
||||
assert timeout_count >= 3, f"Expected at least 3 timeouts, got {timeout_count}"
|
||||
assert interval_count >= 3, (
|
||||
f"Expected at least 3 interval fires, got {interval_count}"
|
||||
)
|
||||
|
||||
# Empty string timeout DOES fire (scheduler accepts empty names)
|
||||
assert empty_string_timeout_fired.is_set(), "Empty string timeout should fire"
|
Reference in New Issue
Block a user