1
0
mirror of https://github.com/esphome/esphome.git synced 2025-09-02 19:32:19 +01:00
Files
esphome/tests/integration/test_scheduler_string_name_stress.py

117 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Stress test for heap scheduler with std::string names from multiple threads."""
import asyncio
from pathlib import Path
import re
from aioesphomeapi import UserService
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_scheduler_string_name_stress(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that set_timeout/set_interval with std::string names doesn't crash when called from multiple threads."""
# Get the absolute path to the external components directory
external_components_path = str(
Path(__file__).parent / "fixtures" / "external_components"
)
# Replace the placeholder in the YAML config with the actual path
yaml_config = yaml_config.replace(
"EXTERNAL_COMPONENT_PATH", external_components_path
)
# Create a future to signal test completion
loop = asyncio.get_running_loop()
test_complete_future: asyncio.Future[None] = loop.create_future()
# Track executed callbacks and any crashes
executed_callbacks: set[int] = set()
error_messages: list[str] = []
def on_log_line(line: str) -> None:
# Check for crash indicators
if any(
indicator in line.lower()
for indicator in [
"segfault",
"abort",
"assertion",
"heap corruption",
"use after free",
]
):
error_messages.append(line)
if not test_complete_future.done():
test_complete_future.set_exception(Exception(f"Crash detected: {line}"))
return
# Track executed callbacks
match = re.search(r"Executed string-named callback (\d+)", line)
if match:
callback_id = int(match.group(1))
executed_callbacks.add(callback_id)
# Check for completion
if (
"String name stress test complete" in line
and not test_complete_future.done()
):
test_complete_future.set_result(None)
async with (
run_compiled(yaml_config, line_callback=on_log_line),
api_client_connected() as client,
):
# Verify we can connect
device_info = await client.device_info()
assert device_info is not None
assert device_info.name == "sched-string-name-stress"
# List entities and services
_, services = await asyncio.wait_for(
client.list_entities_services(), timeout=5.0
)
# Find our test service
run_stress_test_service: UserService | None = None
for service in services:
if service.name == "run_string_name_stress_test":
run_stress_test_service = service
break
assert run_stress_test_service is not None, (
"run_string_name_stress_test service not found"
)
# Call the service to start the test
client.execute_service(run_stress_test_service, {})
# Wait for test to complete or crash
try:
await asyncio.wait_for(test_complete_future, timeout=30.0)
except asyncio.TimeoutError:
pytest.fail(
f"String name stress test timed out. Executed {len(executed_callbacks)} callbacks. "
f"This might indicate a deadlock."
)
# Verify no errors occurred (crashes already handled by exception)
assert not error_messages, f"Errors detected during test: {error_messages}"
# Verify we executed all 1000 callbacks (10 threads × 100 callbacks each)
assert len(executed_callbacks) == 1000, (
f"Expected 1000 callbacks but got {len(executed_callbacks)}"
)
# Verify each callback ID was executed exactly once
for i in range(1000):
assert i in executed_callbacks, f"Callback {i} was not executed"