"""Integration test for scheduler memory pool functionality."""
from __future__ import annotations
import asyncio
import re
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
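
# NOTE: This test expects a companion YAML fixture (supplied via the
# ``yaml_config`` fixture) that names the device "scheduler-pool-test" and
# exposes the user-defined services run_phase_1 .. run_phase_5 and
# run_complete; those names are asserted against below.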


@pytest.mark.asyncio
async def test_scheduler_pool(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Test that the scheduler memory pool is working correctly with realistic usage.

    This test simulates real-world scheduler usage patterns and verifies that:
    1. Items are recycled to the pool when timeouts complete naturally
    2. Items are recycled when intervals/timeouts are cancelled
    3. Items are reused from the pool for new scheduler operations
    4. The pool grows gradually based on actual usage patterns
    5. Pool operations are logged correctly with debug scheduler enabled
    """
    # Track log messages to verify pool behavior
    log_lines: list[str] = []
    pool_reuse_count = 0
    pool_recycle_count = 0
    pool_full_count = 0
    new_alloc_count = 0

    # Patterns to match pool operations
    reuse_pattern = re.compile(r"Reused item from pool \(pool size now: (\d+)\)")
    recycle_pattern = re.compile(r"Recycled item to pool \(pool size now: (\d+)\)")
    pool_full_pattern = re.compile(r"Pool full \(size: (\d+)\), deleting item")
    new_alloc_pattern = re.compile(r"Allocated new item \(pool empty\)")

    # Futures to track when test phases complete
    loop = asyncio.get_running_loop()
    test_complete_future: asyncio.Future[bool] = loop.create_future()
    phase_futures = {
        1: loop.create_future(),
        2: loop.create_future(),
        3: loop.create_future(),
        4: loop.create_future(),
        5: loop.create_future(),
    }
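
    # check_output is passed to run_compiled below as ``line_callback``, so it
    # is called once for each line of log output from the running device.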
    def check_output(line: str) -> None:
        """Check log output for pool operations and phase completion."""
        nonlocal pool_reuse_count, pool_recycle_count, pool_full_count, new_alloc_count
        log_lines.append(line)

        # Track pool operations
        if reuse_pattern.search(line):
            pool_reuse_count += 1
        elif recycle_pattern.search(line):
            pool_recycle_count += 1
        elif pool_full_pattern.search(line):
            pool_full_count += 1
        elif new_alloc_pattern.search(line):
            new_alloc_count += 1

        # Track phase completion
        for phase_num in range(1, 6):
            if (
                f"Phase {phase_num} complete" in line
                and not phase_futures[phase_num].done()
            ):
                phase_futures[phase_num].set_result(True)

        # Check for test completion
        if "Pool recycling test complete" in line and not test_complete_future.done():
            test_complete_future.set_result(True)

    # Run the test with log monitoring
    async with (
        run_compiled(yaml_config, line_callback=check_output),
        api_client_connected() as client,
    ):
        # Verify device is running
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "scheduler-pool-test"

        # Get list of services
        entities, services = await client.list_entities_services()
        service_names = {s.name for s in services}

        # Verify all test services are available
        expected_services = {
            "run_phase_1",
            "run_phase_2",
            "run_phase_3",
            "run_phase_4",
            "run_phase_5",
            "run_complete",
        }
        assert expected_services.issubset(service_names), (
            f"Missing services: {expected_services - service_names}"
        )

        # Get service objects
        phase_services = {
            num: next(s for s in services if s.name == f"run_phase_{num}")
            for num in range(1, 6)
        }
        complete_service = next(s for s in services if s.name == "run_complete")

        try:
            # Phase 1: Component lifecycle
            client.execute_service(phase_services[1], {})
            await asyncio.wait_for(phase_futures[1], timeout=3.0)
            await asyncio.sleep(0.5)  # Let timeouts complete

            # Phase 2: Sensor polling
            client.execute_service(phase_services[2], {})
            await asyncio.wait_for(phase_futures[2], timeout=3.0)
            await asyncio.sleep(1.0)  # Let intervals run a bit

            # Phase 3: Communication patterns
            client.execute_service(phase_services[3], {})
            await asyncio.wait_for(phase_futures[3], timeout=3.0)
            await asyncio.sleep(1.0)  # Let heartbeat run

            # Phase 4: Defer patterns
            client.execute_service(phase_services[4], {})
            await asyncio.wait_for(phase_futures[4], timeout=3.0)
            await asyncio.sleep(2.0)  # Let everything settle and recycle

            # Phase 5: Pool reuse verification
            client.execute_service(phase_services[5], {})
            await asyncio.wait_for(phase_futures[5], timeout=3.0)
            await asyncio.sleep(0.5)  # Let reuse tests complete

            # Complete test
            client.execute_service(complete_service, {})
            await asyncio.wait_for(test_complete_future, timeout=2.0)
        except TimeoutError as e:
            # Print debug info if test times out
            recent_logs = "\n".join(log_lines[-30:])
            phases_completed = [num for num, fut in phase_futures.items() if fut.done()]
            pytest.fail(
                f"Test timed out waiting for phase/completion. Error: {e}\n"
                f" Phases completed: {phases_completed}\n"
                f" Pool stats:\n"
                f" Reuse count: {pool_reuse_count}\n"
                f" Recycle count: {pool_recycle_count}\n"
                f" Pool full count: {pool_full_count}\n"
                f" New alloc count: {new_alloc_count}\n"
                f"Recent logs:\n{recent_logs}"
            )

        # Verify all test phases ran
        for phase_num in range(1, 6):
            assert phase_futures[phase_num].done(), f"Phase {phase_num} did not complete"

        # Verify pool behavior
        assert pool_recycle_count > 0, "Should have recycled items to pool"

        # Check pool metrics
        if pool_recycle_count > 0:
            max_pool_size = 0
            for line in log_lines:
                if match := recycle_pattern.search(line):
                    size = int(match.group(1))
                    max_pool_size = max(max_pool_size, size)
            # Pool can grow up to its maximum of 10
            assert max_pool_size <= 10, f"Pool grew beyond maximum ({max_pool_size})"

        # Log summary for debugging
        print("\nScheduler Pool Test Summary (Python Orchestrated):")
        print(f" Items recycled to pool: {pool_recycle_count}")
        print(f" Items reused from pool: {pool_reuse_count}")
        print(f" Pool full events: {pool_full_count}")
        print(f" New allocations: {new_alloc_count}")
        print(" All phases completed successfully")

        # Verify reuse happened
        if pool_reuse_count == 0 and pool_recycle_count > 3:
            pytest.fail("Pool had items recycled but none were reused")

        # Success - pool is working
        assert pool_recycle_count > 0 or new_alloc_count < 15, (
            "Pool should either recycle items or limit new allocations"
        )