1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-27 21:23:48 +00:00

Merge branch 'integration' into memory_api

This commit is contained in:
J. Nick Koston
2025-10-17 21:41:47 -10:00
9 changed files with 473 additions and 106 deletions

View File

@@ -65,32 +65,41 @@ optional<float> SlidingWindowFilter::new_value(float value) {
} }
// SortedWindowFilter // SortedWindowFilter
FixedVector<float> SortedWindowFilter::get_sorted_values_() { FixedVector<float> SortedWindowFilter::get_window_values_() {
// Copy window without NaN values using FixedVector (no heap allocation) // Copy window without NaN values using FixedVector (no heap allocation)
FixedVector<float> sorted_values; // Returns unsorted values - caller will use std::nth_element for partial sorting as needed
sorted_values.init(this->window_count_); FixedVector<float> values;
values.init(this->window_count_);
for (size_t i = 0; i < this->window_count_; i++) { for (size_t i = 0; i < this->window_count_; i++) {
float v = this->window_[i]; float v = this->window_[i];
if (!std::isnan(v)) { if (!std::isnan(v)) {
sorted_values.push_back(v); values.push_back(v);
} }
} }
std::sort(sorted_values.begin(), sorted_values.end()); return values;
return sorted_values;
} }
// MedianFilter // MedianFilter
float MedianFilter::compute_result() { float MedianFilter::compute_result() {
FixedVector<float> sorted_values = this->get_sorted_values_(); FixedVector<float> values = this->get_window_values_();
if (sorted_values.empty()) if (values.empty())
return NAN; return NAN;
size_t size = sorted_values.size(); size_t size = values.size();
size_t mid = size / 2;
if (size % 2) { if (size % 2) {
return sorted_values[size / 2]; // Odd number of elements - use nth_element to find middle element
} else { std::nth_element(values.begin(), values.begin() + mid, values.end());
return (sorted_values[size / 2] + sorted_values[(size / 2) - 1]) / 2.0f; return values[mid];
} }
// Even number of elements - need both middle elements
// Use nth_element to find upper middle element
std::nth_element(values.begin(), values.begin() + mid, values.end());
float upper = values[mid];
// Find the maximum of the lower half (which is now everything before mid)
float lower = *std::max_element(values.begin(), values.begin() + mid);
return (lower + upper) / 2.0f;
} }
// SkipInitialFilter // SkipInitialFilter
@@ -111,13 +120,16 @@ QuantileFilter::QuantileFilter(size_t window_size, size_t send_every, size_t sen
: SortedWindowFilter(window_size, send_every, send_first_at), quantile_(quantile) {} : SortedWindowFilter(window_size, send_every, send_first_at), quantile_(quantile) {}
float QuantileFilter::compute_result() { float QuantileFilter::compute_result() {
FixedVector<float> sorted_values = this->get_sorted_values_(); FixedVector<float> values = this->get_window_values_();
if (sorted_values.empty()) if (values.empty())
return NAN; return NAN;
size_t position = ceilf(sorted_values.size() * this->quantile_) - 1; size_t position = ceilf(values.size() * this->quantile_) - 1;
ESP_LOGVV(TAG, "QuantileFilter(%p)::position: %zu/%zu", this, position + 1, sorted_values.size()); ESP_LOGVV(TAG, "QuantileFilter(%p)::position: %zu/%zu", this, position + 1, values.size());
return sorted_values[position];
// Use nth_element to find the quantile element (O(n) instead of O(n log n))
std::nth_element(values.begin(), values.begin() + position, values.end());
return values[position];
} }
// MinFilter // MinFilter

View File

@@ -95,17 +95,17 @@ class MinMaxFilter : public SlidingWindowFilter {
/** Base class for filters that need a sorted window (Median, Quantile). /** Base class for filters that need a sorted window (Median, Quantile).
* *
* Extends SlidingWindowFilter to provide a helper that creates a sorted copy * Extends SlidingWindowFilter to provide a helper that filters out NaN values.
* of non-NaN values from the window. * Derived classes use std::nth_element for efficient partial sorting.
*/ */
class SortedWindowFilter : public SlidingWindowFilter { class SortedWindowFilter : public SlidingWindowFilter {
public: public:
using SlidingWindowFilter::SlidingWindowFilter; using SlidingWindowFilter::SlidingWindowFilter;
protected: protected:
/// Helper to get sorted non-NaN values from the window /// Helper to get non-NaN values from the window (not sorted - caller will use nth_element)
/// Returns empty FixedVector if all values are NaN /// Returns empty FixedVector if all values are NaN
FixedVector<float> get_sorted_values_(); FixedVector<float> get_window_values_();
}; };
/** Simple quantile filter. /** Simple quantile filter.

View File

@@ -0,0 +1,101 @@
sensor:
# Source sensor for testing filters
- platform: template
name: "Source Sensor"
id: source_sensor
lambda: return 42.0;
update_interval: 1s
# Streaming filters (window_size == send_every) - uses StreamingFilter base class
- platform: copy
source_id: source_sensor
name: "Streaming Min Filter"
filters:
- min:
window_size: 10
send_every: 10 # Batch window → StreamingMinFilter
- platform: copy
source_id: source_sensor
name: "Streaming Max Filter"
filters:
- max:
window_size: 10
send_every: 10 # Batch window → StreamingMaxFilter
- platform: copy
source_id: source_sensor
name: "Streaming Moving Average Filter"
filters:
- sliding_window_moving_average:
window_size: 10
send_every: 10 # Batch window → StreamingMovingAverageFilter
# Sliding window filters (window_size != send_every) - uses SlidingWindowFilter base class with ring buffer
- platform: copy
source_id: source_sensor
name: "Sliding Min Filter"
filters:
- min:
window_size: 10
send_every: 5 # Sliding window → MinFilter with ring buffer
- platform: copy
source_id: source_sensor
name: "Sliding Max Filter"
filters:
- max:
window_size: 10
send_every: 5 # Sliding window → MaxFilter with ring buffer
- platform: copy
source_id: source_sensor
name: "Sliding Median Filter"
filters:
- median:
window_size: 10
send_every: 5 # Sliding window → MedianFilter with ring buffer
- platform: copy
source_id: source_sensor
name: "Sliding Quantile Filter"
filters:
- quantile:
window_size: 10
send_every: 5
quantile: 0.9 # Sliding window → QuantileFilter with ring buffer
- platform: copy
source_id: source_sensor
name: "Sliding Moving Average Filter"
filters:
- sliding_window_moving_average:
window_size: 10
send_every: 5 # Sliding window → SlidingWindowMovingAverageFilter with ring buffer
# Edge cases
- platform: copy
source_id: source_sensor
name: "Large Batch Window Min"
filters:
- min:
window_size: 1000
send_every: 1000 # Large batch → StreamingMinFilter (4 bytes, not 4KB)
- platform: copy
source_id: source_sensor
name: "Small Sliding Window"
filters:
- median:
window_size: 3
send_every: 1 # Frequent output → MedianFilter with 3-element ring buffer
# send_first_at parameter test
- platform: copy
source_id: source_sensor
name: "Early Send Filter"
filters:
- max:
window_size: 10
send_every: 10
send_first_at: 1 # Send after first value

View File

@@ -0,0 +1 @@
<<: !include common.yaml

View File

@@ -7,6 +7,7 @@ This directory contains end-to-end integration tests for ESPHome, focusing on te
- `conftest.py` - Common fixtures and utilities - `conftest.py` - Common fixtures and utilities
- `const.py` - Constants used throughout the integration tests - `const.py` - Constants used throughout the integration tests
- `types.py` - Type definitions for fixtures and functions - `types.py` - Type definitions for fixtures and functions
- `state_utils.py` - State handling utilities (e.g., `InitialStateHelper`, `build_key_to_entity_mapping`)
- `fixtures/` - YAML configuration files for tests - `fixtures/` - YAML configuration files for tests
- `test_*.py` - Individual test files - `test_*.py` - Individual test files
@@ -26,6 +27,32 @@ The `yaml_config` fixture automatically loads YAML configurations based on the t
- `reserved_tcp_port` - Reserves a TCP port by holding the socket open until ESPHome needs it - `reserved_tcp_port` - Reserves a TCP port by holding the socket open until ESPHome needs it
- `unused_tcp_port` - Provides the reserved port number for each test - `unused_tcp_port` - Provides the reserved port number for each test
### Helper Utilities
#### InitialStateHelper (`state_utils.py`)
The `InitialStateHelper` class solves a common problem in integration tests: when an API client connects, ESPHome automatically broadcasts the current state of all entities. This can interfere with tests that want to track only new state changes triggered by test actions.
**What it does:**
- Tracks all entities (except stateless ones like buttons)
- Swallows the first state broadcast for each entity
- Forwards all subsequent state changes to your test callback
- Provides `wait_for_initial_states()` to synchronize before test actions
**When to use it:**
- Any test that triggers entity state changes and needs to verify them
- Tests that would otherwise see duplicate or unexpected states
- Tests that need clean separation between initial state and test-triggered changes
**Implementation details:**
- Uses `(device_id, key)` tuples to uniquely identify entities across devices
- Automatically excludes `ButtonInfo` entities (stateless)
- Provides debug logging to track state reception (use `--log-cli-level=DEBUG`)
- Safe for concurrent use with multiple entity types
**Future work:**
Consider converting existing integration tests to use `InitialStateHelper` for more reliable state tracking and to eliminate race conditions related to initial state broadcasts.
### Writing Tests ### Writing Tests
The simplest way to write a test is to use the `run_compiled` and `api_client_connected` fixtures: The simplest way to write a test is to use the `run_compiled` and `api_client_connected` fixtures:
@@ -125,6 +152,54 @@ async def test_my_sensor(
``` ```
##### State Subscription Pattern ##### State Subscription Pattern
**Recommended: Using InitialStateHelper**
When an API client connects, ESPHome automatically sends the current state of all entities. The `InitialStateHelper` (from `state_utils.py`) handles this by swallowing these initial states and only forwarding subsequent state changes to your test callback:
```python
from .state_utils import InitialStateHelper
# Track state changes with futures
loop = asyncio.get_running_loop()
states: dict[int, EntityState] = {}
state_future: asyncio.Future[EntityState] = loop.create_future()
def on_state(state: EntityState) -> None:
"""This callback only receives NEW state changes, not initial states."""
states[state.key] = state
# Check for specific condition using isinstance
if isinstance(state, SensorState) and state.state == expected_value:
if not state_future.done():
state_future.set_result(state)
# Get entities and set up state synchronization
entities, services = await client.list_entities_services()
initial_state_helper = InitialStateHelper(entities)
# Subscribe with the wrapper that filters initial states
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for all initial states to be broadcast
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Now perform your test actions - on_state will only receive new changes
# ... trigger state changes ...
# Wait for expected state
try:
result = await asyncio.wait_for(state_future, timeout=5.0)
except asyncio.TimeoutError:
pytest.fail(f"Expected state not received. Got: {list(states.values())}")
```
**Legacy: Manual State Tracking**
If you need to handle initial states manually (not recommended for new tests):
```python ```python
# Track state changes with futures # Track state changes with futures
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()

View File

@@ -1,27 +0,0 @@
"""Shared utilities for sensor integration tests."""
from __future__ import annotations
from aioesphomeapi import EntityInfo
def build_key_to_sensor_mapping(
entities: list[EntityInfo], sensor_names: list[str]
) -> dict[int, str]:
"""Build a mapping from entity keys to sensor names.
Args:
entities: List of entity info objects from the API
sensor_names: List of sensor names to search for in object_ids
Returns:
Dictionary mapping entity keys to sensor names
"""
key_to_sensor: dict[int, str] = {}
for entity in entities:
obj_id = entity.object_id.lower()
for sensor_name in sensor_names:
if sensor_name in obj_id:
key_to_sensor[entity.key] = sensor_name
break
return key_to_sensor

View File

@@ -0,0 +1,167 @@
"""Shared utilities for ESPHome integration tests - state handling."""
from __future__ import annotations
import asyncio
import logging
from aioesphomeapi import ButtonInfo, EntityInfo, EntityState
_LOGGER = logging.getLogger(__name__)
def build_key_to_entity_mapping(
entities: list[EntityInfo], entity_names: list[str]
) -> dict[int, str]:
"""Build a mapping from entity keys to entity names.
Args:
entities: List of entity info objects from the API
entity_names: List of entity names to search for in object_ids
Returns:
Dictionary mapping entity keys to entity names
"""
key_to_entity: dict[int, str] = {}
for entity in entities:
obj_id = entity.object_id.lower()
for entity_name in entity_names:
if entity_name in obj_id:
key_to_entity[entity.key] = entity_name
break
return key_to_entity
class InitialStateHelper:
"""Helper to wait for initial states before processing test states.
When an API client connects, ESPHome sends the current state of all entities.
This helper wraps the user's state callback and swallows the first state for
each entity, then forwards all subsequent states to the user callback.
Usage:
entities, services = await client.list_entities_services()
helper = InitialStateHelper(entities)
client.subscribe_states(helper.on_state_wrapper(user_callback))
await helper.wait_for_initial_states()
"""
def __init__(self, entities: list[EntityInfo]) -> None:
"""Initialize the helper.
Args:
entities: All entities from list_entities_services()
"""
# Set of (device_id, key) tuples waiting for initial state
# Buttons are stateless, so exclude them
self._wait_initial_states = {
(entity.device_id, entity.key)
for entity in entities
if not isinstance(entity, ButtonInfo)
}
# Keep entity info for debugging - use (device_id, key) tuple
self._entities_by_id = {
(entity.device_id, entity.key): entity for entity in entities
}
# Log all entities
_LOGGER.debug(
"InitialStateHelper: Found %d total entities: %s",
len(entities),
[(type(e).__name__, e.object_id) for e in entities],
)
# Log which ones we're waiting for
_LOGGER.debug(
"InitialStateHelper: Waiting for %d entities (excluding ButtonInfo): %s",
len(self._wait_initial_states),
[self._entities_by_id[k].object_id for k in self._wait_initial_states],
)
# Log which ones we're NOT waiting for
not_waiting = {
(e.device_id, e.key) for e in entities
} - self._wait_initial_states
if not_waiting:
not_waiting_info = [
f"{type(self._entities_by_id[k]).__name__}:{self._entities_by_id[k].object_id}"
for k in not_waiting
]
_LOGGER.debug(
"InitialStateHelper: NOT waiting for %d entities: %s",
len(not_waiting),
not_waiting_info,
)
# Create future in the running event loop
self._initial_states_received = asyncio.get_running_loop().create_future()
# If no entities to wait for, mark complete immediately
if not self._wait_initial_states:
self._initial_states_received.set_result(True)
def on_state_wrapper(self, user_callback):
"""Wrap a user callback to track initial states.
Args:
user_callback: The user's state callback function
Returns:
Wrapped callback that swallows first state per entity, forwards rest
"""
def wrapper(state: EntityState) -> None:
"""Swallow initial state per entity, forward subsequent states."""
# Create entity identifier tuple
entity_id = (state.device_id, state.key)
# Log which entity is sending state
if entity_id in self._entities_by_id:
entity = self._entities_by_id[entity_id]
_LOGGER.debug(
"Received state for %s (type: %s, device_id: %s, key: %d)",
entity.object_id,
type(entity).__name__,
state.device_id,
state.key,
)
# If this entity is waiting for initial state
if entity_id in self._wait_initial_states:
# Remove from waiting set
self._wait_initial_states.discard(entity_id)
_LOGGER.debug(
"Swallowed initial state for %s, %d entities remaining",
self._entities_by_id[entity_id].object_id
if entity_id in self._entities_by_id
else entity_id,
len(self._wait_initial_states),
)
# Check if we've now seen all entities
if (
not self._wait_initial_states
and not self._initial_states_received.done()
):
_LOGGER.debug("All initial states received")
self._initial_states_received.set_result(True)
# Don't forward initial state to user
return
# Forward subsequent states to user callback
_LOGGER.debug("Forwarding state to user callback")
user_callback(state)
return wrapper
async def wait_for_initial_states(self, timeout: float = 5.0) -> None:
"""Wait for all initial states to be received.
Args:
timeout: Maximum time to wait in seconds
Raises:
asyncio.TimeoutError: If initial states aren't received within timeout
"""
await asyncio.wait_for(self._initial_states_received, timeout=timeout)

View File

@@ -7,7 +7,7 @@ import asyncio
from aioesphomeapi import EntityState, SensorState from aioesphomeapi import EntityState, SensorState
import pytest import pytest
from .sensor_test_utils import build_key_to_sensor_mapping from .state_utils import InitialStateHelper, build_key_to_entity_mapping
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@@ -36,7 +36,7 @@ async def test_sensor_filters_ring_buffer(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
@@ -66,7 +66,7 @@ async def test_sensor_filters_ring_buffer(
entities, services = await client.list_entities_services() entities, services = await client.list_entities_services()
# Build key-to-sensor mapping # Build key-to-sensor mapping
key_to_sensor = build_key_to_sensor_mapping( key_to_sensor = build_key_to_entity_mapping(
entities, entities,
[ [
"sliding_min", "sliding_min",
@@ -76,8 +76,17 @@ async def test_sensor_filters_ring_buffer(
], ],
) )
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial states to be sent before pressing button
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(
@@ -112,31 +121,31 @@ async def test_sensor_filters_ring_buffer(
# Verify the values at each output position # Verify the values at each output position
# Position 1: window=[1] # Position 1: window=[1]
assert abs(sensor_states["sliding_min"][0] - 1.0) < 0.01 assert sensor_states["sliding_min"][0] == pytest.approx(1.0)
assert abs(sensor_states["sliding_max"][0] - 1.0) < 0.01 assert sensor_states["sliding_max"][0] == pytest.approx(1.0)
assert abs(sensor_states["sliding_median"][0] - 1.0) < 0.01 assert sensor_states["sliding_median"][0] == pytest.approx(1.0)
assert abs(sensor_states["sliding_moving_avg"][0] - 1.0) < 0.01 assert sensor_states["sliding_moving_avg"][0] == pytest.approx(1.0)
# Position 3: window=[1,2,3] # Position 3: window=[1,2,3]
assert abs(sensor_states["sliding_min"][1] - 1.0) < 0.01 assert sensor_states["sliding_min"][1] == pytest.approx(1.0)
assert abs(sensor_states["sliding_max"][1] - 3.0) < 0.01 assert sensor_states["sliding_max"][1] == pytest.approx(3.0)
assert abs(sensor_states["sliding_median"][1] - 2.0) < 0.01 assert sensor_states["sliding_median"][1] == pytest.approx(2.0)
assert abs(sensor_states["sliding_moving_avg"][1] - 2.0) < 0.01 assert sensor_states["sliding_moving_avg"][1] == pytest.approx(2.0)
# Position 5: window=[1,2,3,4,5] # Position 5: window=[1,2,3,4,5]
assert abs(sensor_states["sliding_min"][2] - 1.0) < 0.01 assert sensor_states["sliding_min"][2] == pytest.approx(1.0)
assert abs(sensor_states["sliding_max"][2] - 5.0) < 0.01 assert sensor_states["sliding_max"][2] == pytest.approx(5.0)
assert abs(sensor_states["sliding_median"][2] - 3.0) < 0.01 assert sensor_states["sliding_median"][2] == pytest.approx(3.0)
assert abs(sensor_states["sliding_moving_avg"][2] - 3.0) < 0.01 assert sensor_states["sliding_moving_avg"][2] == pytest.approx(3.0)
# Position 7: window=[3,4,5,6,7] (ring buffer wrapped) # Position 7: window=[3,4,5,6,7] (ring buffer wrapped)
assert abs(sensor_states["sliding_min"][3] - 3.0) < 0.01 assert sensor_states["sliding_min"][3] == pytest.approx(3.0)
assert abs(sensor_states["sliding_max"][3] - 7.0) < 0.01 assert sensor_states["sliding_max"][3] == pytest.approx(7.0)
assert abs(sensor_states["sliding_median"][3] - 5.0) < 0.01 assert sensor_states["sliding_median"][3] == pytest.approx(5.0)
assert abs(sensor_states["sliding_moving_avg"][3] - 5.0) < 0.01 assert sensor_states["sliding_moving_avg"][3] == pytest.approx(5.0)
# Position 9: window=[5,6,7,8,9] (ring buffer wrapped) # Position 9: window=[5,6,7,8,9] (ring buffer wrapped)
assert abs(sensor_states["sliding_min"][4] - 5.0) < 0.01 assert sensor_states["sliding_min"][4] == pytest.approx(5.0)
assert abs(sensor_states["sliding_max"][4] - 9.0) < 0.01 assert sensor_states["sliding_max"][4] == pytest.approx(9.0)
assert abs(sensor_states["sliding_median"][4] - 7.0) < 0.01 assert sensor_states["sliding_median"][4] == pytest.approx(7.0)
assert abs(sensor_states["sliding_moving_avg"][4] - 7.0) < 0.01 assert sensor_states["sliding_moving_avg"][4] == pytest.approx(7.0)

View File

@@ -7,7 +7,7 @@ import asyncio
from aioesphomeapi import EntityState, SensorState from aioesphomeapi import EntityState, SensorState
import pytest import pytest
from .sensor_test_utils import build_key_to_sensor_mapping from .state_utils import InitialStateHelper, build_key_to_entity_mapping
from .types import APIClientConnectedFactory, RunCompiledFunction from .types import APIClientConnectedFactory, RunCompiledFunction
@@ -41,7 +41,7 @@ async def test_sensor_filters_sliding_window(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
@@ -57,33 +57,33 @@ async def test_sensor_filters_sliding_window(
# Filters send at position 1 and position 6 (send_every=5 means every 5th value after first) # Filters send at position 1 and position 6 (send_every=5 means every 5th value after first)
if ( if (
sensor_name == "min_sensor" sensor_name == "min_sensor"
and abs(state.state - 2.0) < 0.01 and state.state == pytest.approx(2.0)
and not min_received.done() and not min_received.done()
): ):
min_received.set_result(True) min_received.set_result(True)
elif ( elif (
sensor_name == "max_sensor" sensor_name == "max_sensor"
and abs(state.state - 6.0) < 0.01 and state.state == pytest.approx(6.0)
and not max_received.done() and not max_received.done()
): ):
max_received.set_result(True) max_received.set_result(True)
elif ( elif (
sensor_name == "median_sensor" sensor_name == "median_sensor"
and abs(state.state - 4.0) < 0.01 and state.state == pytest.approx(4.0)
and not median_received.done() and not median_received.done()
): ):
# Median of [2, 3, 4, 5, 6] = 4 # Median of [2, 3, 4, 5, 6] = 4
median_received.set_result(True) median_received.set_result(True)
elif ( elif (
sensor_name == "quantile_sensor" sensor_name == "quantile_sensor"
and abs(state.state - 6.0) < 0.01 and state.state == pytest.approx(6.0)
and not quantile_received.done() and not quantile_received.done()
): ):
# 90th percentile of [2, 3, 4, 5, 6] = 6 # 90th percentile of [2, 3, 4, 5, 6] = 6
quantile_received.set_result(True) quantile_received.set_result(True)
elif ( elif (
sensor_name == "moving_avg_sensor" sensor_name == "moving_avg_sensor"
and abs(state.state - 4.0) < 0.01 and state.state == pytest.approx(4.0)
and not moving_avg_received.done() and not moving_avg_received.done()
): ):
# Average of [2, 3, 4, 5, 6] = 4 # Average of [2, 3, 4, 5, 6] = 4
@@ -97,7 +97,7 @@ async def test_sensor_filters_sliding_window(
entities, services = await client.list_entities_services() entities, services = await client.list_entities_services()
# Build key-to-sensor mapping # Build key-to-sensor mapping
key_to_sensor = build_key_to_sensor_mapping( key_to_sensor = build_key_to_entity_mapping(
entities, entities,
[ [
"min_sensor", "min_sensor",
@@ -108,8 +108,17 @@ async def test_sensor_filters_sliding_window(
], ],
) )
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial states to be sent before pressing button
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(
@@ -158,30 +167,30 @@ async def test_sensor_filters_sliding_window(
assert len(sensor_states["moving_avg_sensor"]) == 2 assert len(sensor_states["moving_avg_sensor"]) == 2
# Verify the first output (after 1 value: [1]) # Verify the first output (after 1 value: [1])
assert abs(sensor_states["min_sensor"][0] - 1.0) < 0.01, ( assert sensor_states["min_sensor"][0] == pytest.approx(1.0), (
f"First min should be 1.0, got {sensor_states['min_sensor'][0]}" f"First min should be 1.0, got {sensor_states['min_sensor'][0]}"
) )
assert abs(sensor_states["max_sensor"][0] - 1.0) < 0.01, ( assert sensor_states["max_sensor"][0] == pytest.approx(1.0), (
f"First max should be 1.0, got {sensor_states['max_sensor'][0]}" f"First max should be 1.0, got {sensor_states['max_sensor'][0]}"
) )
assert abs(sensor_states["median_sensor"][0] - 1.0) < 0.01, ( assert sensor_states["median_sensor"][0] == pytest.approx(1.0), (
f"First median should be 1.0, got {sensor_states['median_sensor'][0]}" f"First median should be 1.0, got {sensor_states['median_sensor'][0]}"
) )
assert abs(sensor_states["moving_avg_sensor"][0] - 1.0) < 0.01, ( assert sensor_states["moving_avg_sensor"][0] == pytest.approx(1.0), (
f"First moving avg should be 1.0, got {sensor_states['moving_avg_sensor'][0]}" f"First moving avg should be 1.0, got {sensor_states['moving_avg_sensor'][0]}"
) )
# Verify the second output (after 6 values, window has [2, 3, 4, 5, 6]) # Verify the second output (after 6 values, window has [2, 3, 4, 5, 6])
assert abs(sensor_states["min_sensor"][1] - 2.0) < 0.01, ( assert sensor_states["min_sensor"][1] == pytest.approx(2.0), (
f"Second min should be 2.0, got {sensor_states['min_sensor'][1]}" f"Second min should be 2.0, got {sensor_states['min_sensor'][1]}"
) )
assert abs(sensor_states["max_sensor"][1] - 6.0) < 0.01, ( assert sensor_states["max_sensor"][1] == pytest.approx(6.0), (
f"Second max should be 6.0, got {sensor_states['max_sensor'][1]}" f"Second max should be 6.0, got {sensor_states['max_sensor'][1]}"
) )
assert abs(sensor_states["median_sensor"][1] - 4.0) < 0.01, ( assert sensor_states["median_sensor"][1] == pytest.approx(4.0), (
f"Second median should be 4.0, got {sensor_states['median_sensor'][1]}" f"Second median should be 4.0, got {sensor_states['median_sensor'][1]}"
) )
assert abs(sensor_states["moving_avg_sensor"][1] - 4.0) < 0.01, ( assert sensor_states["moving_avg_sensor"][1] == pytest.approx(4.0), (
f"Second moving avg should be 4.0, got {sensor_states['moving_avg_sensor'][1]}" f"Second moving avg should be 4.0, got {sensor_states['moving_avg_sensor'][1]}"
) )
@@ -207,11 +216,12 @@ async def test_sensor_filters_nan_handling(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
sensor_name = key_to_sensor.get(state.key) sensor_name = key_to_sensor.get(state.key)
if sensor_name == "min_nan": if sensor_name == "min_nan":
min_states.append(state.state) min_states.append(state.state)
elif sensor_name == "max_nan": elif sensor_name == "max_nan":
@@ -234,10 +244,19 @@ async def test_sensor_filters_nan_handling(
entities, services = await client.list_entities_services() entities, services = await client.list_entities_services()
# Build key-to-sensor mapping # Build key-to-sensor mapping
key_to_sensor = build_key_to_sensor_mapping(entities, ["min_nan", "max_nan"]) key_to_sensor = build_key_to_entity_mapping(entities, ["min_nan", "max_nan"])
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial states
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial states")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(
@@ -271,18 +290,18 @@ async def test_sensor_filters_nan_handling(
) )
# First output # First output
assert abs(min_states[0] - 10.0) < 0.01, ( assert min_states[0] == pytest.approx(10.0), (
f"First min should be 10.0, got {min_states[0]}" f"First min should be 10.0, got {min_states[0]}"
) )
assert abs(max_states[0] - 10.0) < 0.01, ( assert max_states[0] == pytest.approx(10.0), (
f"First max should be 10.0, got {max_states[0]}" f"First max should be 10.0, got {max_states[0]}"
) )
# Second output - verify NaN values were ignored # Second output - verify NaN values were ignored
assert abs(min_states[1] - 5.0) < 0.01, ( assert min_states[1] == pytest.approx(5.0), (
f"Second min should ignore NaN and return 5.0, got {min_states[1]}" f"Second min should ignore NaN and return 5.0, got {min_states[1]}"
) )
assert abs(max_states[1] - 15.0) < 0.01, ( assert max_states[1] == pytest.approx(15.0), (
f"Second max should ignore NaN and return 15.0, got {max_states[1]}" f"Second max should ignore NaN and return 15.0, got {max_states[1]}"
) )
@@ -305,11 +324,12 @@ async def test_sensor_filters_ring_buffer_wraparound(
if not isinstance(state, SensorState): if not isinstance(state, SensorState):
return return
# Skip NaN values (initial states) # Skip NaN values
if state.missing_state: if state.missing_state:
return return
sensor_name = key_to_sensor.get(state.key) sensor_name = key_to_sensor.get(state.key)
if sensor_name == "wraparound_min": if sensor_name == "wraparound_min":
min_states.append(state.state) min_states.append(state.state)
# With batch_delay: 0ms, we should receive all 3 outputs # With batch_delay: 0ms, we should receive all 3 outputs
@@ -324,10 +344,19 @@ async def test_sensor_filters_ring_buffer_wraparound(
entities, services = await client.list_entities_services() entities, services = await client.list_entities_services()
# Build key-to-sensor mapping # Build key-to-sensor mapping
key_to_sensor = build_key_to_sensor_mapping(entities, ["wraparound_min"]) key_to_sensor = build_key_to_entity_mapping(entities, ["wraparound_min"])
# Subscribe to state changes AFTER building mapping # Set up initial state helper with all entities
client.subscribe_states(on_state) initial_state_helper = InitialStateHelper(entities)
# Subscribe to state changes with wrapper
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
# Wait for initial state
try:
await initial_state_helper.wait_for_initial_states()
except TimeoutError:
pytest.fail("Timeout waiting for initial state")
# Find the publish button # Find the publish button
publish_button = next( publish_button = next(
@@ -355,12 +384,12 @@ async def test_sensor_filters_ring_buffer_wraparound(
assert len(min_states) == 3, ( assert len(min_states) == 3, (
f"Should have 3 states, got {len(min_states)}: {min_states}" f"Should have 3 states, got {len(min_states)}: {min_states}"
) )
assert abs(min_states[0] - 10.0) < 0.01, ( assert min_states[0] == pytest.approx(10.0), (
f"First min should be 10.0, got {min_states[0]}" f"First min should be 10.0, got {min_states[0]}"
) )
assert abs(min_states[1] - 5.0) < 0.01, ( assert min_states[1] == pytest.approx(5.0), (
f"Second min should be 5.0, got {min_states[1]}" f"Second min should be 5.0, got {min_states[1]}"
) )
assert abs(min_states[2] - 15.0) < 0.01, ( assert min_states[2] == pytest.approx(15.0), (
f"Third min should be 15.0, got {min_states[2]}" f"Third min should be 15.0, got {min_states[2]}"
) )