mirror of
https://github.com/esphome/esphome.git
synced 2025-11-16 23:05:46 +00:00
Co-authored-by: Clyde Stubbs <2366188+clydebarrow@users.noreply.github.com> Co-authored-by: J. Nick Koston <nick@koston.org> Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com> Co-authored-by: J. Nick Koston <nick@home-assistant.io>
93 lines
3.3 KiB
Python
93 lines
3.3 KiB
Python
"""Integration test for concurrent re-entry of actions by parallel script runs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import collections
|
|
import re
|
|
|
|
import pytest
|
|
|
|
from .types import APIClientConnectedFactory, RunCompiledFunction
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_action_concurrent_reentry(
    yaml_config: str,
    run_compiled: RunCompiledFunction,
    api_client_connected: APIClientConnectedFactory,
) -> None:
    """Verify that overlapping script runs each keep their own argument.

    The device built from ``yaml_config`` is expected to run a script several
    times in parallel, each run with a different argument, and to log a marker
    line carrying that argument at each stage.  This test collects the
    arguments found in those marker lines and checks that every stage
    reported every expected argument.
    """
    expected = {0, 1, 2, 3, 4}
    finished = asyncio.Event()

    # Arguments collected per marker kind, in the order the log lines arrive.
    wait_until_done: list[int] = []
    script_wait_done: list[int] = []
    while_iterations: list[int] = []
    while_done: list[int] = []
    repeat_done: list[int] = []
    # First number of an "IN repeat" marker -> arguments logged alongside it.
    repeat_iterations = collections.defaultdict(list)

    def on_repeat_done(found: re.Match[str]) -> None:
        """Record an "AFTER repeat" marker and finish once all have arrived."""
        repeat_done.append(int(found.group(1)))
        if len(repeat_done) == len(expected):
            finished.set()

    # Ordered (pattern, handler) pairs; the first pattern found in a line is
    # the only one handled for that line.
    marker_handlers = (
        (
            re.compile(r"AFTER wait_until ARG (\d+)"),
            lambda found: wait_until_done.append(int(found.group(1))),
        ),
        (
            re.compile(r"AFTER script\.wait ARG (\d+)"),
            lambda found: script_wait_done.append(int(found.group(1))),
        ),
        (
            re.compile(r"IN while ARG (\d+)"),
            lambda found: while_iterations.append(int(found.group(1))),
        ),
        (
            re.compile(r"AFTER while ARG (\d+)"),
            lambda found: while_done.append(int(found.group(1))),
        ),
        (
            re.compile(r"IN repeat (\d+) ARG (\d+)"),
            lambda found: repeat_iterations[int(found.group(1))].append(
                int(found.group(2))
            ),
        ),
        (
            re.compile(r"AFTER repeat ARG (\d+)"),
            on_repeat_done,
        ),
    )

    def check_output(line: str) -> None:
        """Record the argument carried by ``line`` if it is a marker line."""
        if finished.is_set():
            # Collection is frozen once the run has been declared complete.
            return

        for pattern, handle in marker_handlers:
            found = pattern.search(line)
            if found is not None:
                handle(found)
                break

    # Start the device with the log callback attached, then connect a client.
    async with (
        run_compiled(yaml_config, line_callback=check_output),
        api_client_connected() as client,
    ):
        # Make sure we are talking to the device built for this test.
        device_info = await client.device_info()
        assert device_info is not None
        assert device_info.name == "action-concurrent-reentry"

        # Give the device a bounded amount of time to emit all final markers.
        try:
            await asyncio.wait_for(finished.wait(), timeout=8.0)
        except TimeoutError:
            pytest.fail("test timed out")

    # Arrival order is not checked; only the set of arguments seen per stage.
    for args in repeat_iterations.values():
        assert set(args) == expected
    assert set(repeat_iterations) == {0, 1, 2}
    assert set(wait_until_done) == expected, wait_until_done
    assert set(script_wait_done) == expected, script_wait_done
    assert set(repeat_done) == expected, repeat_done
    assert set(while_done) == expected, while_done

    # Number of "IN while" markers expected for each argument value.
    while_counts_by_arg = {
        0: 5,
        1: 4,
        2: 3,
        3: 2,
        4: 1,
    }
    assert (
        dict(collections.Counter(while_iterations)) == while_counts_by_arg
    ), while_iterations
|