1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-24 04:33:49 +01:00

Merge branch 'min_filter_ring_buffer' into integration

This commit is contained in:
J. Nick Koston
2025-10-15 23:51:44 -10:00
6 changed files with 33 additions and 57 deletions

View File

@@ -0,0 +1,27 @@
"""Shared utilities for sensor integration tests."""
from __future__ import annotations
from aioesphomeapi import EntityInfo
def build_key_to_sensor_mapping(
entities: list[EntityInfo], sensor_names: list[str]
) -> dict[int, str]:
"""Build a mapping from entity keys to sensor names.
Args:
entities: List of entity info objects from the API
sensor_names: List of sensor names to search for in object_ids
Returns:
Dictionary mapping entity keys to sensor names
"""
key_to_sensor: dict[int, str] = {}
for entity in entities:
obj_id = entity.object_id.lower()
for sensor_name in sensor_names:
if sensor_name in obj_id:
key_to_sensor[entity.key] = sensor_name
break
return key_to_sensor

View File

@@ -4,34 +4,13 @@ from __future__ import annotations
import asyncio
from aioesphomeapi import EntityInfo, EntityState, SensorState
from aioesphomeapi import EntityState, SensorState
import pytest
from .sensor_test_utils import build_key_to_sensor_mapping
from .types import APIClientConnectedFactory, RunCompiledFunction
def build_key_to_sensor_mapping(
entities: list[EntityInfo], sensor_names: list[str]
) -> dict[int, str]:
"""Build a mapping from entity keys to sensor names.
Args:
entities: List of entity info objects from the API
sensor_names: List of sensor names to search for in object_ids
Returns:
Dictionary mapping entity keys to sensor names
"""
key_to_sensor: dict[int, str] = {}
for entity in entities:
obj_id = entity.object_id.lower()
for sensor_name in sensor_names:
if sensor_name in obj_id:
key_to_sensor[entity.key] = sensor_name
break
return key_to_sensor
@pytest.mark.asyncio
async def test_sensor_filters_ring_buffer(
yaml_config: str,

View File

@@ -4,34 +4,13 @@ from __future__ import annotations
import asyncio
from aioesphomeapi import EntityInfo, EntityState, SensorState
from aioesphomeapi import EntityState, SensorState
import pytest
from .sensor_test_utils import build_key_to_sensor_mapping
from .types import APIClientConnectedFactory, RunCompiledFunction
def build_key_to_sensor_mapping(
entities: list[EntityInfo], sensor_names: list[str]
) -> dict[int, str]:
"""Build a mapping from entity keys to sensor names.
Args:
entities: List of entity info objects from the API
sensor_names: List of sensor names to search for in object_ids
Returns:
Dictionary mapping entity keys to sensor names
"""
key_to_sensor: dict[int, str] = {}
for entity in entities:
obj_id = entity.object_id.lower()
for sensor_name in sensor_names:
if sensor_name in obj_id:
key_to_sensor[entity.key] = sensor_name
break
return key_to_sensor
@pytest.mark.asyncio
async def test_sensor_filters_sliding_window(
yaml_config: str,