diff --git a/tests/integration/state_utils.py b/tests/integration/state_utils.py new file mode 100644 index 0000000000..7392393501 --- /dev/null +++ b/tests/integration/state_utils.py @@ -0,0 +1,146 @@ +"""Shared utilities for ESPHome integration tests - state handling.""" + +from __future__ import annotations + +import asyncio +import logging + +from aioesphomeapi import ButtonInfo, EntityInfo, EntityState + +_LOGGER = logging.getLogger(__name__) + + +class InitialStateHelper: + """Helper to wait for initial states before processing test states. + + When an API client connects, ESPHome sends the current state of all entities. + This helper wraps the user's state callback and swallows the first state for + each entity, then forwards all subsequent states to the user callback. + + Usage: + entities, services = await client.list_entities_services() + helper = InitialStateHelper(entities) + client.subscribe_states(helper.on_state_wrapper(user_callback)) + await helper.wait_for_initial_states() + """ + + def __init__(self, entities: list[EntityInfo]) -> None: + """Initialize the helper. + + Args: + entities: All entities from list_entities_services() + """ + # Set of (device_id, key) tuples waiting for initial state + # Buttons are stateless, so exclude them + self._wait_initial_states = { + (entity.device_id, entity.key) + for entity in entities + if not isinstance(entity, ButtonInfo) + } + # Keep entity info for debugging - use (device_id, key) tuple + self._entities_by_id = { + (entity.device_id, entity.key): entity for entity in entities + } + + # Log all entities + _LOGGER.debug( + "InitialStateHelper: Found %d total entities: %s", + len(entities), + [(type(e).__name__, e.object_id) for e in entities], + ) + + # Log which ones we're waiting for + _LOGGER.debug( + "InitialStateHelper: Waiting for %d entities (excluding ButtonInfo): %s", + len(self._wait_initial_states), + [self._entities_by_id[k].object_id for k in self._wait_initial_states], + ) + + # Log which ones we're NOT waiting for + not_waiting = { + (e.device_id, e.key) for e in entities + } - self._wait_initial_states + _LOGGER.debug( + "InitialStateHelper: NOT waiting for %d entities: %s", + len(not_waiting), + [ + ( + type(self._entities_by_id[k]).__name__, + self._entities_by_id[k].object_id, + ) + for k in not_waiting + ], + ) + + # Create future in the running event loop + self._initial_states_received = asyncio.get_running_loop().create_future() + # If no entities to wait for, mark complete immediately + if not self._wait_initial_states: + self._initial_states_received.set_result(True) + + def on_state_wrapper(self, user_callback): + """Wrap a user callback to track initial states. + + Args: + user_callback: The user's state callback function + + Returns: + Wrapped callback that swallows first state per entity, forwards rest + """ + + def wrapper(state: EntityState) -> None: + """Swallow initial state per entity, forward subsequent states.""" + # Create entity identifier tuple + entity_id = (state.device_id, state.key) + + # Log which entity is sending state + if entity_id in self._entities_by_id: + entity = self._entities_by_id[entity_id] + _LOGGER.debug( + "Received state for %s (type: %s, device_id: %s, key: %d)", + entity.object_id, + type(entity).__name__, + state.device_id, + state.key, + ) + + # If this entity is waiting for initial state + if entity_id in self._wait_initial_states: + # Remove from waiting set + self._wait_initial_states.discard(entity_id) + + _LOGGER.debug( + "Swallowed initial state for %s, %d entities remaining", + self._entities_by_id[entity_id].object_id + if entity_id in self._entities_by_id + else entity_id, + len(self._wait_initial_states), + ) + + # Check if we've now seen all entities + if ( + not self._wait_initial_states + and not self._initial_states_received.done() + ): + _LOGGER.debug("All initial states received") + self._initial_states_received.set_result(True) + + # Don't forward initial state to user + return + + # Forward subsequent states to user callback + _LOGGER.debug("Forwarding state to user callback") + user_callback(state) + + return wrapper + + async def wait_for_initial_states(self, timeout: float = 5.0) -> None: + """Wait for all initial states to be received. + + Args: + timeout: Maximum time to wait in seconds + + Raises: + asyncio.TimeoutError: If initial states aren't received within timeout + """ + await asyncio.wait_for(self._initial_states_received, timeout=timeout) diff --git a/tests/integration/test_sensor_filters_ring_buffer.py b/tests/integration/test_sensor_filters_ring_buffer.py index da4862c14b..5d00986cc2 100644 --- a/tests/integration/test_sensor_filters_ring_buffer.py +++ b/tests/integration/test_sensor_filters_ring_buffer.py @@ -122,31 +122,31 @@ async def test_sensor_filters_ring_buffer( # Verify the values at each output position # Position 1: window=[1] - assert abs(sensor_states["sliding_min"][0] - 1.0) < 0.01 - assert abs(sensor_states["sliding_max"][0] - 1.0) < 0.01 - assert abs(sensor_states["sliding_median"][0] - 1.0) < 0.01 - assert abs(sensor_states["sliding_moving_avg"][0] - 1.0) < 0.01 + assert sensor_states["sliding_min"][0] == pytest.approx(1.0) + assert sensor_states["sliding_max"][0] == pytest.approx(1.0) + assert sensor_states["sliding_median"][0] == pytest.approx(1.0) + assert sensor_states["sliding_moving_avg"][0] == pytest.approx(1.0) # Position 3: window=[1,2,3] - assert abs(sensor_states["sliding_min"][1] - 1.0) < 0.01 - assert abs(sensor_states["sliding_max"][1] - 3.0) < 0.01 - assert abs(sensor_states["sliding_median"][1] - 2.0) < 0.01 - assert abs(sensor_states["sliding_moving_avg"][1] - 2.0) < 0.01 + assert sensor_states["sliding_min"][1] == pytest.approx(1.0) + assert sensor_states["sliding_max"][1] == pytest.approx(3.0) + assert sensor_states["sliding_median"][1] == pytest.approx(2.0) + assert sensor_states["sliding_moving_avg"][1] == pytest.approx(2.0) # Position 5: window=[1,2,3,4,5] - assert abs(sensor_states["sliding_min"][2] - 1.0) < 0.01 - assert abs(sensor_states["sliding_max"][2] - 5.0) < 0.01 - assert abs(sensor_states["sliding_median"][2] - 3.0) < 0.01 - assert abs(sensor_states["sliding_moving_avg"][2] - 3.0) < 0.01 + assert sensor_states["sliding_min"][2] == pytest.approx(1.0) + assert sensor_states["sliding_max"][2] == pytest.approx(5.0) + assert sensor_states["sliding_median"][2] == pytest.approx(3.0) + assert sensor_states["sliding_moving_avg"][2] == pytest.approx(3.0) # Position 7: window=[3,4,5,6,7] (ring buffer wrapped) - assert abs(sensor_states["sliding_min"][3] - 3.0) < 0.01 - assert abs(sensor_states["sliding_max"][3] - 7.0) < 0.01 - assert abs(sensor_states["sliding_median"][3] - 5.0) < 0.01 - assert abs(sensor_states["sliding_moving_avg"][3] - 5.0) < 0.01 + assert sensor_states["sliding_min"][3] == pytest.approx(3.0) + assert sensor_states["sliding_max"][3] == pytest.approx(7.0) + assert sensor_states["sliding_median"][3] == pytest.approx(5.0) + assert sensor_states["sliding_moving_avg"][3] == pytest.approx(5.0) # Position 9: window=[5,6,7,8,9] (ring buffer wrapped) - assert abs(sensor_states["sliding_min"][4] - 5.0) < 0.01 - assert abs(sensor_states["sliding_max"][4] - 9.0) < 0.01 - assert abs(sensor_states["sliding_median"][4] - 7.0) < 0.01 - assert abs(sensor_states["sliding_moving_avg"][4] - 7.0) < 0.01 + assert sensor_states["sliding_min"][4] == pytest.approx(5.0) + assert sensor_states["sliding_max"][4] == pytest.approx(9.0) + assert sensor_states["sliding_median"][4] == pytest.approx(7.0) + assert sensor_states["sliding_moving_avg"][4] == pytest.approx(7.0) diff --git a/tests/integration/test_sensor_filters_sliding_window.py b/tests/integration/test_sensor_filters_sliding_window.py index 389cbf2659..57ab65acd4 100644 --- a/tests/integration/test_sensor_filters_sliding_window.py +++ b/tests/integration/test_sensor_filters_sliding_window.py @@ -58,33 +58,33 @@ async def test_sensor_filters_sliding_window( # Filters send at position 1 and position 6 (send_every=5 means every 5th value after first) if ( sensor_name == "min_sensor" - and abs(state.state - 2.0) < 0.01 + and state.state == pytest.approx(2.0) and not min_received.done() ): min_received.set_result(True) elif ( sensor_name == "max_sensor" - and abs(state.state - 6.0) < 0.01 + and state.state == pytest.approx(6.0) and not max_received.done() ): max_received.set_result(True) elif ( sensor_name == "median_sensor" - and abs(state.state - 4.0) < 0.01 + and state.state == pytest.approx(4.0) and not median_received.done() ): # Median of [2, 3, 4, 5, 6] = 4 median_received.set_result(True) elif ( sensor_name == "quantile_sensor" - and abs(state.state - 6.0) < 0.01 + and state.state == pytest.approx(6.0) and not quantile_received.done() ): # 90th percentile of [2, 3, 4, 5, 6] = 6 quantile_received.set_result(True) elif ( sensor_name == "moving_avg_sensor" - and abs(state.state - 4.0) < 0.01 + and state.state == pytest.approx(4.0) and not moving_avg_received.done() ): # Average of [2, 3, 4, 5, 6] = 4 @@ -168,30 +168,30 @@ async def test_sensor_filters_sliding_window( assert len(sensor_states["moving_avg_sensor"]) == 2 # Verify the first output (after 1 value: [1]) - assert abs(sensor_states["min_sensor"][0] - 1.0) < 0.01, ( + assert sensor_states["min_sensor"][0] == pytest.approx(1.0), ( f"First min should be 1.0, got {sensor_states['min_sensor'][0]}" ) - assert abs(sensor_states["max_sensor"][0] - 1.0) < 0.01, ( + assert sensor_states["max_sensor"][0] == pytest.approx(1.0), ( f"First max should be 1.0, got {sensor_states['max_sensor'][0]}" ) - assert abs(sensor_states["median_sensor"][0] - 1.0) < 0.01, ( + assert sensor_states["median_sensor"][0] == pytest.approx(1.0), ( f"First median should be 1.0, got {sensor_states['median_sensor'][0]}" ) - assert abs(sensor_states["moving_avg_sensor"][0] - 1.0) < 0.01, ( + assert sensor_states["moving_avg_sensor"][0] == pytest.approx(1.0), ( f"First moving avg should be 1.0, got {sensor_states['moving_avg_sensor'][0]}" ) # Verify the second output (after 6 values, window has [2, 3, 4, 5, 6]) - assert abs(sensor_states["min_sensor"][1] - 2.0) < 0.01, ( + assert sensor_states["min_sensor"][1] == pytest.approx(2.0), ( f"Second min should be 2.0, got {sensor_states['min_sensor'][1]}" ) - assert abs(sensor_states["max_sensor"][1] - 6.0) < 0.01, ( + assert sensor_states["max_sensor"][1] == pytest.approx(6.0), ( f"Second max should be 6.0, got {sensor_states['max_sensor'][1]}" ) - assert abs(sensor_states["median_sensor"][1] - 4.0) < 0.01, ( + assert sensor_states["median_sensor"][1] == pytest.approx(4.0), ( f"Second median should be 4.0, got {sensor_states['median_sensor'][1]}" ) - assert abs(sensor_states["moving_avg_sensor"][1] - 4.0) < 0.01, ( + assert sensor_states["moving_avg_sensor"][1] == pytest.approx(4.0), ( f"Second moving avg should be 4.0, got {sensor_states['moving_avg_sensor'][1]}" ) @@ -291,18 +291,18 @@ async def test_sensor_filters_nan_handling( ) # First output - assert abs(min_states[0] - 10.0) < 0.01, ( + assert min_states[0] == pytest.approx(10.0), ( f"First min should be 10.0, got {min_states[0]}" ) - assert abs(max_states[0] - 10.0) < 0.01, ( + assert max_states[0] == pytest.approx(10.0), ( f"First max should be 10.0, got {max_states[0]}" ) # Second output - verify NaN values were ignored - assert abs(min_states[1] - 5.0) < 0.01, ( + assert min_states[1] == pytest.approx(5.0), ( f"Second min should ignore NaN and return 5.0, got {min_states[1]}" ) - assert abs(max_states[1] - 15.0) < 0.01, ( + assert max_states[1] == pytest.approx(15.0), ( f"Second max should ignore NaN and return 15.0, got {max_states[1]}" ) @@ -385,12 +385,12 @@ async def test_sensor_filters_ring_buffer_wraparound( assert len(min_states) == 3, ( f"Should have 3 states, got {len(min_states)}: {min_states}" ) - assert abs(min_states[0] - 10.0) < 0.01, ( + assert min_states[0] == pytest.approx(10.0), ( f"First min should be 10.0, got {min_states[0]}" ) - assert abs(min_states[1] - 5.0) < 0.01, ( + assert min_states[1] == pytest.approx(5.0), ( f"Second min should be 5.0, got {min_states[1]}" ) - assert abs(min_states[2] - 15.0) < 0.01, ( + assert min_states[2] == pytest.approx(15.0), ( f"Third min should be 15.0, got {min_states[2]}" )