mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-30 22:53:59 +00:00 
			
		
		
		
	[sensor] Optimize sliding window filters to eliminate heap fragmentation (#11282)
This commit is contained in:
		| @@ -7,6 +7,7 @@ This directory contains end-to-end integration tests for ESPHome, focusing on te | ||||
| - `conftest.py` - Common fixtures and utilities | ||||
| - `const.py` - Constants used throughout the integration tests | ||||
| - `types.py` - Type definitions for fixtures and functions | ||||
| - `state_utils.py` - State handling utilities (e.g., `InitialStateHelper`, `build_key_to_entity_mapping`) | ||||
| - `fixtures/` - YAML configuration files for tests | ||||
| - `test_*.py` - Individual test files | ||||
|  | ||||
| @@ -26,6 +27,32 @@ The `yaml_config` fixture automatically loads YAML configurations based on the t | ||||
| - `reserved_tcp_port` - Reserves a TCP port by holding the socket open until ESPHome needs it | ||||
| - `unused_tcp_port` - Provides the reserved port number for each test | ||||
|  | ||||
| ### Helper Utilities | ||||
|  | ||||
| #### InitialStateHelper (`state_utils.py`) | ||||
|  | ||||
| The `InitialStateHelper` class solves a common problem in integration tests: when an API client connects, ESPHome automatically broadcasts the current state of all entities. This can interfere with tests that want to track only new state changes triggered by test actions. | ||||
|  | ||||
| **What it does:** | ||||
| - Tracks all entities (except stateless ones like buttons) | ||||
| - Swallows the first state broadcast for each entity | ||||
| - Forwards all subsequent state changes to your test callback | ||||
| - Provides `wait_for_initial_states()` to synchronize before test actions | ||||
|  | ||||
| **When to use it:** | ||||
| - Any test that triggers entity state changes and needs to verify them | ||||
| - Tests that would otherwise see duplicate or unexpected states | ||||
| - Tests that need clean separation between initial state and test-triggered changes | ||||
|  | ||||
| **Implementation details:** | ||||
| - Uses `(device_id, key)` tuples to uniquely identify entities across devices | ||||
| - Automatically excludes `ButtonInfo` entities (stateless) | ||||
| - Provides debug logging to track state reception (use `--log-cli-level=DEBUG`) | ||||
| - Safe for concurrent use with multiple entity types | ||||
|  | ||||
| **Future work:** | ||||
| Consider converting existing integration tests to use `InitialStateHelper` for more reliable state tracking and to eliminate race conditions related to initial state broadcasts. | ||||
|  | ||||
| ### Writing Tests | ||||
|  | ||||
| The simplest way to write a test is to use the `run_compiled` and `api_client_connected` fixtures: | ||||
| @@ -125,6 +152,54 @@ async def test_my_sensor( | ||||
| ``` | ||||
|  | ||||
| ##### State Subscription Pattern | ||||
|  | ||||
| **Recommended: Using InitialStateHelper** | ||||
|  | ||||
| When an API client connects, ESPHome automatically sends the current state of all entities. The `InitialStateHelper` (from `state_utils.py`) handles this by swallowing these initial states and only forwarding subsequent state changes to your test callback: | ||||
|  | ||||
| ```python | ||||
| from .state_utils import InitialStateHelper | ||||
|  | ||||
| # Track state changes with futures | ||||
| loop = asyncio.get_running_loop() | ||||
| states: dict[int, EntityState] = {} | ||||
| state_future: asyncio.Future[EntityState] = loop.create_future() | ||||
|  | ||||
| def on_state(state: EntityState) -> None: | ||||
|     """This callback only receives NEW state changes, not initial states.""" | ||||
|     states[state.key] = state | ||||
|     # Check for specific condition using isinstance | ||||
|     if isinstance(state, SensorState) and state.state == expected_value: | ||||
|         if not state_future.done(): | ||||
|             state_future.set_result(state) | ||||
|  | ||||
| # Get entities and set up state synchronization | ||||
| entities, services = await client.list_entities_services() | ||||
| initial_state_helper = InitialStateHelper(entities) | ||||
|  | ||||
| # Subscribe with the wrapper that filters initial states | ||||
| client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) | ||||
|  | ||||
| # Wait for all initial states to be broadcast | ||||
| try: | ||||
|     await initial_state_helper.wait_for_initial_states() | ||||
| except TimeoutError: | ||||
|     pytest.fail("Timeout waiting for initial states") | ||||
|  | ||||
| # Now perform your test actions - on_state will only receive new changes | ||||
| # ... trigger state changes ... | ||||
|  | ||||
| # Wait for expected state | ||||
| try: | ||||
|     result = await asyncio.wait_for(state_future, timeout=5.0) | ||||
| except asyncio.TimeoutError: | ||||
|     pytest.fail(f"Expected state not received. Got: {list(states.values())}") | ||||
| ``` | ||||
|  | ||||
| **Legacy: Manual State Tracking** | ||||
|  | ||||
| If you need to handle initial states manually (not recommended for new tests): | ||||
|  | ||||
| ```python | ||||
| # Track state changes with futures | ||||
| loop = asyncio.get_running_loop() | ||||
|   | ||||
							
								
								
									
										58
									
								
								tests/integration/fixtures/sensor_filters_batch_window.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										58
									
								
								tests/integration/fixtures/sensor_filters_batch_window.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,58 @@ | ||||
| esphome: | ||||
|   name: test-batch-window-filters | ||||
|  | ||||
| host: | ||||
| api: | ||||
|   batch_delay: 0ms  # Disable batching to receive all state updates | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| # Template sensor that we'll use to publish values | ||||
| sensor: | ||||
|   - platform: template | ||||
|     name: "Source Sensor" | ||||
|     id: source_sensor | ||||
|     accuracy_decimals: 2 | ||||
|  | ||||
|   # Batch window filters (window_size == send_every) - use streaming filters | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Min Sensor" | ||||
|     id: min_sensor | ||||
|     filters: | ||||
|       - min: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Max Sensor" | ||||
|     id: max_sensor | ||||
|     filters: | ||||
|       - max: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Moving Avg Sensor" | ||||
|     id: moving_avg_sensor | ||||
|     filters: | ||||
|       - sliding_window_moving_average: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
| # Button to trigger publishing test values | ||||
| button: | ||||
|   - platform: template | ||||
|     name: "Publish Values Button" | ||||
|     id: publish_button | ||||
|     on_press: | ||||
|       - lambda: |- | ||||
|           // Publish 10 values: 1.0, 2.0, ..., 10.0 | ||||
|           for (int i = 1; i <= 10; i++) { | ||||
|             id(source_sensor).publish_state(float(i)); | ||||
|           } | ||||
							
								
								
									
										84
									
								
								tests/integration/fixtures/sensor_filters_nan_handling.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										84
									
								
								tests/integration/fixtures/sensor_filters_nan_handling.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,84 @@ | ||||
| esphome: | ||||
|   name: test-nan-handling | ||||
|  | ||||
| host: | ||||
| api: | ||||
|   batch_delay: 0ms  # Disable batching to receive all state updates | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| sensor: | ||||
|   - platform: template | ||||
|     name: "Source NaN Sensor" | ||||
|     id: source_nan_sensor | ||||
|     accuracy_decimals: 2 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_nan_sensor | ||||
|     name: "Min NaN Sensor" | ||||
|     id: min_nan_sensor | ||||
|     filters: | ||||
|       - min: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_nan_sensor | ||||
|     name: "Max NaN Sensor" | ||||
|     id: max_nan_sensor | ||||
|     filters: | ||||
|       - max: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
| script: | ||||
|   - id: publish_nan_values_script | ||||
|     then: | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: 10.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: !lambda 'return NAN;' | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: 5.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: !lambda 'return NAN;' | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: 15.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: 8.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: !lambda 'return NAN;' | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: 12.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: 3.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_nan_sensor | ||||
|           state: !lambda 'return NAN;' | ||||
|  | ||||
| button: | ||||
|   - platform: template | ||||
|     name: "Publish NaN Values Button" | ||||
|     id: publish_nan_button | ||||
|     on_press: | ||||
|       - script.execute: publish_nan_values_script | ||||
							
								
								
									
										115
									
								
								tests/integration/fixtures/sensor_filters_ring_buffer.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										115
									
								
								tests/integration/fixtures/sensor_filters_ring_buffer.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,115 @@ | ||||
| esphome: | ||||
|   name: test-sliding-window-filters | ||||
|  | ||||
| host: | ||||
| api: | ||||
|   batch_delay: 0ms  # Disable batching to receive all state updates | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| # Template sensor that we'll use to publish values | ||||
| sensor: | ||||
|   - platform: template | ||||
|     name: "Source Sensor" | ||||
|     id: source_sensor | ||||
|     accuracy_decimals: 2 | ||||
|  | ||||
|   # ACTUAL sliding window filters (window_size != send_every) - use ring buffers | ||||
|   # Window of 5, send every 2 values | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Sliding Min Sensor" | ||||
|     id: sliding_min_sensor | ||||
|     filters: | ||||
|       - min: | ||||
|           window_size: 5 | ||||
|           send_every: 2 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Sliding Max Sensor" | ||||
|     id: sliding_max_sensor | ||||
|     filters: | ||||
|       - max: | ||||
|           window_size: 5 | ||||
|           send_every: 2 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Sliding Median Sensor" | ||||
|     id: sliding_median_sensor | ||||
|     filters: | ||||
|       - median: | ||||
|           window_size: 5 | ||||
|           send_every: 2 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Sliding Moving Avg Sensor" | ||||
|     id: sliding_moving_avg_sensor | ||||
|     filters: | ||||
|       - sliding_window_moving_average: | ||||
|           window_size: 5 | ||||
|           send_every: 2 | ||||
|           send_first_at: 1 | ||||
|  | ||||
| # Button to trigger publishing test values | ||||
| script: | ||||
|   - id: publish_values_script | ||||
|     then: | ||||
|       # Publish 10 values: 1.0, 2.0, ..., 10.0 | ||||
|       # With window_size=5, send_every=2, send_first_at=1: | ||||
|       # - Output at position 1: window=[1], min=1, max=1, median=1, avg=1 | ||||
|       # - Output at position 3: window=[1,2,3], min=1, max=3, median=2, avg=2 | ||||
|       # - Output at position 5: window=[1,2,3,4,5], min=1, max=5, median=3, avg=3 | ||||
|       # - Output at position 7: window=[3,4,5,6,7], min=3, max=7, median=5, avg=5 | ||||
|       # - Output at position 9: window=[5,6,7,8,9], min=5, max=9, median=7, avg=7 | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 1.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 2.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 3.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 4.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 5.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 6.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 7.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 8.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 9.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 10.0 | ||||
|  | ||||
| button: | ||||
|   - platform: template | ||||
|     name: "Publish Values Button" | ||||
|     id: publish_button | ||||
|     on_press: | ||||
|       - script.execute: publish_values_script | ||||
| @@ -0,0 +1,72 @@ | ||||
| esphome: | ||||
|   name: test-ring-buffer-wraparound | ||||
|  | ||||
| host: | ||||
| api: | ||||
|   batch_delay: 0ms  # Disable batching to receive all state updates | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| sensor: | ||||
|   - platform: template | ||||
|     name: "Source Wraparound Sensor" | ||||
|     id: source_wraparound | ||||
|     accuracy_decimals: 2 | ||||
|  | ||||
|   - platform: copy | ||||
|     source_id: source_wraparound | ||||
|     name: "Wraparound Min Sensor" | ||||
|     id: wraparound_min_sensor | ||||
|     filters: | ||||
|       - min: | ||||
|           window_size: 3 | ||||
|           send_every: 3 | ||||
|           send_first_at: 1 | ||||
|  | ||||
| script: | ||||
|   - id: publish_wraparound_script | ||||
|     then: | ||||
|       # Publish 9 values to test ring buffer wraparound | ||||
|       # Values: 10, 20, 30, 5, 25, 15, 40, 35, 20 | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 10.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 20.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 30.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 5.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 25.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 15.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 40.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 35.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_wraparound | ||||
|           state: 20.0 | ||||
|  | ||||
| button: | ||||
|   - platform: template | ||||
|     name: "Publish Wraparound Button" | ||||
|     id: publish_wraparound_button | ||||
|     on_press: | ||||
|       - script.execute: publish_wraparound_script | ||||
							
								
								
									
										123
									
								
								tests/integration/fixtures/sensor_filters_sliding_window.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										123
									
								
								tests/integration/fixtures/sensor_filters_sliding_window.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,123 @@ | ||||
| esphome: | ||||
|   name: test-sliding-window-filters | ||||
|  | ||||
| host: | ||||
| api: | ||||
|   batch_delay: 0ms  # Disable batching to receive all state updates | ||||
| logger: | ||||
|   level: DEBUG | ||||
|  | ||||
| # Template sensor that we'll use to publish values | ||||
| sensor: | ||||
|   - platform: template | ||||
|     name: "Source Sensor" | ||||
|     id: source_sensor | ||||
|     accuracy_decimals: 2 | ||||
|  | ||||
|   # Min filter sensor | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Min Sensor" | ||||
|     id: min_sensor | ||||
|     filters: | ||||
|       - min: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   # Max filter sensor | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Max Sensor" | ||||
|     id: max_sensor | ||||
|     filters: | ||||
|       - max: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   # Median filter sensor | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Median Sensor" | ||||
|     id: median_sensor | ||||
|     filters: | ||||
|       - median: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
|   # Quantile filter sensor (90th percentile) | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Quantile Sensor" | ||||
|     id: quantile_sensor | ||||
|     filters: | ||||
|       - quantile: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|           quantile: 0.9 | ||||
|  | ||||
|   # Moving average filter sensor | ||||
|   - platform: copy | ||||
|     source_id: source_sensor | ||||
|     name: "Moving Avg Sensor" | ||||
|     id: moving_avg_sensor | ||||
|     filters: | ||||
|       - sliding_window_moving_average: | ||||
|           window_size: 5 | ||||
|           send_every: 5 | ||||
|           send_first_at: 1 | ||||
|  | ||||
| # Script to publish values with delays | ||||
| script: | ||||
|   - id: publish_values_script | ||||
|     then: | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 1.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 2.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 3.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 4.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 5.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 6.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 7.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 8.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 9.0 | ||||
|       - delay: 20ms | ||||
|       - sensor.template.publish: | ||||
|           id: source_sensor | ||||
|           state: 10.0 | ||||
|  | ||||
| # Button to trigger publishing test values | ||||
| button: | ||||
|   - platform: template | ||||
|     name: "Publish Values Button" | ||||
|     id: publish_button | ||||
|     on_press: | ||||
|       - script.execute: publish_values_script | ||||
							
								
								
									
										167
									
								
								tests/integration/state_utils.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										167
									
								
								tests/integration/state_utils.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,167 @@ | ||||
| """Shared utilities for ESPHome integration tests - state handling.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| import asyncio | ||||
| import logging | ||||
|  | ||||
| from aioesphomeapi import ButtonInfo, EntityInfo, EntityState | ||||
|  | ||||
| _LOGGER = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| def build_key_to_entity_mapping( | ||||
|     entities: list[EntityInfo], entity_names: list[str] | ||||
| ) -> dict[int, str]: | ||||
|     """Build a mapping from entity keys to entity names. | ||||
|  | ||||
|     Args: | ||||
|         entities: List of entity info objects from the API | ||||
|         entity_names: List of entity names to search for in object_ids | ||||
|  | ||||
|     Returns: | ||||
|         Dictionary mapping entity keys to entity names | ||||
|     """ | ||||
|     key_to_entity: dict[int, str] = {} | ||||
|     for entity in entities: | ||||
|         obj_id = entity.object_id.lower() | ||||
|         for entity_name in entity_names: | ||||
|             if entity_name in obj_id: | ||||
|                 key_to_entity[entity.key] = entity_name | ||||
|                 break | ||||
|     return key_to_entity | ||||
|  | ||||
|  | ||||
| class InitialStateHelper: | ||||
|     """Helper to wait for initial states before processing test states. | ||||
|  | ||||
|     When an API client connects, ESPHome sends the current state of all entities. | ||||
|     This helper wraps the user's state callback and swallows the first state for | ||||
|     each entity, then forwards all subsequent states to the user callback. | ||||
|  | ||||
|     Usage: | ||||
|         entities, services = await client.list_entities_services() | ||||
|         helper = InitialStateHelper(entities) | ||||
|         client.subscribe_states(helper.on_state_wrapper(user_callback)) | ||||
|         await helper.wait_for_initial_states() | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, entities: list[EntityInfo]) -> None: | ||||
|         """Initialize the helper. | ||||
|  | ||||
|         Args: | ||||
|             entities: All entities from list_entities_services() | ||||
|         """ | ||||
|         # Set of (device_id, key) tuples waiting for initial state | ||||
|         # Buttons are stateless, so exclude them | ||||
|         self._wait_initial_states = { | ||||
|             (entity.device_id, entity.key) | ||||
|             for entity in entities | ||||
|             if not isinstance(entity, ButtonInfo) | ||||
|         } | ||||
|         # Keep entity info for debugging - use (device_id, key) tuple | ||||
|         self._entities_by_id = { | ||||
|             (entity.device_id, entity.key): entity for entity in entities | ||||
|         } | ||||
|  | ||||
|         # Log all entities | ||||
|         _LOGGER.debug( | ||||
|             "InitialStateHelper: Found %d total entities: %s", | ||||
|             len(entities), | ||||
|             [(type(e).__name__, e.object_id) for e in entities], | ||||
|         ) | ||||
|  | ||||
|         # Log which ones we're waiting for | ||||
|         _LOGGER.debug( | ||||
|             "InitialStateHelper: Waiting for %d entities (excluding ButtonInfo): %s", | ||||
|             len(self._wait_initial_states), | ||||
|             [self._entities_by_id[k].object_id for k in self._wait_initial_states], | ||||
|         ) | ||||
|  | ||||
|         # Log which ones we're NOT waiting for | ||||
|         not_waiting = { | ||||
|             (e.device_id, e.key) for e in entities | ||||
|         } - self._wait_initial_states | ||||
|         if not_waiting: | ||||
|             not_waiting_info = [ | ||||
|                 f"{type(self._entities_by_id[k]).__name__}:{self._entities_by_id[k].object_id}" | ||||
|                 for k in not_waiting | ||||
|             ] | ||||
|             _LOGGER.debug( | ||||
|                 "InitialStateHelper: NOT waiting for %d entities: %s", | ||||
|                 len(not_waiting), | ||||
|                 not_waiting_info, | ||||
|             ) | ||||
|  | ||||
|         # Create future in the running event loop | ||||
|         self._initial_states_received = asyncio.get_running_loop().create_future() | ||||
|         # If no entities to wait for, mark complete immediately | ||||
|         if not self._wait_initial_states: | ||||
|             self._initial_states_received.set_result(True) | ||||
|  | ||||
|     def on_state_wrapper(self, user_callback): | ||||
|         """Wrap a user callback to track initial states. | ||||
|  | ||||
|         Args: | ||||
|             user_callback: The user's state callback function | ||||
|  | ||||
|         Returns: | ||||
|             Wrapped callback that swallows first state per entity, forwards rest | ||||
|         """ | ||||
|  | ||||
|         def wrapper(state: EntityState) -> None: | ||||
|             """Swallow initial state per entity, forward subsequent states.""" | ||||
|             # Create entity identifier tuple | ||||
|             entity_id = (state.device_id, state.key) | ||||
|  | ||||
|             # Log which entity is sending state | ||||
|             if entity_id in self._entities_by_id: | ||||
|                 entity = self._entities_by_id[entity_id] | ||||
|                 _LOGGER.debug( | ||||
|                     "Received state for %s (type: %s, device_id: %s, key: %d)", | ||||
|                     entity.object_id, | ||||
|                     type(entity).__name__, | ||||
|                     state.device_id, | ||||
|                     state.key, | ||||
|                 ) | ||||
|  | ||||
|             # If this entity is waiting for initial state | ||||
|             if entity_id in self._wait_initial_states: | ||||
|                 # Remove from waiting set | ||||
|                 self._wait_initial_states.discard(entity_id) | ||||
|  | ||||
|                 _LOGGER.debug( | ||||
|                     "Swallowed initial state for %s, %d entities remaining", | ||||
|                     self._entities_by_id[entity_id].object_id | ||||
|                     if entity_id in self._entities_by_id | ||||
|                     else entity_id, | ||||
|                     len(self._wait_initial_states), | ||||
|                 ) | ||||
|  | ||||
|                 # Check if we've now seen all entities | ||||
|                 if ( | ||||
|                     not self._wait_initial_states | ||||
|                     and not self._initial_states_received.done() | ||||
|                 ): | ||||
|                     _LOGGER.debug("All initial states received") | ||||
|                     self._initial_states_received.set_result(True) | ||||
|  | ||||
|                 # Don't forward initial state to user | ||||
|                 return | ||||
|  | ||||
|             # Forward subsequent states to user callback | ||||
|             _LOGGER.debug("Forwarding state to user callback") | ||||
|             user_callback(state) | ||||
|  | ||||
|         return wrapper | ||||
|  | ||||
|     async def wait_for_initial_states(self, timeout: float = 5.0) -> None: | ||||
|         """Wait for all initial states to be received. | ||||
|  | ||||
|         Args: | ||||
|             timeout: Maximum time to wait in seconds | ||||
|  | ||||
|         Raises: | ||||
|             asyncio.TimeoutError: If initial states aren't received within timeout | ||||
|         """ | ||||
|         await asyncio.wait_for(self._initial_states_received, timeout=timeout) | ||||
							
								
								
									
										151
									
								
								tests/integration/test_sensor_filters_ring_buffer.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										151
									
								
								tests/integration/test_sensor_filters_ring_buffer.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,151 @@ | ||||
