mirror of
https://github.com/esphome/esphome.git
synced 2025-09-02 03:12:20 +01:00
50 lines
1.6 KiB
Python
50 lines
1.6 KiB
Python
"""Integration test for Host mode with sensor."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
from aioesphomeapi import EntityState
|
|
import pytest
|
|
|
|
from .types import APIClientConnectedFactory, RunCompiledFunction
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_host_mode_with_sensor(
|
|
yaml_config: str,
|
|
run_compiled: RunCompiledFunction,
|
|
api_client_connected: APIClientConnectedFactory,
|
|
) -> None:
|
|
"""Test Host mode with a sensor component."""
|
|
# Write, compile and run the ESPHome device, then connect to API
|
|
async with run_compiled(yaml_config), api_client_connected() as client:
|
|
# Subscribe to state changes
|
|
states: dict[int, EntityState] = {}
|
|
sensor_future: asyncio.Future[EntityState] = asyncio.Future()
|
|
|
|
def on_state(state: EntityState) -> None:
|
|
states[state.key] = state
|
|
# If this is our sensor with value 42.0, resolve the future
|
|
if (
|
|
hasattr(state, "state")
|
|
and state.state == 42.0
|
|
and not sensor_future.done()
|
|
):
|
|
sensor_future.set_result(state)
|
|
|
|
client.subscribe_states(on_state)
|
|
|
|
# Wait for sensor with specific value (42.0) with timeout
|
|
try:
|
|
test_sensor_state = await asyncio.wait_for(sensor_future, timeout=5.0)
|
|
except asyncio.TimeoutError:
|
|
pytest.fail(
|
|
f"Sensor with value 42.0 not received within 5 seconds. "
|
|
f"Received states: {list(states.values())}"
|
|
)
|
|
|
|
# Verify the sensor state
|
|
assert test_sensor_state.state == 42.0
|
|
assert len(states) > 0, "No states received"
|