mirror of
https://github.com/esphome/esphome.git
synced 2025-10-24 12:43:51 +01:00
Add tests for FilterOutValueFilter and ThrottleWithPriorityFilter (#11408)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -99,3 +99,77 @@ sensor:
|
||||
window_size: 10
|
||||
send_every: 10
|
||||
send_first_at: 1 # Send after first value
|
||||
|
||||
# ValueListFilter-based filters tests
|
||||
# FilterOutValueFilter - single value
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Filter Out Single Value"
|
||||
filters:
|
||||
- filter_out: 42.0 # Should filter out exactly 42.0
|
||||
|
||||
# FilterOutValueFilter - multiple values
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Filter Out Multiple Values"
|
||||
filters:
|
||||
- filter_out: [0.0, 42.0, 100.0] # List of values to filter
|
||||
|
||||
# FilterOutValueFilter - with NaN
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Filter Out NaN"
|
||||
filters:
|
||||
- filter_out: nan # Filter out NaN values
|
||||
|
||||
# FilterOutValueFilter - mixed values with NaN
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Filter Out Mixed with NaN"
|
||||
filters:
|
||||
- filter_out: [nan, 0.0, 42.0]
|
||||
|
||||
# ThrottleWithPriorityFilter - single priority value
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Throttle with Single Priority"
|
||||
filters:
|
||||
- throttle_with_priority:
|
||||
timeout: 1000ms
|
||||
value: 42.0 # Priority value bypasses throttle
|
||||
|
||||
# ThrottleWithPriorityFilter - multiple priority values
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Throttle with Multiple Priorities"
|
||||
filters:
|
||||
- throttle_with_priority:
|
||||
timeout: 500ms
|
||||
value: [0.0, 42.0, 100.0] # Multiple priority values
|
||||
|
||||
# ThrottleWithPriorityFilter - with NaN priority
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Throttle with NaN Priority"
|
||||
filters:
|
||||
- throttle_with_priority:
|
||||
timeout: 1000ms
|
||||
value: nan # NaN as priority value
|
||||
|
||||
# Combined filters - FilterOutValueFilter + other filters
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Filter Out Then Throttle"
|
||||
filters:
|
||||
- filter_out: [0.0, 100.0]
|
||||
- throttle: 500ms
|
||||
|
||||
# Combined filters - ThrottleWithPriorityFilter + other filters
|
||||
- platform: copy
|
||||
source_id: source_sensor
|
||||
name: "Throttle Priority Then Scale"
|
||||
filters:
|
||||
- throttle_with_priority:
|
||||
timeout: 1000ms
|
||||
value: [42.0]
|
||||
- multiply: 2.0
|
||||
|
||||
332
tests/integration/fixtures/sensor_filters_value_list.yaml
Normal file
332
tests/integration/fixtures/sensor_filters_value_list.yaml
Normal file
@@ -0,0 +1,332 @@
|
||||
esphome:
|
||||
name: test-value-list-filters
|
||||
|
||||
host:
|
||||
api:
|
||||
batch_delay: 0ms # Disable batching to receive all state updates
|
||||
logger:
|
||||
level: DEBUG
|
||||
|
||||
# Template sensors - one for each test to avoid cross-test interference
|
||||
sensor:
|
||||
- platform: template
|
||||
name: "Source Sensor 1"
|
||||
id: source_sensor_1
|
||||
accuracy_decimals: 1
|
||||
|
||||
- platform: template
|
||||
name: "Source Sensor 2"
|
||||
id: source_sensor_2
|
||||
accuracy_decimals: 1
|
||||
|
||||
- platform: template
|
||||
name: "Source Sensor 3"
|
||||
id: source_sensor_3
|
||||
accuracy_decimals: 1
|
||||
|
||||
- platform: template
|
||||
name: "Source Sensor 4"
|
||||
id: source_sensor_4
|
||||
accuracy_decimals: 1
|
||||
|
||||
- platform: template
|
||||
name: "Source Sensor 5"
|
||||
id: source_sensor_5
|
||||
accuracy_decimals: 1
|
||||
|
||||
- platform: template
|
||||
name: "Source Sensor 6"
|
||||
id: source_sensor_6
|
||||
accuracy_decimals: 2
|
||||
|
||||
- platform: template
|
||||
name: "Source Sensor 7"
|
||||
id: source_sensor_7
|
||||
accuracy_decimals: 1
|
||||
|
||||
# FilterOutValueFilter - single value
|
||||
- platform: copy
|
||||
source_id: source_sensor_1
|
||||
name: "Filter Out Single"
|
||||
id: filter_out_single
|
||||
filters:
|
||||
- filter_out: 42.0
|
||||
|
||||
# FilterOutValueFilter - multiple values
|
||||
- platform: copy
|
||||
source_id: source_sensor_2
|
||||
name: "Filter Out Multiple"
|
||||
id: filter_out_multiple
|
||||
filters:
|
||||
- filter_out: [0.0, 42.0, 100.0]
|
||||
|
||||
# FilterOutValueFilter - with NaN
|
||||
- platform: copy
|
||||
source_id: source_sensor_1
|
||||
name: "Filter Out NaN"
|
||||
id: filter_out_nan
|
||||
filters:
|
||||
- filter_out: nan
|
||||
|
||||
# ThrottleWithPriorityFilter - single priority value
|
||||
- platform: copy
|
||||
source_id: source_sensor_3
|
||||
name: "Throttle Priority Single"
|
||||
id: throttle_priority_single
|
||||
filters:
|
||||
- throttle_with_priority:
|
||||
timeout: 200ms
|
||||
value: 42.0
|
||||
|
||||
# ThrottleWithPriorityFilter - multiple priority values
|
||||
- platform: copy
|
||||
source_id: source_sensor_4
|
||||
name: "Throttle Priority Multiple"
|
||||
id: throttle_priority_multiple
|
||||
filters:
|
||||
- throttle_with_priority:
|
||||
timeout: 200ms
|
||||
value: [0.0, 42.0, 100.0]
|
||||
|
||||
# Edge case: Filter Out NaN explicitly
|
||||
- platform: copy
|
||||
source_id: source_sensor_5
|
||||
name: "Filter Out NaN Test"
|
||||
id: filter_out_nan_test
|
||||
filters:
|
||||
- filter_out: nan
|
||||
|
||||
# Edge case: Accuracy decimals - 2 decimals
|
||||
- platform: copy
|
||||
source_id: source_sensor_6
|
||||
name: "Filter Out Accuracy 2"
|
||||
id: filter_out_accuracy_2
|
||||
filters:
|
||||
- filter_out: 42.0
|
||||
|
||||
# Edge case: Throttle with NaN priority
|
||||
- platform: copy
|
||||
source_id: source_sensor_7
|
||||
name: "Throttle Priority NaN"
|
||||
id: throttle_priority_nan
|
||||
filters:
|
||||
- throttle_with_priority:
|
||||
timeout: 200ms
|
||||
value: nan
|
||||
|
||||
# Script to test FilterOutValueFilter
|
||||
script:
|
||||
- id: test_filter_out_single
|
||||
then:
|
||||
# Should pass through: 1.0, 2.0, 3.0
|
||||
# Should filter out: 42.0
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_1
|
||||
state: 1.0
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_1
|
||||
state: 42.0 # Filtered out
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_1
|
||||
state: 2.0
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_1
|
||||
state: 42.0 # Filtered out
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_1
|
||||
state: 3.0
|
||||
|
||||
- id: test_filter_out_multiple
|
||||
then:
|
||||
# Should filter out: 0.0, 42.0, 100.0
|
||||
# Should pass through: 1.0, 2.0, 50.0
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_2
|
||||
state: 0.0 # Filtered out
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_2
|
||||
state: 1.0
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_2
|
||||
state: 42.0 # Filtered out
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_2
|
||||
state: 2.0
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_2
|
||||
state: 100.0 # Filtered out
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_2
|
||||
state: 50.0
|
||||
|
||||
- id: test_throttle_priority_single
|
||||
then:
|
||||
# 42.0 bypasses throttle, other values are throttled
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_3
|
||||
state: 1.0 # First value - passes
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_3
|
||||
state: 2.0 # Throttled
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_3
|
||||
state: 42.0 # Priority - passes immediately
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_3
|
||||
state: 3.0 # Throttled
|
||||
- delay: 250ms # Wait for throttle to expire
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_3
|
||||
state: 4.0 # Passes after timeout
|
||||
|
||||
- id: test_throttle_priority_multiple
|
||||
then:
|
||||
# 0.0, 42.0, 100.0 bypass throttle
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_4
|
||||
state: 1.0 # First value - passes
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_4
|
||||
state: 2.0 # Throttled
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_4
|
||||
state: 0.0 # Priority - passes
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_4
|
||||
state: 3.0 # Throttled
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_4
|
||||
state: 42.0 # Priority - passes
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_4
|
||||
state: 4.0 # Throttled
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_4
|
||||
state: 100.0 # Priority - passes
|
||||
|
||||
- id: test_filter_out_nan
|
||||
then:
|
||||
# NaN should be filtered out, regular values pass
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_5
|
||||
state: 1.0 # Pass
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_5
|
||||
state: !lambda 'return NAN;' # Filtered out
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_5
|
||||
state: 2.0 # Pass
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_5
|
||||
state: !lambda 'return NAN;' # Filtered out
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_5
|
||||
state: 3.0 # Pass
|
||||
|
||||
- id: test_filter_out_accuracy_2
|
||||
then:
|
||||
# With 2 decimal places, 42.00 filtered, 42.01 and 42.15 pass
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_6
|
||||
state: 42.0 # Filtered (rounds to 42.00)
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_6
|
||||
state: 42.01 # Pass (rounds to 42.01)
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_6
|
||||
state: 42.15 # Pass (rounds to 42.15)
|
||||
- delay: 20ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_6
|
||||
state: 42.0 # Filtered (rounds to 42.00)
|
||||
|
||||
- id: test_throttle_priority_nan
|
||||
then:
|
||||
# NaN bypasses throttle, regular values throttled
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_7
|
||||
state: 1.0 # First value - passes
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_7
|
||||
state: 2.0 # Throttled
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_7
|
||||
state: !lambda 'return NAN;' # Priority NaN - passes
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_7
|
||||
state: 3.0 # Throttled
|
||||
- delay: 50ms
|
||||
- sensor.template.publish:
|
||||
id: source_sensor_7
|
||||
state: !lambda 'return NAN;' # Priority NaN - passes
|
||||
|
||||
# Buttons to trigger each test
|
||||
button:
|
||||
- platform: template
|
||||
name: "Test Filter Out Single"
|
||||
id: btn_filter_out_single
|
||||
on_press:
|
||||
- script.execute: test_filter_out_single
|
||||
|
||||
- platform: template
|
||||
name: "Test Filter Out Multiple"
|
||||
id: btn_filter_out_multiple
|
||||
on_press:
|
||||
- script.execute: test_filter_out_multiple
|
||||
|
||||
- platform: template
|
||||
name: "Test Throttle Priority Single"
|
||||
id: btn_throttle_priority_single
|
||||
on_press:
|
||||
- script.execute: test_throttle_priority_single
|
||||
|
||||
- platform: template
|
||||
name: "Test Throttle Priority Multiple"
|
||||
id: btn_throttle_priority_multiple
|
||||
on_press:
|
||||
- script.execute: test_throttle_priority_multiple
|
||||
|
||||
- platform: template
|
||||
name: "Test Filter Out NaN"
|
||||
id: btn_filter_out_nan
|
||||
on_press:
|
||||
- script.execute: test_filter_out_nan
|
||||
|
||||
- platform: template
|
||||
name: "Test Filter Out Accuracy 2"
|
||||
id: btn_filter_out_accuracy_2
|
||||
on_press:
|
||||
- script.execute: test_filter_out_accuracy_2
|
||||
|
||||
- platform: template
|
||||
name: "Test Throttle Priority NaN"
|
||||
id: btn_throttle_priority_nan
|
||||
on_press:
|
||||
- script.execute: test_throttle_priority_nan
|
||||
263
tests/integration/test_sensor_filters_value_list.py
Normal file
263
tests/integration/test_sensor_filters_value_list.py
Normal file
@@ -0,0 +1,263 @@
|
||||
"""Test sensor ValueListFilter functionality (FilterOutValueFilter and ThrottleWithPriorityFilter)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import math
|
||||
|
||||
from aioesphomeapi import ButtonInfo, EntityState, SensorState
|
||||
import pytest
|
||||
|
||||
from .state_utils import InitialStateHelper, build_key_to_entity_mapping
|
||||
from .types import APIClientConnectedFactory, RunCompiledFunction
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sensor_filters_value_list(
|
||||
yaml_config: str,
|
||||
run_compiled: RunCompiledFunction,
|
||||
api_client_connected: APIClientConnectedFactory,
|
||||
) -> None:
|
||||
"""Test that ValueListFilter-based filters work correctly."""
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# Track state changes for all sensors
|
||||
sensor_values: dict[str, list[float]] = {
|
||||
"filter_out_single": [],
|
||||
"filter_out_multiple": [],
|
||||
"throttle_priority_single": [],
|
||||
"throttle_priority_multiple": [],
|
||||
"filter_out_nan_test": [],
|
||||
"filter_out_accuracy_2": [],
|
||||
"throttle_priority_nan": [],
|
||||
}
|
||||
|
||||
# Futures for each test
|
||||
filter_out_single_done = loop.create_future()
|
||||
filter_out_multiple_done = loop.create_future()
|
||||
throttle_single_done = loop.create_future()
|
||||
throttle_multiple_done = loop.create_future()
|
||||
filter_out_nan_done = loop.create_future()
|
||||
filter_out_accuracy_2_done = loop.create_future()
|
||||
throttle_nan_done = loop.create_future()
|
||||
|
||||
def on_state(state: EntityState) -> None:
|
||||
"""Track sensor state updates."""
|
||||
if not isinstance(state, SensorState) or state.missing_state:
|
||||
return
|
||||
|
||||
sensor_name = key_to_sensor.get(state.key)
|
||||
if sensor_name not in sensor_values:
|
||||
return
|
||||
|
||||
sensor_values[sensor_name].append(state.state)
|
||||
|
||||
# Check completion conditions
|
||||
if (
|
||||
sensor_name == "filter_out_single"
|
||||
and len(sensor_values[sensor_name]) == 3
|
||||
and not filter_out_single_done.done()
|
||||
):
|
||||
filter_out_single_done.set_result(True)
|
||||
elif (
|
||||
sensor_name == "filter_out_multiple"
|
||||
and len(sensor_values[sensor_name]) == 3
|
||||
and not filter_out_multiple_done.done()
|
||||
):
|
||||
filter_out_multiple_done.set_result(True)
|
||||
elif (
|
||||
sensor_name == "throttle_priority_single"
|
||||
and len(sensor_values[sensor_name]) == 3
|
||||
and not throttle_single_done.done()
|
||||
):
|
||||
throttle_single_done.set_result(True)
|
||||
elif (
|
||||
sensor_name == "throttle_priority_multiple"
|
||||
and len(sensor_values[sensor_name]) == 4
|
||||
and not throttle_multiple_done.done()
|
||||
):
|
||||
throttle_multiple_done.set_result(True)
|
||||
elif (
|
||||
sensor_name == "filter_out_nan_test"
|
||||
and len(sensor_values[sensor_name]) == 3
|
||||
and not filter_out_nan_done.done()
|
||||
):
|
||||
filter_out_nan_done.set_result(True)
|
||||
elif (
|
||||
sensor_name == "filter_out_accuracy_2"
|
||||
and len(sensor_values[sensor_name]) == 2
|
||||
and not filter_out_accuracy_2_done.done()
|
||||
):
|
||||
filter_out_accuracy_2_done.set_result(True)
|
||||
elif (
|
||||
sensor_name == "throttle_priority_nan"
|
||||
and len(sensor_values[sensor_name]) == 3
|
||||
and not throttle_nan_done.done()
|
||||
):
|
||||
throttle_nan_done.set_result(True)
|
||||
|
||||
async with (
|
||||
run_compiled(yaml_config),
|
||||
api_client_connected() as client,
|
||||
):
|
||||
# Get entities and build key mapping
|
||||
entities, _ = await client.list_entities_services()
|
||||
key_to_sensor = build_key_to_entity_mapping(
|
||||
entities,
|
||||
{
|
||||
"filter_out_single": "Filter Out Single",
|
||||
"filter_out_multiple": "Filter Out Multiple",
|
||||
"throttle_priority_single": "Throttle Priority Single",
|
||||
"throttle_priority_multiple": "Throttle Priority Multiple",
|
||||
"filter_out_nan_test": "Filter Out NaN Test",
|
||||
"filter_out_accuracy_2": "Filter Out Accuracy 2",
|
||||
"throttle_priority_nan": "Throttle Priority NaN",
|
||||
},
|
||||
)
|
||||
|
||||
# Set up initial state helper with all entities
|
||||
initial_state_helper = InitialStateHelper(entities)
|
||||
|
||||
# Subscribe to state changes with wrapper
|
||||
client.subscribe_states(initial_state_helper.on_state_wrapper(on_state))
|
||||
|
||||
# Wait for initial states
|
||||
await initial_state_helper.wait_for_initial_states()
|
||||
|
||||
# Find all buttons
|
||||
button_name_map = {
|
||||
"Test Filter Out Single": "filter_out_single",
|
||||
"Test Filter Out Multiple": "filter_out_multiple",
|
||||
"Test Throttle Priority Single": "throttle_priority_single",
|
||||
"Test Throttle Priority Multiple": "throttle_priority_multiple",
|
||||
"Test Filter Out NaN": "filter_out_nan",
|
||||
"Test Filter Out Accuracy 2": "filter_out_accuracy_2",
|
||||
"Test Throttle Priority NaN": "throttle_priority_nan",
|
||||
}
|
||||
buttons = {}
|
||||
for entity in entities:
|
||||
if isinstance(entity, ButtonInfo) and entity.name in button_name_map:
|
||||
buttons[button_name_map[entity.name]] = entity.key
|
||||
|
||||
assert len(buttons) == 7, f"Expected 7 buttons, found {len(buttons)}"
|
||||
|
||||
# Test 1: FilterOutValueFilter - single value
|
||||
sensor_values["filter_out_single"].clear()
|
||||
client.button_command(buttons["filter_out_single"])
|
||||
try:
|
||||
await asyncio.wait_for(filter_out_single_done, timeout=2.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test 1 timed out. Values: {sensor_values['filter_out_single']}"
|
||||
)
|
||||
|
||||
expected = [1.0, 2.0, 3.0]
|
||||
assert sensor_values["filter_out_single"] == pytest.approx(expected), (
|
||||
f"Test 1 failed: expected {expected}, got {sensor_values['filter_out_single']}"
|
||||
)
|
||||
|
||||
# Test 2: FilterOutValueFilter - multiple values
|
||||
sensor_values["filter_out_multiple"].clear()
|
||||
filter_out_multiple_done = loop.create_future()
|
||||
client.button_command(buttons["filter_out_multiple"])
|
||||
try:
|
||||
await asyncio.wait_for(filter_out_multiple_done, timeout=2.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test 2 timed out. Values: {sensor_values['filter_out_multiple']}"
|
||||
)
|
||||
|
||||
expected = [1.0, 2.0, 50.0]
|
||||
assert sensor_values["filter_out_multiple"] == pytest.approx(expected), (
|
||||
f"Test 2 failed: expected {expected}, got {sensor_values['filter_out_multiple']}"
|
||||
)
|
||||
|
||||
# Test 3: ThrottleWithPriorityFilter - single priority
|
||||
sensor_values["throttle_priority_single"].clear()
|
||||
throttle_single_done = loop.create_future()
|
||||
client.button_command(buttons["throttle_priority_single"])
|
||||
try:
|
||||
await asyncio.wait_for(throttle_single_done, timeout=2.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test 3 timed out. Values: {sensor_values['throttle_priority_single']}"
|
||||
)
|
||||
|
||||
expected = [1.0, 42.0, 4.0]
|
||||
assert sensor_values["throttle_priority_single"] == pytest.approx(expected), (
|
||||
f"Test 3 failed: expected {expected}, got {sensor_values['throttle_priority_single']}"
|
||||
)
|
||||
|
||||
# Test 4: ThrottleWithPriorityFilter - multiple priorities
|
||||
sensor_values["throttle_priority_multiple"].clear()
|
||||
throttle_multiple_done = loop.create_future()
|
||||
client.button_command(buttons["throttle_priority_multiple"])
|
||||
try:
|
||||
await asyncio.wait_for(throttle_multiple_done, timeout=2.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test 4 timed out. Values: {sensor_values['throttle_priority_multiple']}"
|
||||
)
|
||||
|
||||
expected = [1.0, 0.0, 42.0, 100.0]
|
||||
assert sensor_values["throttle_priority_multiple"] == pytest.approx(expected), (
|
||||
f"Test 4 failed: expected {expected}, got {sensor_values['throttle_priority_multiple']}"
|
||||
)
|
||||
|
||||
# Test 5: FilterOutValueFilter - NaN handling
|
||||
sensor_values["filter_out_nan_test"].clear()
|
||||
filter_out_nan_done = loop.create_future()
|
||||
client.button_command(buttons["filter_out_nan"])
|
||||
try:
|
||||
await asyncio.wait_for(filter_out_nan_done, timeout=2.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test 5 timed out. Values: {sensor_values['filter_out_nan_test']}"
|
||||
)
|
||||
|
||||
expected = [1.0, 2.0, 3.0]
|
||||
assert sensor_values["filter_out_nan_test"] == pytest.approx(expected), (
|
||||
f"Test 5 failed: expected {expected}, got {sensor_values['filter_out_nan_test']}"
|
||||
)
|
||||
|
||||
# Test 6: FilterOutValueFilter - Accuracy decimals (2)
|
||||
sensor_values["filter_out_accuracy_2"].clear()
|
||||
filter_out_accuracy_2_done = loop.create_future()
|
||||
client.button_command(buttons["filter_out_accuracy_2"])
|
||||
try:
|
||||
await asyncio.wait_for(filter_out_accuracy_2_done, timeout=2.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test 6 timed out. Values: {sensor_values['filter_out_accuracy_2']}"
|
||||
)
|
||||
|
||||
expected = [42.01, 42.15]
|
||||
assert sensor_values["filter_out_accuracy_2"] == pytest.approx(expected), (
|
||||
f"Test 6 failed: expected {expected}, got {sensor_values['filter_out_accuracy_2']}"
|
||||
)
|
||||
|
||||
# Test 7: ThrottleWithPriorityFilter - NaN priority
|
||||
sensor_values["throttle_priority_nan"].clear()
|
||||
throttle_nan_done = loop.create_future()
|
||||
client.button_command(buttons["throttle_priority_nan"])
|
||||
try:
|
||||
await asyncio.wait_for(throttle_nan_done, timeout=2.0)
|
||||
except TimeoutError:
|
||||
pytest.fail(
|
||||
f"Test 7 timed out. Values: {sensor_values['throttle_priority_nan']}"
|
||||
)
|
||||
|
||||
# First value (1.0) + two NaN priority values
|
||||
# NaN values will be compared using math.isnan
|
||||
assert len(sensor_values["throttle_priority_nan"]) == 3, (
|
||||
f"Test 7 failed: expected 3 values, got {len(sensor_values['throttle_priority_nan'])}"
|
||||
)
|
||||
assert sensor_values["throttle_priority_nan"][0] == pytest.approx(1.0), (
|
||||
f"Test 7 failed: first value should be 1.0, got {sensor_values['throttle_priority_nan'][0]}"
|
||||
)
|
||||
assert math.isnan(sensor_values["throttle_priority_nan"][1]), (
|
||||
f"Test 7 failed: second value should be NaN, got {sensor_values['throttle_priority_nan'][1]}"
|
||||
)
|
||||
assert math.isnan(sensor_values["throttle_priority_nan"][2]), (
|
||||
f"Test 7 failed: third value should be NaN, got {sensor_values['throttle_priority_nan'][2]}"
|
||||
)
|
||||
Reference in New Issue
Block a user