From 9b6707c1c0b1a6e38bd1ff30d9a0621dd6aff7a9 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 15 Oct 2025 18:25:42 -1000 Subject: [PATCH] tests --- .../fixtures/sensor_filters_nan_handling.yaml | 50 +++ ...sensor_filters_ring_buffer_wraparound.yaml | 36 ++ .../sensor_filters_sliding_window.yaml | 78 ++++ .../test_sensor_filters_sliding_window.py | 385 ++++++++++++++++++ 4 files changed, 549 insertions(+) create mode 100644 tests/integration/fixtures/sensor_filters_nan_handling.yaml create mode 100644 tests/integration/fixtures/sensor_filters_ring_buffer_wraparound.yaml create mode 100644 tests/integration/fixtures/sensor_filters_sliding_window.yaml create mode 100644 tests/integration/test_sensor_filters_sliding_window.py diff --git a/tests/integration/fixtures/sensor_filters_nan_handling.yaml b/tests/integration/fixtures/sensor_filters_nan_handling.yaml new file mode 100644 index 0000000000..20fae64e8b --- /dev/null +++ b/tests/integration/fixtures/sensor_filters_nan_handling.yaml @@ -0,0 +1,50 @@ +esphome: + name: test-nan-handling + +host: +api: + batch_delay: 0ms # Disable batching to receive all state updates +logger: + level: DEBUG + +sensor: + - platform: template + name: "Source NaN Sensor" + id: source_nan_sensor + accuracy_decimals: 2 + + - platform: copy + source_id: source_nan_sensor + name: "Min NaN Sensor" + id: min_nan_sensor + filters: + - min: + window_size: 5 + send_every: 5 + + - platform: copy + source_id: source_nan_sensor + name: "Max NaN Sensor" + id: max_nan_sensor + filters: + - max: + window_size: 5 + send_every: 5 + +button: + - platform: template + name: "Publish NaN Values Button" + id: publish_nan_button + on_press: + - lambda: |- + // Publish 10 values with NaN mixed in: 10, NaN, 5, NaN, 15, 8, NaN, 12, 3, NaN + id(source_nan_sensor).publish_state(10.0); + id(source_nan_sensor).publish_state(NAN); + id(source_nan_sensor).publish_state(5.0); + id(source_nan_sensor).publish_state(NAN); + id(source_nan_sensor).publish_state(15.0); + id(source_nan_sensor).publish_state(8.0); + id(source_nan_sensor).publish_state(NAN); + id(source_nan_sensor).publish_state(12.0); + id(source_nan_sensor).publish_state(3.0); + id(source_nan_sensor).publish_state(NAN); diff --git a/tests/integration/fixtures/sensor_filters_ring_buffer_wraparound.yaml b/tests/integration/fixtures/sensor_filters_ring_buffer_wraparound.yaml new file mode 100644 index 0000000000..1ff9ec542a --- /dev/null +++ b/tests/integration/fixtures/sensor_filters_ring_buffer_wraparound.yaml @@ -0,0 +1,36 @@ +esphome: + name: test-ring-buffer-wraparound + +host: +api: + batch_delay: 0ms # Disable batching to receive all state updates +logger: + level: DEBUG + +sensor: + - platform: template + name: "Source Wraparound Sensor" + id: source_wraparound + accuracy_decimals: 2 + + - platform: copy + source_id: source_wraparound + name: "Wraparound Min Sensor" + id: wraparound_min_sensor + filters: + - min: + window_size: 3 + send_every: 3 + +button: + - platform: template + name: "Publish Wraparound Button" + id: publish_wraparound_button + on_press: + - lambda: |- + // Publish 9 values to test ring buffer wraparound + // Values: 10, 20, 30, 5, 25, 15, 40, 35, 20 + float values[] = {10.0, 20.0, 30.0, 5.0, 25.0, 15.0, 40.0, 35.0, 20.0}; + for (int i = 0; i < 9; i++) { + id(source_wraparound).publish_state(values[i]); + } diff --git a/tests/integration/fixtures/sensor_filters_sliding_window.yaml b/tests/integration/fixtures/sensor_filters_sliding_window.yaml new file mode 100644 index 0000000000..a2ae3182b8 --- /dev/null +++ b/tests/integration/fixtures/sensor_filters_sliding_window.yaml @@ -0,0 +1,78 @@ +esphome: + name: test-sliding-window-filters + +host: +api: + batch_delay: 0ms # Disable batching to receive all state updates +logger: + level: DEBUG + +# Template sensor that we'll use to publish values +sensor: + - platform: template + name: "Source Sensor" + id: source_sensor + accuracy_decimals: 2 + + # Min filter sensor + - platform: copy + source_id: source_sensor + name: "Min Sensor" + id: min_sensor + filters: + - min: + window_size: 5 + send_every: 5 + + # Max filter sensor + - platform: copy + source_id: source_sensor + name: "Max Sensor" + id: max_sensor + filters: + - max: + window_size: 5 + send_every: 5 + + # Median filter sensor + - platform: copy + source_id: source_sensor + name: "Median Sensor" + id: median_sensor + filters: + - median: + window_size: 5 + send_every: 5 + + # Quantile filter sensor (90th percentile) + - platform: copy + source_id: source_sensor + name: "Quantile Sensor" + id: quantile_sensor + filters: + - quantile: + window_size: 5 + send_every: 5 + quantile: 0.9 + + # Moving average filter sensor + - platform: copy + source_id: source_sensor + name: "Moving Avg Sensor" + id: moving_avg_sensor + filters: + - sliding_window_moving_average: + window_size: 5 + send_every: 5 + +# Button to trigger publishing test values +button: + - platform: template + name: "Publish Values Button" + id: publish_button + on_press: + - lambda: |- + // Publish 10 values: 1.0, 2.0, ..., 10.0 + for (int i = 1; i <= 10; i++) { + id(source_sensor).publish_state(float(i)); + } diff --git a/tests/integration/test_sensor_filters_sliding_window.py b/tests/integration/test_sensor_filters_sliding_window.py new file mode 100644 index 0000000000..943502c38f --- /dev/null +++ b/tests/integration/test_sensor_filters_sliding_window.py @@ -0,0 +1,385 @@ +"""Test sensor sliding window filter functionality.""" + +from __future__ import annotations + +import asyncio + +from aioesphomeapi import EntityInfo, EntityState, SensorState +import pytest + +from .types import APIClientConnectedFactory, RunCompiledFunction + + +def build_key_to_sensor_mapping( + entities: list[EntityInfo], sensor_names: list[str] +) -> dict[int, str]: + """Build a mapping from entity keys to sensor names. + + Args: + entities: List of entity info objects from the API + sensor_names: List of sensor names to search for in object_ids + + Returns: + Dictionary mapping entity keys to sensor names + """ + key_to_sensor: dict[int, str] = {} + for entity in entities: + obj_id = entity.object_id.lower() + for sensor_name in sensor_names: + if sensor_name in obj_id: + key_to_sensor[entity.key] = sensor_name + break + return key_to_sensor + + +@pytest.mark.asyncio +async def test_sensor_filters_sliding_window( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that sliding window filters (min, max, median, quantile, moving_average) work correctly.""" + loop = asyncio.get_running_loop() + + # Track state changes for each sensor + sensor_states: dict[str, list[float]] = { + "min_sensor": [], + "max_sensor": [], + "median_sensor": [], + "quantile_sensor": [], + "moving_avg_sensor": [], + } + + # Futures to track when we receive expected values + min_received = loop.create_future() + max_received = loop.create_future() + median_received = loop.create_future() + quantile_received = loop.create_future() + moving_avg_received = loop.create_future() + + def on_state(state: EntityState) -> None: + """Track sensor state updates.""" + if not isinstance(state, SensorState): + return + + # Skip NaN values (initial states) + if state.missing_state: + return + + # Get the sensor name from the key mapping + sensor_name = key_to_sensor.get(state.key) + if sensor_name and sensor_name in sensor_states: + sensor_states[sensor_name].append(state.state) + + # Check if we received the expected final value + # After publishing 10 values [1.0, 2.0, ..., 10.0], the window has the last 5: [2, 3, 4, 5, 6] + # Filters send at position 1 and position 6 (send_every=5 means every 5th value after first) + if ( + sensor_name == "min_sensor" + and abs(state.state - 2.0) < 0.01 + and not min_received.done() + ): + min_received.set_result(True) + elif ( + sensor_name == "max_sensor" + and abs(state.state - 6.0) < 0.01 + and not max_received.done() + ): + max_received.set_result(True) + elif ( + sensor_name == "median_sensor" + and abs(state.state - 4.0) < 0.01 + and not median_received.done() + ): + # Median of [2, 3, 4, 5, 6] = 4 + median_received.set_result(True) + elif ( + sensor_name == "quantile_sensor" + and abs(state.state - 6.0) < 0.01 + and not quantile_received.done() + ): + # 90th percentile of [2, 3, 4, 5, 6] = 6 + quantile_received.set_result(True) + elif ( + sensor_name == "moving_avg_sensor" + and abs(state.state - 4.0) < 0.01 + and not moving_avg_received.done() + ): + # Average of [2, 3, 4, 5, 6] = 4 + moving_avg_received.set_result(True) + + async with ( + run_compiled(yaml_config), + api_client_connected() as client, + ): + # Get entities first to build key mapping + entities, services = await client.list_entities_services() + + # Build key-to-sensor mapping + key_to_sensor = build_key_to_sensor_mapping( + entities, + [ + "min_sensor", + "max_sensor", + "median_sensor", + "quantile_sensor", + "moving_avg_sensor", + ], + ) + + # Subscribe to state changes AFTER building mapping + client.subscribe_states(on_state) + + # Find the publish button + publish_button = next( + (e for e in entities if "publish_values_button" in e.object_id.lower()), + None, + ) + assert publish_button is not None, "Publish Values Button not found" + + # Press the button to publish test values + client.button_command(publish_button.key) + + # Wait for all sensors to receive their final values + try: + await asyncio.wait_for( + asyncio.gather( + min_received, + max_received, + median_received, + quantile_received, + moving_avg_received, + ), + timeout=10.0, + ) + except TimeoutError: + # Provide detailed failure info + pytest.fail( + f"Timeout waiting for expected values. Received states:\n" + f" min: {sensor_states['min_sensor']}\n" + f" max: {sensor_states['max_sensor']}\n" + f" median: {sensor_states['median_sensor']}\n" + f" quantile: {sensor_states['quantile_sensor']}\n" + f" moving_avg: {sensor_states['moving_avg_sensor']}" + ) + + # Verify we got the expected values + # With batch_delay: 0ms, we should receive all outputs + # Filters output at positions 1 and 6 (send_every: 5) + assert len(sensor_states["min_sensor"]) == 2, ( + f"Min sensor should have 2 values, got {len(sensor_states['min_sensor'])}: {sensor_states['min_sensor']}" + ) + assert len(sensor_states["max_sensor"]) == 2, ( + f"Max sensor should have 2 values, got {len(sensor_states['max_sensor'])}: {sensor_states['max_sensor']}" + ) + assert len(sensor_states["median_sensor"]) == 2 + assert len(sensor_states["quantile_sensor"]) == 2 + assert len(sensor_states["moving_avg_sensor"]) == 2 + + # Verify the first output (after 1 value: [1]) + assert abs(sensor_states["min_sensor"][0] - 1.0) < 0.01, ( + f"First min should be 1.0, got {sensor_states['min_sensor'][0]}" + ) + assert abs(sensor_states["max_sensor"][0] - 1.0) < 0.01, ( + f"First max should be 1.0, got {sensor_states['max_sensor'][0]}" + ) + assert abs(sensor_states["median_sensor"][0] - 1.0) < 0.01, ( + f"First median should be 1.0, got {sensor_states['median_sensor'][0]}" + ) + assert abs(sensor_states["moving_avg_sensor"][0] - 1.0) < 0.01, ( + f"First moving avg should be 1.0, got {sensor_states['moving_avg_sensor'][0]}" + ) + + # Verify the second output (after 6 values, window has [2, 3, 4, 5, 6]) + assert abs(sensor_states["min_sensor"][1] - 2.0) < 0.01, ( + f"Second min should be 2.0, got {sensor_states['min_sensor'][1]}" + ) + assert abs(sensor_states["max_sensor"][1] - 6.0) < 0.01, ( + f"Second max should be 6.0, got {sensor_states['max_sensor'][1]}" + ) + assert abs(sensor_states["median_sensor"][1] - 4.0) < 0.01, ( + f"Second median should be 4.0, got {sensor_states['median_sensor'][1]}" + ) + assert abs(sensor_states["moving_avg_sensor"][1] - 4.0) < 0.01, ( + f"Second moving avg should be 4.0, got {sensor_states['moving_avg_sensor'][1]}" + ) + + +@pytest.mark.asyncio +async def test_sensor_filters_nan_handling( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that sliding window filters handle NaN values correctly.""" + loop = asyncio.get_running_loop() + + # Track states + min_states: list[float] = [] + max_states: list[float] = [] + + # Future to track completion + filters_completed = loop.create_future() + + def on_state(state: EntityState) -> None: + """Track sensor state updates.""" + if not isinstance(state, SensorState): + return + + # Skip NaN values (initial states) + if state.missing_state: + return + + sensor_name = key_to_sensor.get(state.key) + if sensor_name == "min_nan": + min_states.append(state.state) + elif sensor_name == "max_nan": + max_states.append(state.state) + + # Check if both have received their final values + # With batch_delay: 0ms, we should receive 2 outputs each + if ( + len(min_states) >= 2 + and len(max_states) >= 2 + and not filters_completed.done() + ): + filters_completed.set_result(True) + + async with ( + run_compiled(yaml_config), + api_client_connected() as client, + ): + # Get entities first to build key mapping + entities, services = await client.list_entities_services() + + # Build key-to-sensor mapping + key_to_sensor = build_key_to_sensor_mapping(entities, ["min_nan", "max_nan"]) + + # Subscribe to state changes AFTER building mapping + client.subscribe_states(on_state) + + # Find the publish button + publish_button = next( + (e for e in entities if "publish_nan_values_button" in e.object_id.lower()), + None, + ) + assert publish_button is not None, "Publish NaN Values Button not found" + + # Press the button + client.button_command(publish_button.key) + + # Wait for filters to process + try: + await asyncio.wait_for(filters_completed, timeout=10.0) + except TimeoutError: + pytest.fail( + f"Timeout waiting for NaN handling. Received:\n" + f" min_states: {min_states}\n" + f" max_states: {max_states}" + ) + + # Verify NaN values were ignored + # With batch_delay: 0ms, we should receive both outputs (at positions 1 and 6) + # Position 1: window=[10], min=10, max=10 + # Position 6: window=[NaN, 5, NaN, 15, 8], ignoring NaN -> [5, 15, 8], min=5, max=15 + assert len(min_states) == 2, ( + f"Should have 2 min states, got {len(min_states)}: {min_states}" + ) + assert len(max_states) == 2, ( + f"Should have 2 max states, got {len(max_states)}: {max_states}" + ) + + # First output + assert abs(min_states[0] - 10.0) < 0.01, ( + f"First min should be 10.0, got {min_states[0]}" + ) + assert abs(max_states[0] - 10.0) < 0.01, ( + f"First max should be 10.0, got {max_states[0]}" + ) + + # Second output - verify NaN values were ignored + assert abs(min_states[1] - 5.0) < 0.01, ( + f"Second min should ignore NaN and return 5.0, got {min_states[1]}" + ) + assert abs(max_states[1] - 15.0) < 0.01, ( + f"Second max should ignore NaN and return 15.0, got {max_states[1]}" + ) + + +@pytest.mark.asyncio +async def test_sensor_filters_ring_buffer_wraparound( + yaml_config: str, + run_compiled: RunCompiledFunction, + api_client_connected: APIClientConnectedFactory, +) -> None: + """Test that ring buffer correctly wraps around when window fills up.""" + loop = asyncio.get_running_loop() + + min_states: list[float] = [] + + test_completed = loop.create_future() + + def on_state(state: EntityState) -> None: + """Track min sensor states.""" + if not isinstance(state, SensorState): + return + + # Skip NaN values (initial states) + if state.missing_state: + return + + sensor_name = key_to_sensor.get(state.key) + if sensor_name == "wraparound_min": + min_states.append(state.state) + # With batch_delay: 0ms, we should receive all 3 outputs + if len(min_states) >= 3 and not test_completed.done(): + test_completed.set_result(True) + + async with ( + run_compiled(yaml_config), + api_client_connected() as client, + ): + # Get entities first to build key mapping + entities, services = await client.list_entities_services() + + # Build key-to-sensor mapping + key_to_sensor = build_key_to_sensor_mapping(entities, ["wraparound_min"]) + + # Subscribe to state changes AFTER building mapping + client.subscribe_states(on_state) + + # Find the publish button + publish_button = next( + (e for e in entities if "publish_wraparound_button" in e.object_id.lower()), + None, + ) + assert publish_button is not None, "Publish Wraparound Button not found" + + # Press the button + # Will publish: 10, 20, 30, 5, 25, 15, 40, 35, 20 + client.button_command(publish_button.key) + + # Wait for completion + try: + await asyncio.wait_for(test_completed, timeout=10.0) + except TimeoutError: + pytest.fail(f"Timeout waiting for wraparound test. Received: {min_states}") + + # Verify outputs + # With window_size=3, send_every=3, we get outputs at positions 1, 4, 7 + # Position 1: window=[10], min=10 + # Position 4: window=[20, 30, 5], min=5 + # Position 7: window=[15, 40, 35], min=15 + # With batch_delay: 0ms, we should receive all 3 outputs + assert len(min_states) == 3, ( + f"Should have 3 states, got {len(min_states)}: {min_states}" + ) + assert abs(min_states[0] - 10.0) < 0.01, ( + f"First min should be 10.0, got {min_states[0]}" + ) + assert abs(min_states[1] - 5.0) < 0.01, ( + f"Second min should be 5.0, got {min_states[1]}" + ) + assert abs(min_states[2] - 15.0) < 0.01, ( + f"Third min should be 15.0, got {min_states[2]}" + )