1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-25 21:23:53 +01:00
Files
esphome/tests/integration/test_host_mode_sensor.py
2025-05-26 21:31:32 -05:00

50 lines
1.6 KiB
Python

"""Integration test for Host mode with sensor."""
from __future__ import annotations
import asyncio
from aioesphomeapi import EntityState
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
@pytest.mark.asyncio
async def test_host_mode_with_sensor(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test Host mode with a sensor component."""
# Write, compile and run the ESPHome device, then connect to API
async with run_compiled(yaml_config), api_client_connected() as client:
# Subscribe to state changes
states: dict[int, EntityState] = {}
sensor_future: asyncio.Future[EntityState] = asyncio.Future()
def on_state(state: EntityState) -> None:
states[state.key] = state
# If this is our sensor with value 42.0, resolve the future
if (
hasattr(state, "state")
and state.state == 42.0
and not sensor_future.done()
):
sensor_future.set_result(state)
client.subscribe_states(on_state)
# Wait for sensor with specific value (42.0) with timeout
try:
test_sensor_state = await asyncio.wait_for(sensor_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail(
f"Sensor with value 42.0 not received within 5 seconds. "
f"Received states: {list(states.values())}"
)
# Verify the sensor state
assert test_sensor_state.state == 42.0
assert len(states) > 0, "No states received"