mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-25 21:23:53 +01:00 
			
		
		
		
	Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com>
		
			
				
	
	
		
			185 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			185 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Integration test for duplicate entity handling with new validation."""
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import asyncio
 | |
| 
 | |
| from aioesphomeapi import EntityInfo
 | |
| import pytest
 | |
| 
 | |
| from .types import APIClientConnectedFactory, RunCompiledFunction
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| async def test_duplicate_entities_on_different_devices(
 | |
|     yaml_config: str,
 | |
|     run_compiled: RunCompiledFunction,
 | |
|     api_client_connected: APIClientConnectedFactory,
 | |
| ) -> None:
 | |
|     """Test that duplicate entity names are allowed on different devices."""
 | |
|     async with run_compiled(yaml_config), api_client_connected() as client:
 | |
|         # Get device info
 | |
|         device_info = await client.device_info()
 | |
|         assert device_info is not None
 | |
| 
 | |
|         # Get devices
 | |
|         devices = device_info.devices
 | |
|         assert len(devices) >= 3, f"Expected at least 3 devices, got {len(devices)}"
 | |
| 
 | |
|         # Find our test devices
 | |
|         controller_1 = next((d for d in devices if d.name == "Controller 1"), None)
 | |
|         controller_2 = next((d for d in devices if d.name == "Controller 2"), None)
 | |
|         controller_3 = next((d for d in devices if d.name == "Controller 3"), None)
 | |
| 
 | |
|         assert controller_1 is not None, "Controller 1 device not found"
 | |
|         assert controller_2 is not None, "Controller 2 device not found"
 | |
|         assert controller_3 is not None, "Controller 3 device not found"
 | |
| 
 | |
|         # Get entity list
 | |
|         entities = await client.list_entities_services()
 | |
|         all_entities: list[EntityInfo] = []
 | |
|         for entity_list in entities[0]:
 | |
|             all_entities.append(entity_list)
 | |
| 
 | |
|         # Group entities by type for easier testing
 | |
|         sensors = [e for e in all_entities if e.__class__.__name__ == "SensorInfo"]
 | |
|         binary_sensors = [
 | |
|             e for e in all_entities if e.__class__.__name__ == "BinarySensorInfo"
 | |
|         ]
 | |
|         text_sensors = [
 | |
|             e for e in all_entities if e.__class__.__name__ == "TextSensorInfo"
 | |
|         ]
 | |
|         switches = [e for e in all_entities if e.__class__.__name__ == "SwitchInfo"]
 | |
|         buttons = [e for e in all_entities if e.__class__.__name__ == "ButtonInfo"]
 | |
|         numbers = [e for e in all_entities if e.__class__.__name__ == "NumberInfo"]
 | |
| 
 | |
|         # Scenario 1: Check sensors with same "Temperature" name on different devices
 | |
|         temp_sensors = [s for s in sensors if s.name == "Temperature"]
 | |
|         assert len(temp_sensors) == 4, (
 | |
|             f"Expected exactly 4 temperature sensors, got {len(temp_sensors)}"
 | |
|         )
 | |
| 
 | |
|         # Verify each sensor is on a different device
 | |
|         temp_device_ids = set()
 | |
|         temp_object_ids = set()
 | |
| 
 | |
|         for sensor in temp_sensors:
 | |
|             temp_device_ids.add(sensor.device_id)
 | |
|             temp_object_ids.add(sensor.object_id)
 | |
| 
 | |
|             # All should have object_id "temperature" (no suffix)
 | |
|             assert sensor.object_id == "temperature", (
 | |
|                 f"Expected object_id 'temperature', got '{sensor.object_id}'"
 | |
|             )
 | |
| 
 | |
|         # Should have 4 different device IDs (including None for main device)
 | |
|         assert len(temp_device_ids) == 4, (
 | |
|             f"Temperature sensors should be on different devices, got {temp_device_ids}"
 | |
|         )
 | |
| 
 | |
|         # Scenario 2: Check binary sensors "Status" on different devices
 | |
|         status_binary = [b for b in binary_sensors if b.name == "Status"]
 | |
|         assert len(status_binary) == 3, (
 | |
|             f"Expected exactly 3 status binary sensors, got {len(status_binary)}"
 | |
|         )
 | |
| 
 | |
|         # All should have object_id "status"
 | |
|         for binary in status_binary:
 | |
|             assert binary.object_id == "status", (
 | |
|                 f"Expected object_id 'status', got '{binary.object_id}'"
 | |
|             )
 | |
| 
 | |
|         # Scenario 3: Check that sensor and binary_sensor can have same name
 | |
|         temp_binary = [b for b in binary_sensors if b.name == "Temperature"]
 | |
|         assert len(temp_binary) == 1, (
 | |
|             f"Expected exactly 1 temperature binary sensor, got {len(temp_binary)}"
 | |
|         )
 | |
|         assert temp_binary[0].object_id == "temperature"
 | |
| 
 | |
|         # Scenario 4: Check text sensors "Device Info" on different devices
 | |
|         info_text = [t for t in text_sensors if t.name == "Device Info"]
 | |
|         assert len(info_text) == 3, (
 | |
|             f"Expected exactly 3 device info text sensors, got {len(info_text)}"
 | |
|         )
 | |
| 
 | |
|         # All should have object_id "device_info"
 | |
|         for text in info_text:
 | |
|             assert text.object_id == "device_info", (
 | |
|                 f"Expected object_id 'device_info', got '{text.object_id}'"
 | |
|             )
 | |
| 
 | |
|         # Scenario 5: Check switches "Power" on different devices
 | |
|         power_switches = [s for s in switches if s.name == "Power"]
 | |
|         assert len(power_switches) == 3, (
 | |
|             f"Expected exactly 3 power switches, got {len(power_switches)}"
 | |
|         )
 | |
| 
 | |
|         # All should have object_id "power"
 | |
|         for switch in power_switches:
 | |
|             assert switch.object_id == "power", (
 | |
|                 f"Expected object_id 'power', got '{switch.object_id}'"
 | |
|             )
 | |
| 
 | |
|         # Scenario 6: Check empty name buttons (should use device name)
 | |
|         empty_buttons = [b for b in buttons if b.name == ""]
 | |
|         assert len(empty_buttons) == 3, (
 | |
|             f"Expected exactly 3 empty name buttons, got {len(empty_buttons)}"
 | |
|         )
 | |
| 
 | |
|         # Group by device
 | |
|         c1_buttons = [b for b in empty_buttons if b.device_id == controller_1.device_id]
 | |
|         c2_buttons = [b for b in empty_buttons if b.device_id == controller_2.device_id]
 | |
| 
 | |
|         # For main device, device_id is 0
 | |
|         main_buttons = [b for b in empty_buttons if b.device_id == 0]
 | |
| 
 | |
|         # Check object IDs for empty name entities
 | |
|         assert len(c1_buttons) == 1 and c1_buttons[0].object_id == "controller_1"
 | |
|         assert len(c2_buttons) == 1 and c2_buttons[0].object_id == "controller_2"
 | |
|         assert (
 | |
|             len(main_buttons) == 1
 | |
|             and main_buttons[0].object_id == "duplicate-entities-test"
 | |
|         )
 | |
| 
 | |
|         # Scenario 7: Check special characters in number names
 | |
|         temp_numbers = [n for n in numbers if n.name == "Temperature Setpoint!"]
 | |
|         assert len(temp_numbers) == 2, (
 | |
|             f"Expected exactly 2 temperature setpoint numbers, got {len(temp_numbers)}"
 | |
|         )
 | |
| 
 | |
|         # Special characters should be sanitized to _ in object_id
 | |
|         for number in temp_numbers:
 | |
|             assert number.object_id == "temperature_setpoint_", (
 | |
|                 f"Expected object_id 'temperature_setpoint_', got '{number.object_id}'"
 | |
|             )
 | |
| 
 | |
|         # Verify we can get states for all entities (ensures they're functional)
 | |
|         loop = asyncio.get_running_loop()
 | |
|         states_future: asyncio.Future[None] = loop.create_future()
 | |
|         state_count = 0
 | |
|         expected_count = (
 | |
|             len(sensors)
 | |
|             + len(binary_sensors)
 | |
|             + len(text_sensors)
 | |
|             + len(switches)
 | |
|             + len(buttons)
 | |
|             + len(numbers)
 | |
|         )
 | |
| 
 | |
|         def on_state(state) -> None:
 | |
|             nonlocal state_count
 | |
|             state_count += 1
 | |
|             if state_count >= expected_count and not states_future.done():
 | |
|                 states_future.set_result(None)
 | |
| 
 | |
|         client.subscribe_states(on_state)
 | |
| 
 | |
|         # Wait for all entity states
 | |
|         try:
 | |
|             await asyncio.wait_for(states_future, timeout=10.0)
 | |
|         except TimeoutError:
 | |
|             pytest.fail(
 | |
|                 f"Did not receive all entity states within 10 seconds. "
 | |
|                 f"Expected {expected_count}, received {state_count}"
 | |
|             )
 |