1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-22 11:43:51 +01:00

fix flakey

This commit is contained in:
J. Nick Koston
2025-10-16 06:01:32 -10:00
parent b4ba2aff30
commit 3ba2212cfc
2 changed files with 52 additions and 12 deletions

View File

@@ -8,6 +8,7 @@ from aioesphomeapi import EntityState, SensorState
import pytest import pytest
from .sensor_test_utils import build_key_to_sensor_mapping from .sensor_test_utils import build_key_to_sensor_mapping
from .state_utils import InitialStateHelper
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@@ -36,7 +37,7 @@ async def test_sensor_filters_ring_buffer(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
@@ -76,8 +77,17 @@ async def test_sensor_filters_ring_buffer(
], ],
) )
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial states to be sent before pressing button
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(

View File

@@ -8,6 +8,7 @@ from aioesphomeapi import EntityState, SensorState
import pytest import pytest
from .sensor_test_utils import build_key_to_sensor_mapping from .sensor_test_utils import build_key_to_sensor_mapping
from .state_utils import InitialStateHelper
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@@ -41,7 +42,7 @@ async def test_sensor_filters_sliding_window(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
@@ -108,8 +109,17 @@ async def test_sensor_filters_sliding_window(
], ],
) )
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial states to be sent before pressing button
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(
@@ -207,11 +217,12 @@ async def test_sensor_filters_nan_handling(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
sensor_name = key_to_sensor.get(state.key) sensor_name = key_to_sensor.get(state.key)
if sensor_name == "min_nan": if sensor_name == "min_nan":
min_states.append(state.state) min_states.append(state.state)
elif sensor_name == "max_nan": elif sensor_name == "max_nan":
@@ -236,8 +247,17 @@ async def test_sensor_filters_nan_handling(
# Build key-to-sensor mapping # Build key-to-sensor mapping
key_to_sensor = build_key_to_sensor_mapping(entities, ["min_nan", "max_nan"]) key_to_sensor = build_key_to_sensor_mapping(entities, ["min_nan", "max_nan"])
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial states
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(
@@ -305,11 +325,12 @@ async def test_sensor_filters_ring_buffer_wraparound(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
sensor_name = key_to_sensor.get(state.key) sensor_name = key_to_sensor.get(state.key)
if sensor_name == "wraparound_min": if sensor_name == "wraparound_min":
min_states.append(state.state) min_states.append(state.state)
# With batch_delay: 0ms, we should receive all 3 outputs # With batch_delay: 0ms, we should receive all 3 outputs
@@ -326,8 +347,17 @@ async def test_sensor_filters_ring_buffer_wraparound(
# Build key-to-sensor mapping # Build key-to-sensor mapping
key_to_sensor = build_key_to_sensor_mapping(entities, ["wraparound_min"]) key_to_sensor = build_key_to_sensor_mapping(entities, ["wraparound_min"])
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial state
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial state")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(