1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-21 11:13:46 +01:00
This commit is contained in:
J. Nick Koston
2025-10-15 19:58:18 -10:00
parent f75f11b550
commit 855df423ee

View File

@@ -0,0 +1,163 @@
"""Test sensor ring buffer filter functionality (window_size != send_every)."""
from __future__ import annotations
import asyncio
from aioesphomeapi import EntityInfo, EntityState, SensorState
import pytest
from .types import APIClientConnectedFactory, RunCompiledFunction
def build_key_to_sensor_mapping(
entities: list[EntityInfo], sensor_names: list[str]
) -> dict[int, str]:
"""Build a mapping from entity keys to sensor names.
Args:
entities: List of entity info objects from the API
sensor_names: List of sensor names to search for in object_ids
Returns:
Dictionary mapping entity keys to sensor names
"""
key_to_sensor: dict[int, str] = {}
for entity in entities:
obj_id = entity.object_id.lower()
for sensor_name in sensor_names:
if sensor_name in obj_id:
key_to_sensor[entity.key] = sensor_name
break
return key_to_sensor
@pytest.mark.asyncio
async def test_sensor_filters_ring_buffer(
yaml_config: str,
run_compiled: RunCompiledFunction,
api_client_connected: APIClientConnectedFactory,
) -> None:
"""Test that ring buffer filters (window_size != send_every) work correctly."""
loop = asyncio.get_running_loop()
# Track state changes for each sensor
sensor_states: dict[str, list[float]] = {
"sliding_min": [],
"sliding_max": [],
"sliding_median": [],
"sliding_moving_avg": [],
}
# Futures to track when we receive expected values
all_updates_received = loop.create_future()
def on_state(state: EntityState) -> None:
"""Track sensor state updates."""
if not isinstance(state, SensorState):
return
# Skip NaN values (initial states)
if state.missing_state:
return
# Get the sensor name from the key mapping
sensor_name = key_to_sensor.get(state.key)
if not sensor_name or sensor_name not in sensor_states:
return
sensor_states[sensor_name].append(state.state)
# Check if we've received enough updates from all sensors
# With send_every=2, send_first_at=1, we expect 5 outputs per sensor
if (
len(sensor_states["sliding_min"]) >= 5
and len(sensor_states["sliding_max"]) >= 5
and len(sensor_states["sliding_median"]) >= 5
and len(sensor_states["sliding_moving_avg"]) >= 5
and not all_updates_received.done()
):
all_updates_received.set_result(True)
async with (
run_compiled(yaml_config),
api_client_connected() as client,
):
# Get entities first to build key mapping
entities, services = await client.list_entities_services()
# Build key-to-sensor mapping
key_to_sensor = build_key_to_sensor_mapping(
entities,
[
"sliding_min",
"sliding_max",
"sliding_median",
"sliding_moving_avg",
],
)
# Subscribe to state changes AFTER building mapping
client.subscribe_states(on_state)
# Find the publish button
publish_button = next(
(e for e in entities if "publish_values_button" in e.object_id.lower()),
None,
)
assert publish_button is not None, "Publish Values Button not found"
# Press the button to publish test values
client.button_command(publish_button.key)
# Wait for all sensors to receive their values
try:
await asyncio.wait_for(all_updates_received, timeout=10.0)
except TimeoutError:
# Provide detailed failure info
pytest.fail(
f"Timeout waiting for updates. Received states:\n"
f" min: {sensor_states['sliding_min']}\n"
f" max: {sensor_states['sliding_max']}\n"
f" median: {sensor_states['sliding_median']}\n"
f" moving_avg: {sensor_states['sliding_moving_avg']}"
)
# Verify we got 5 outputs per sensor (positions 1, 3, 5, 7, 9)
assert len(sensor_states["sliding_min"]) == 5, (
f"Min sensor should have 5 values, got {len(sensor_states['sliding_min'])}: {sensor_states['sliding_min']}"
)
assert len(sensor_states["sliding_max"]) == 5
assert len(sensor_states["sliding_median"]) == 5
assert len(sensor_states["sliding_moving_avg"]) == 5
# Verify the values at each output position
# Position 1: window=[1]
assert abs(sensor_states["sliding_min"][0] - 1.0) < 0.01
assert abs(sensor_states["sliding_max"][0] - 1.0) < 0.01
assert abs(sensor_states["sliding_median"][0] - 1.0) < 0.01
assert abs(sensor_states["sliding_moving_avg"][0] - 1.0) < 0.01
# Position 3: window=[1,2,3]
assert abs(sensor_states["sliding_min"][1] - 1.0) < 0.01
assert abs(sensor_states["sliding_max"][1] - 3.0) < 0.01
assert abs(sensor_states["sliding_median"][1] - 2.0) < 0.01
assert abs(sensor_states["sliding_moving_avg"][1] - 2.0) < 0.01
# Position 5: window=[1,2,3,4,5]
assert abs(sensor_states["sliding_min"][2] - 1.0) < 0.01
assert abs(sensor_states["sliding_max"][2] - 5.0) < 0.01
assert abs(sensor_states["sliding_median"][2] - 3.0) < 0.01
assert abs(sensor_states["sliding_moving_avg"][2] - 3.0) < 0.01
# Position 7: window=[3,4,5,6,7] (ring buffer wrapped)
assert abs(sensor_states["sliding_min"][3] - 3.0) < 0.01
assert abs(sensor_states["sliding_max"][3] - 7.0) < 0.01
assert abs(sensor_states["sliding_median"][3] - 5.0) < 0.01
assert abs(sensor_states["sliding_moving_avg"][3] - 5.0) < 0.01
# Position 9: window=[5,6,7,8,9] (ring buffer wrapped)
assert abs(sensor_states["sliding_min"][4] - 5.0) < 0.01
assert abs(sensor_states["sliding_max"][4] - 9.0) < 0.01
assert abs(sensor_states["sliding_median"][4] - 7.0) < 0.01
assert abs(sensor_states["sliding_moving_avg"][4] - 7.0) < 0.01