| """Test sensor ring buffer filter functionality (window_size != send_every).""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| import asyncio | ||||
|  | ||||
| from aioesphomeapi import EntityState, SensorState | ||||
| import pytest | ||||
|  | ||||
| from .state_utils import InitialStateHelper, build_key_to_entity_mapping | ||||
| from .types import APIClientConnectedFactory, RunCompiledFunction | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_sensor_filters_ring_buffer( | ||||
|     yaml_config: str, | ||||
|     run_compiled: RunCompiledFunction, | ||||
|     api_client_connected: APIClientConnectedFactory, | ||||
| ) -> None: | ||||
|     """Test that ring buffer filters (window_size != send_every) work correctly.""" | ||||
|     loop = asyncio.get_running_loop() | ||||
|  | ||||
|     # Track state changes for each sensor | ||||
|     sensor_states: dict[str, list[float]] = { | ||||
|         "sliding_min": [], | ||||
|         "sliding_max": [], | ||||
|         "sliding_median": [], | ||||
|         "sliding_moving_avg": [], | ||||
|     } | ||||
|  | ||||
|     # Futures to track when we receive expected values | ||||
|     all_updates_received = loop.create_future() | ||||
|  | ||||
|     def on_state(state: EntityState) -> None: | ||||
|         """Track sensor state updates.""" | ||||
|         if not isinstance(state, SensorState): | ||||
|             return | ||||
|  | ||||
|         # Skip NaN values | ||||
|         if state.missing_state: | ||||
|             return | ||||
|  | ||||
|         # Get the sensor name from the key mapping | ||||
|         sensor_name = key_to_sensor.get(state.key) | ||||
|         if not sensor_name or sensor_name not in sensor_states: | ||||
|             return | ||||
|  | ||||
|         sensor_states[sensor_name].append(state.state) | ||||
|  | ||||
|         # Check if we've received enough updates from all sensors | ||||
|         # With send_every=2, send_first_at=1, we expect 5 outputs per sensor | ||||
|         if ( | ||||
|             len(sensor_states["sliding_min"]) >= 5 | ||||
|             and len(sensor_states["sliding_max"]) >= 5 | ||||
|             and len(sensor_states["sliding_median"]) >= 5 | ||||
|             and len(sensor_states["sliding_moving_avg"]) >= 5 | ||||
|             and not all_updates_received.done() | ||||
|         ): | ||||
|             all_updates_received.set_result(True) | ||||
|  | ||||
|     async with ( | ||||
|         run_compiled(yaml_config), | ||||
|         api_client_connected() as client, | ||||
|     ): | ||||
|         # Get entities first to build key mapping | ||||
|         entities, services = await client.list_entities_services() | ||||
|  | ||||
|         # Build key-to-sensor mapping | ||||
|         key_to_sensor = build_key_to_entity_mapping( | ||||
|             entities, | ||||
|             [ | ||||
|                 "sliding_min", | ||||
|                 "sliding_max", | ||||
|                 "sliding_median", | ||||
|                 "sliding_moving_avg", | ||||
|             ], | ||||
|         ) | ||||
|  | ||||
|         # Set up initial state helper with all entities | ||||
|         initial_state_helper = InitialStateHelper(entities) | ||||
|  | ||||
|         # Subscribe to state changes with wrapper | ||||
|         client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) | ||||
|  | ||||
|         # Wait for initial states to be sent before pressing button | ||||
|         try: | ||||
|             await initial_state_helper.wait_for_initial_states() | ||||
|         except TimeoutError: | ||||
|             pytest.fail("Timeout waiting for initial states") | ||||
|  | ||||
|         # Find the publish button | ||||
|         publish_button = next( | ||||
|             (e for e in entities if "publish_values_button" in e.object_id.lower()), | ||||
|             None, | ||||
|         ) | ||||
|         assert publish_button is not None, "Publish Values Button not found" | ||||
|  | ||||
|         # Press the button to publish test values | ||||
|         client.button_command(publish_button.key) | ||||
|  | ||||
|         # Wait for all sensors to receive their values | ||||
|         try: | ||||
|             await asyncio.wait_for(all_updates_received, timeout=10.0) | ||||
|         except TimeoutError: | ||||
|             # Provide detailed failure info | ||||
|             pytest.fail( | ||||
|                 f"Timeout waiting for updates. Received states:\n" | ||||
|                 f"  min: {sensor_states['sliding_min']}\n" | ||||
|                 f"  max: {sensor_states['sliding_max']}\n" | ||||
|                 f"  median: {sensor_states['sliding_median']}\n" | ||||
|                 f"  moving_avg: {sensor_states['sliding_moving_avg']}" | ||||
|             ) | ||||
|  | ||||
|         # Verify we got 5 outputs per sensor (positions 1, 3, 5, 7, 9) | ||||
|         assert len(sensor_states["sliding_min"]) == 5, ( | ||||
|             f"Min sensor should have 5 values, got {len(sensor_states['sliding_min'])}: {sensor_states['sliding_min']}" | ||||
|         ) | ||||
|         assert len(sensor_states["sliding_max"]) == 5 | ||||
|         assert len(sensor_states["sliding_median"]) == 5 | ||||
|         assert len(sensor_states["sliding_moving_avg"]) == 5 | ||||
|  | ||||
|         # Verify the values at each output position | ||||
|         # Position 1: window=[1] | ||||
|         assert sensor_states["sliding_min"][0] == pytest.approx(1.0) | ||||
|         assert sensor_states["sliding_max"][0] == pytest.approx(1.0) | ||||
|         assert sensor_states["sliding_median"][0] == pytest.approx(1.0) | ||||
|         assert sensor_states["sliding_moving_avg"][0] == pytest.approx(1.0) | ||||
|  | ||||
|         # Position 3: window=[1,2,3] | ||||
|         assert sensor_states["sliding_min"][1] == pytest.approx(1.0) | ||||
|         assert sensor_states["sliding_max"][1] == pytest.approx(3.0) | ||||
|         assert sensor_states["sliding_median"][1] == pytest.approx(2.0) | ||||
|         assert sensor_states["sliding_moving_avg"][1] == pytest.approx(2.0) | ||||
|  | ||||
|         # Position 5: window=[1,2,3,4,5] | ||||
|         assert sensor_states["sliding_min"][2] == pytest.approx(1.0) | ||||
|         assert sensor_states["sliding_max"][2] == pytest.approx(5.0) | ||||
|         assert sensor_states["sliding_median"][2] == pytest.approx(3.0) | ||||
|         assert sensor_states["sliding_moving_avg"][2] == pytest.approx(3.0) | ||||
|  | ||||
|         # Position 7: window=[3,4,5,6,7] (ring buffer wrapped) | ||||
|         assert sensor_states["sliding_min"][3] == pytest.approx(3.0) | ||||
|         assert sensor_states["sliding_max"][3] == pytest.approx(7.0) | ||||
|         assert sensor_states["sliding_median"][3] == pytest.approx(5.0) | ||||
|         assert sensor_states["sliding_moving_avg"][3] == pytest.approx(5.0) | ||||
|  | ||||
|         # Position 9: window=[5,6,7,8,9] (ring buffer wrapped) | ||||
|         assert sensor_states["sliding_min"][4] == pytest.approx(5.0) | ||||
|         assert sensor_states["sliding_max"][4] == pytest.approx(9.0) | ||||
|         assert sensor_states["sliding_median"][4] == pytest.approx(7.0) | ||||
|         assert sensor_states["sliding_moving_avg"][4] == pytest.approx(7.0) | ||||
							
								
								
									
										395
									
								
								tests/integration/test_sensor_filters_sliding_window.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										395
									
								
								tests/integration/test_sensor_filters_sliding_window.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,395 @@ | ||||
| """Test sensor sliding window filter functionality.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| import asyncio | ||||
|  | ||||
| from aioesphomeapi import EntityState, SensorState | ||||
| import pytest | ||||
|  | ||||
| from .state_utils import InitialStateHelper, build_key_to_entity_mapping | ||||
| from .types import APIClientConnectedFactory, RunCompiledFunction | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_sensor_filters_sliding_window( | ||||
|     yaml_config: str, | ||||
|     run_compiled: RunCompiledFunction, | ||||
|     api_client_connected: APIClientConnectedFactory, | ||||
| ) -> None: | ||||
|     """Test that sliding window filters (min, max, median, quantile, moving_average) work correctly.""" | ||||
|     loop = asyncio.get_running_loop() | ||||
|  | ||||
|     # Track state changes for each sensor | ||||
|     sensor_states: dict[str, list[float]] = { | ||||
|         "min_sensor": [], | ||||
|         "max_sensor": [], | ||||
|         "median_sensor": [], | ||||
|         "quantile_sensor": [], | ||||
|         "moving_avg_sensor": [], | ||||
|     } | ||||
|  | ||||
|     # Futures to track when we receive expected values | ||||
|     min_received = loop.create_future() | ||||
|     max_received = loop.create_future() | ||||
|     median_received = loop.create_future() | ||||
|     quantile_received = loop.create_future() | ||||
|     moving_avg_received = loop.create_future() | ||||
|  | ||||
|     def on_state(state: EntityState) -> None: | ||||
|         """Track sensor state updates.""" | ||||
|         if not isinstance(state, SensorState): | ||||
|             return | ||||
|  | ||||
|         # Skip NaN values | ||||
|         if state.missing_state: | ||||
|             return | ||||
|  | ||||
|         # Get the sensor name from the key mapping | ||||
|         sensor_name = key_to_sensor.get(state.key) | ||||
|         if not sensor_name or sensor_name not in sensor_states: | ||||
|             return | ||||
|  | ||||
|         sensor_states[sensor_name].append(state.state) | ||||
|  | ||||
|         # Check if we received the expected final value | ||||
|         # After publishing 10 values [1.0, 2.0, ..., 10.0], the window has the last 5: [2, 3, 4, 5, 6] | ||||
|         # Filters send at position 1 and position 6 (send_every=5 means every 5th value after first) | ||||
|         if ( | ||||
|             sensor_name == "min_sensor" | ||||
|             and state.state == pytest.approx(2.0) | ||||
|             and not min_received.done() | ||||
|         ): | ||||
|             min_received.set_result(True) | ||||
|         elif ( | ||||
|             sensor_name == "max_sensor" | ||||
|             and state.state == pytest.approx(6.0) | ||||
|             and not max_received.done() | ||||
|         ): | ||||
|             max_received.set_result(True) | ||||
|         elif ( | ||||
|             sensor_name == "median_sensor" | ||||
|             and state.state == pytest.approx(4.0) | ||||
|             and not median_received.done() | ||||
|         ): | ||||
|             # Median of [2, 3, 4, 5, 6] = 4 | ||||
|             median_received.set_result(True) | ||||
|         elif ( | ||||
|             sensor_name == "quantile_sensor" | ||||
|             and state.state == pytest.approx(6.0) | ||||
|             and not quantile_received.done() | ||||
|         ): | ||||
|             # 90th percentile of [2, 3, 4, 5, 6] = 6 | ||||
|             quantile_received.set_result(True) | ||||
|         elif ( | ||||
|             sensor_name == "moving_avg_sensor" | ||||
|             and state.state == pytest.approx(4.0) | ||||
|             and not moving_avg_received.done() | ||||
|         ): | ||||
|             # Average of [2, 3, 4, 5, 6] = 4 | ||||
|             moving_avg_received.set_result(True) | ||||
|  | ||||
|     async with ( | ||||
|         run_compiled(yaml_config), | ||||
|         api_client_connected() as client, | ||||
|     ): | ||||
|         # Get entities first to build key mapping | ||||
|         entities, services = await client.list_entities_services() | ||||
|  | ||||
|         # Build key-to-sensor mapping | ||||
|         key_to_sensor = build_key_to_entity_mapping( | ||||
|             entities, | ||||
|             [ | ||||
|                 "min_sensor", | ||||
|                 "max_sensor", | ||||
|                 "median_sensor", | ||||
|                 "quantile_sensor", | ||||
|                 "moving_avg_sensor", | ||||
|             ], | ||||
|         ) | ||||
|  | ||||
|         # Set up initial state helper with all entities | ||||
|         initial_state_helper = InitialStateHelper(entities) | ||||
|  | ||||
|         # Subscribe to state changes with wrapper | ||||
|         client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) | ||||
|  | ||||
|         # Wait for initial states to be sent before pressing button | ||||
|         try: | ||||
|             await initial_state_helper.wait_for_initial_states() | ||||
|         except TimeoutError: | ||||
|             pytest.fail("Timeout waiting for initial states") | ||||
|  | ||||
|         # Find the publish button | ||||
|         publish_button = next( | ||||
|             (e for e in entities if "publish_values_button" in e.object_id.lower()), | ||||
|             None, | ||||
|         ) | ||||
|         assert publish_button is not None, "Publish Values Button not found" | ||||
|  | ||||
|         # Press the button to publish test values | ||||
|         client.button_command(publish_button.key) | ||||
|  | ||||
|         # Wait for all sensors to receive their final values | ||||
|         try: | ||||
|             await asyncio.wait_for( | ||||
|                 asyncio.gather( | ||||
|                     min_received, | ||||
|                     max_received, | ||||
|                     median_received, | ||||
|                     quantile_received, | ||||
|                     moving_avg_received, | ||||
|                 ), | ||||
|                 timeout=10.0, | ||||
|             ) | ||||
|         except TimeoutError: | ||||
|             # Provide detailed failure info | ||||
|             pytest.fail( | ||||
|                 f"Timeout waiting for expected values. Received states:\n" | ||||
|                 f"  min: {sensor_states['min_sensor']}\n" | ||||
|                 f"  max: {sensor_states['max_sensor']}\n" | ||||
|                 f"  median: {sensor_states['median_sensor']}\n" | ||||
|                 f"  quantile: {sensor_states['quantile_sensor']}\n" | ||||
|                 f"  moving_avg: {sensor_states['moving_avg_sensor']}" | ||||
|             ) | ||||
|  | ||||
|         # Verify we got the expected values | ||||
|         # With batch_delay: 0ms, we should receive all outputs | ||||
|         # Filters output at positions 1 and 6 (send_every: 5) | ||||
|         assert len(sensor_states["min_sensor"]) == 2, ( | ||||
|             f"Min sensor should have 2 values, got {len(sensor_states['min_sensor'])}: {sensor_states['min_sensor']}" | ||||
|         ) | ||||
|         assert len(sensor_states["max_sensor"]) == 2, ( | ||||
|             f"Max sensor should have 2 values, got {len(sensor_states['max_sensor'])}: {sensor_states['max_sensor']}" | ||||
|         ) | ||||
|         assert len(sensor_states["median_sensor"]) == 2 | ||||
|         assert len(sensor_states["quantile_sensor"]) == 2 | ||||
|         assert len(sensor_states["moving_avg_sensor"]) == 2 | ||||
|  | ||||
|         # Verify the first output (after 1 value: [1]) | ||||
|         assert sensor_states["min_sensor"][0] == pytest.approx(1.0), ( | ||||
|             f"First min should be 1.0, got {sensor_states['min_sensor'][0]}" | ||||
|         ) | ||||
|         assert sensor_states["max_sensor"][0] == pytest.approx(1.0), ( | ||||
|             f"First max should be 1.0, got {sensor_states['max_sensor'][0]}" | ||||
|         ) | ||||
|         assert sensor_states["median_sensor"][0] == pytest.approx(1.0), ( | ||||
|             f"First median should be 1.0, got {sensor_states['median_sensor'][0]}" | ||||
|         ) | ||||
|         assert sensor_states["moving_avg_sensor"][0] == pytest.approx(1.0), ( | ||||
|             f"First moving avg should be 1.0, got {sensor_states['moving_avg_sensor'][0]}" | ||||
|         ) | ||||
|  | ||||
|         # Verify the second output (after 6 values, window has [2, 3, 4, 5, 6]) | ||||
|         assert sensor_states["min_sensor"][1] == pytest.approx(2.0), ( | ||||
|             f"Second min should be 2.0, got {sensor_states['min_sensor'][1]}" | ||||
|         ) | ||||
|         assert sensor_states["max_sensor"][1] == pytest.approx(6.0), ( | ||||
|             f"Second max should be 6.0, got {sensor_states['max_sensor'][1]}" | ||||
|         ) | ||||
|         assert sensor_states["median_sensor"][1] == pytest.approx(4.0), ( | ||||
|             f"Second median should be 4.0, got {sensor_states['median_sensor'][1]}" | ||||
|         ) | ||||
|         assert sensor_states["moving_avg_sensor"][1] == pytest.approx(4.0), ( | ||||
|             f"Second moving avg should be 4.0, got {sensor_states['moving_avg_sensor'][1]}" | ||||
|         ) | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_sensor_filters_nan_handling( | ||||
|     yaml_config: str, | ||||
|     run_compiled: RunCompiledFunction, | ||||
|     api_client_connected: APIClientConnectedFactory, | ||||
| ) -> None: | ||||
|     """Test that sliding window filters handle NaN values correctly.""" | ||||
|     loop = asyncio.get_running_loop() | ||||
|  | ||||
|     # Track states | ||||
|     min_states: list[float] = [] | ||||
|     max_states: list[float] = [] | ||||
|  | ||||
|     # Future to track completion | ||||
|     filters_completed = loop.create_future() | ||||
|  | ||||
|     def on_state(state: EntityState) -> None: | ||||
|         """Track sensor state updates.""" | ||||
|         if not isinstance(state, SensorState): | ||||
|             return | ||||
|  | ||||
|         # Skip NaN values | ||||
|         if state.missing_state: | ||||
|             return | ||||
|  | ||||
|         sensor_name = key_to_sensor.get(state.key) | ||||
|  | ||||
|         if sensor_name == "min_nan": | ||||
|             min_states.append(state.state) | ||||
|         elif sensor_name == "max_nan": | ||||
|             max_states.append(state.state) | ||||
|  | ||||
|         # Check if both have received their final values | ||||
|         # With batch_delay: 0ms, we should receive 2 outputs each | ||||
|         if ( | ||||
|             len(min_states) >= 2 | ||||
|             and len(max_states) >= 2 | ||||
|             and not filters_completed.done() | ||||
|         ): | ||||
|             filters_completed.set_result(True) | ||||
|  | ||||
|     async with ( | ||||
|         run_compiled(yaml_config), | ||||
|         api_client_connected() as client, | ||||
|     ): | ||||
|         # Get entities first to build key mapping | ||||
|         entities, services = await client.list_entities_services() | ||||
|  | ||||
|         # Build key-to-sensor mapping | ||||
|         key_to_sensor = build_key_to_entity_mapping(entities, ["min_nan", "max_nan"]) | ||||
|  | ||||
|         # Set up initial state helper with all entities | ||||
|         initial_state_helper = InitialStateHelper(entities) | ||||
|  | ||||
|         # Subscribe to state changes with wrapper | ||||
|         client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) | ||||
|  | ||||
|         # Wait for initial states | ||||
|         try: | ||||
|             await initial_state_helper.wait_for_initial_states() | ||||
|         except TimeoutError: | ||||
|             pytest.fail("Timeout waiting for initial states") | ||||
|  | ||||
|         # Find the publish button | ||||
|         publish_button = next( | ||||
|             (e for e in entities if "publish_nan_values_button" in e.object_id.lower()), | ||||
|             None, | ||||
|         ) | ||||
|         assert publish_button is not None, "Publish NaN Values Button not found" | ||||
|  | ||||
|         # Press the button | ||||
|         client.button_command(publish_button.key) | ||||
|  | ||||
|         # Wait for filters to process | ||||
|         try: | ||||
|             await asyncio.wait_for(filters_completed, timeout=10.0) | ||||
|         except TimeoutError: | ||||
|             pytest.fail( | ||||
|                 f"Timeout waiting for NaN handling. Received:\n" | ||||
|                 f"  min_states: {min_states}\n" | ||||
|                 f"  max_states: {max_states}" | ||||
|             ) | ||||
|  | ||||
|         # Verify NaN values were ignored | ||||
|         # With batch_delay: 0ms, we should receive both outputs (at positions 1 and 6) | ||||
|         # Position 1: window=[10], min=10, max=10 | ||||
|         # Position 6: window=[NaN, 5, NaN, 15, 8], ignoring NaN -> [5, 15, 8], min=5, max=15 | ||||
|         assert len(min_states) == 2, ( | ||||
|             f"Should have 2 min states, got {len(min_states)}: {min_states}" | ||||
|         ) | ||||
|         assert len(max_states) == 2, ( | ||||
|             f"Should have 2 max states, got {len(max_states)}: {max_states}" | ||||
|         ) | ||||
|  | ||||
|         # First output | ||||
|         assert min_states[0] == pytest.approx(10.0), ( | ||||
|             f"First min should be 10.0, got {min_states[0]}" | ||||
|         ) | ||||
|         assert max_states[0] == pytest.approx(10.0), ( | ||||
|             f"First max should be 10.0, got {max_states[0]}" | ||||
|         ) | ||||
|  | ||||
|         # Second output - verify NaN values were ignored | ||||
|         assert min_states[1] == pytest.approx(5.0), ( | ||||
|             f"Second min should ignore NaN and return 5.0, got {min_states[1]}" | ||||
|         ) | ||||
|         assert max_states[1] == pytest.approx(15.0), ( | ||||
|             f"Second max should ignore NaN and return 15.0, got {max_states[1]}" | ||||
|         ) | ||||
|  | ||||
|  | ||||
| @pytest.mark.asyncio | ||||
| async def test_sensor_filters_ring_buffer_wraparound( | ||||
|     yaml_config: str, | ||||
|     run_compiled: RunCompiledFunction, | ||||
|     api_client_connected: APIClientConnectedFactory, | ||||
| ) -> None: | ||||
|     """Test that ring buffer correctly wraps around when window fills up.""" | ||||
|     loop = asyncio.get_running_loop() | ||||
|  | ||||
|     min_states: list[float] = [] | ||||
|  | ||||
|     test_completed = loop.create_future() | ||||
|  | ||||
|     def on_state(state: EntityState) -> None: | ||||
|         """Track min sensor states.""" | ||||
|         if not isinstance(state, SensorState): | ||||
|             return | ||||
|  | ||||
|         # Skip NaN values | ||||
|         if state.missing_state: | ||||
|             return | ||||
|  | ||||
|         sensor_name = key_to_sensor.get(state.key) | ||||
|  | ||||
|         if sensor_name == "wraparound_min": | ||||
|             min_states.append(state.state) | ||||
|             # With batch_delay: 0ms, we should receive all 3 outputs | ||||
|             if len(min_states) >= 3 and not test_completed.done(): | ||||
|                 test_completed.set_result(True) | ||||
|  | ||||
|     async with ( | ||||
|         run_compiled(yaml_config), | ||||
|         api_client_connected() as client, | ||||
|     ): | ||||
|         # Get entities first to build key mapping | ||||
|         entities, services = await client.list_entities_services() | ||||
|  | ||||
|         # Build key-to-sensor mapping | ||||
|         key_to_sensor = build_key_to_entity_mapping(entities, ["wraparound_min"]) | ||||
|  | ||||
|         # Set up initial state helper with all entities | ||||
|         initial_state_helper = InitialStateHelper(entities) | ||||
|  | ||||
|         # Subscribe to state changes with wrapper | ||||
|         client.subscribe_states(initial_state_helper.on_state_wrapper(on_state)) | ||||
|  | ||||
|         # Wait for initial state | ||||
|         try: | ||||
|             await initial_state_helper.wait_for_initial_states() | ||||
|         except TimeoutError: | ||||
|             pytest.fail("Timeout waiting for initial state") | ||||
|  | ||||
|         # Find the publish button | ||||
|         publish_button = next( | ||||
|             (e for e in entities if "publish_wraparound_button" in e.object_id.lower()), | ||||
|             None, | ||||
|         ) | ||||
|         assert publish_button is not None, "Publish Wraparound Button not found" | ||||
|  | ||||
|         # Press the button | ||||
|         # Will publish: 10, 20, 30, 5, 25, 15, 40, 35, 20 | ||||
|         client.button_command(publish_button.key) | ||||
|  | ||||
|         # Wait for completion | ||||
|         try: | ||||
|             await asyncio.wait_for(test_completed, timeout=10.0) | ||||
|         except TimeoutError: | ||||
|             pytest.fail(f"Timeout waiting for wraparound test. Received: {min_states}") | ||||
|  | ||||
|         # Verify outputs | ||||
|         # With window_size=3, send_every=3, we get outputs at positions 1, 4, 7 | ||||
|         # Position 1: window=[10], min=10 | ||||
|         # Position 4: window=[20, 30, 5], min=5 | ||||
|         # Position 7: window=[15, 40, 35], min=15 | ||||
|         # With batch_delay: 0ms, we should receive all 3 outputs | ||||
|         assert len(min_states) == 3, ( | ||||
|             f"Should have 3 states, got {len(min_states)}: {min_states}" | ||||
|         ) | ||||
|         assert min_states[0] == pytest.approx(10.0), ( | ||||
|             f"First min should be 10.0, got {min_states[0]}" | ||||
|         ) | ||||
|         assert min_states[1] == pytest.approx(5.0), ( | ||||
|             f"Second min should be 5.0, got {min_states[1]}" | ||||
|         ) | ||||
|         assert min_states[2] == pytest.approx(15.0), ( | ||||
|             f"Third min should be 15.0, got {min_states[2]}" | ||||
|         ) | ||||
		Reference in New Issue
	
	Block a user