mirror of
				https://github.com/esphome/esphome.git
				synced 2025-10-26 04:33:47 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			214 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			214 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Integration test for duplicate entity handling with new validation."""
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import asyncio
 | |
| 
 | |
| from aioesphomeapi import EntityInfo
 | |
| import pytest
 | |
| 
 | |
| from .types import APIClientConnectedFactory, RunCompiledFunction
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| async def test_duplicate_entities_not_allowed_on_different_devices(
 | |
|     yaml_config: str,
 | |
|     run_compiled: RunCompiledFunction,
 | |
|     api_client_connected: APIClientConnectedFactory,
 | |
| ) -> None:
 | |
|     """Test that duplicate entity names are NOT allowed on different devices."""
 | |
|     async with run_compiled(yaml_config), api_client_connected() as client:
 | |
|         # Get device info
 | |
|         device_info = await client.device_info()
 | |
|         assert device_info is not None
 | |
| 
 | |
|         # Get devices
 | |
|         devices = device_info.devices
 | |
|         assert len(devices) >= 3, f"Expected at least 3 devices, got {len(devices)}"
 | |
| 
 | |
|         # Find our test devices
 | |
|         controller_1 = next((d for d in devices if d.name == "Controller 1"), None)
 | |
|         controller_2 = next((d for d in devices if d.name == "Controller 2"), None)
 | |
|         controller_3 = next((d for d in devices if d.name == "Controller 3"), None)
 | |
| 
 | |
|         assert controller_1 is not None, "Controller 1 device not found"
 | |
|         assert controller_2 is not None, "Controller 2 device not found"
 | |
|         assert controller_3 is not None, "Controller 3 device not found"
 | |
| 
 | |
|         # Get entity list
 | |
|         entities = await client.list_entities_services()
 | |
|         all_entities: list[EntityInfo] = []
 | |
|         for entity_list in entities[0]:
 | |
|             all_entities.append(entity_list)
 | |
| 
 | |
|         # Group entities by type for easier testing
 | |
|         sensors = [e for e in all_entities if e.__class__.__name__ == "SensorInfo"]
 | |
|         binary_sensors = [
 | |
|             e for e in all_entities if e.__class__.__name__ == "BinarySensorInfo"
 | |
|         ]
 | |
|         text_sensors = [
 | |
|             e for e in all_entities if e.__class__.__name__ == "TextSensorInfo"
 | |
|         ]
 | |
|         switches = [e for e in all_entities if e.__class__.__name__ == "SwitchInfo"]
 | |
|         buttons = [e for e in all_entities if e.__class__.__name__ == "ButtonInfo"]
 | |
|         numbers = [e for e in all_entities if e.__class__.__name__ == "NumberInfo"]
 | |
|         selects = [e for e in all_entities if e.__class__.__name__ == "SelectInfo"]
 | |
| 
 | |
|         # Scenario 1: Check that temperature sensors have unique names per device
 | |
|         temp_sensors = [s for s in sensors if "Temperature" in s.name]
 | |
|         assert len(temp_sensors) == 4, (
 | |
|             f"Expected exactly 4 temperature sensors, got {len(temp_sensors)}"
 | |
|         )
 | |
| 
 | |
|         # Verify each sensor has a unique name
 | |
|         temp_names = set()
 | |
|         temp_object_ids = set()
 | |
| 
 | |
|         for sensor in temp_sensors:
 | |
|             temp_names.add(sensor.name)
 | |
|             temp_object_ids.add(sensor.object_id)
 | |
| 
 | |
|         # Should have 4 unique names
 | |
|         assert len(temp_names) == 4, (
 | |
|             f"Temperature sensors should have unique names, got {temp_names}"
 | |
|         )
 | |
| 
 | |
|         # Object IDs should also be unique
 | |
|         assert len(temp_object_ids) == 4, (
 | |
|             f"Temperature sensors should have unique object_ids, got {temp_object_ids}"
 | |
|         )
 | |
| 
 | |
|         # Scenario 2: Check binary sensors have unique names
 | |
|         status_binary = [b for b in binary_sensors if "Status" in b.name]
 | |
|         assert len(status_binary) == 3, (
 | |
|             f"Expected exactly 3 status binary sensors, got {len(status_binary)}"
 | |
|         )
 | |
| 
 | |
|         # All should have unique object_ids
 | |
|         status_names = set()
 | |
|         for binary in status_binary:
 | |
|             status_names.add(binary.name)
 | |
| 
 | |
|         assert len(status_names) == 3, (
 | |
|             f"Status binary sensors should have unique names, got {status_names}"
 | |
|         )
 | |
| 
 | |
|         # Scenario 3: Check that sensor and binary_sensor can have same name
 | |
|         temp_binary = [b for b in binary_sensors if b.name == "Temperature"]
 | |
|         assert len(temp_binary) == 1, (
 | |
|             f"Expected exactly 1 temperature binary sensor, got {len(temp_binary)}"
 | |
|         )
 | |
|         assert temp_binary[0].object_id == "temperature"
 | |
| 
 | |
|         # Scenario 4: Check text sensors have unique names
 | |
|         info_text = [t for t in text_sensors if "Device Info" in t.name]
 | |
|         assert len(info_text) == 3, (
 | |
|             f"Expected exactly 3 device info text sensors, got {len(info_text)}"
 | |
|         )
 | |
| 
 | |
|         # All should have unique names and object_ids
 | |
|         info_names = set()
 | |
|         for text in info_text:
 | |
|             info_names.add(text.name)
 | |
| 
 | |
|         assert len(info_names) == 3, (
 | |
|             f"Device info text sensors should have unique names, got {info_names}"
 | |
|         )
 | |
| 
 | |
|         # Scenario 5: Check switches have unique names
 | |
|         power_switches = [s for s in switches if "Power" in s.name]
 | |
|         assert len(power_switches) == 4, (
 | |
|             f"Expected exactly 4 power switches, got {len(power_switches)}"
 | |
|         )
 | |
| 
 | |
|         # All should have unique names
 | |
|         power_names = set()
 | |
|         for switch in power_switches:
 | |
|             power_names.add(switch.name)
 | |
| 
 | |
|         assert len(power_names) == 4, (
 | |
|             f"Power switches should have unique names, got {power_names}"
 | |
|         )
 | |
| 
 | |
|         # Scenario 6: Check reset buttons have unique names
 | |
|         reset_buttons = [b for b in buttons if "Reset" in b.name]
 | |
|         assert len(reset_buttons) == 3, (
 | |
|             f"Expected exactly 3 reset buttons, got {len(reset_buttons)}"
 | |
|         )
 | |
| 
 | |
|         # All should have unique names
 | |
|         reset_names = set()
 | |
|         for button in reset_buttons:
 | |
|             reset_names.add(button.name)
 | |
| 
 | |
|         assert len(reset_names) == 3, (
 | |
|             f"Reset buttons should have unique names, got {reset_names}"
 | |
|         )
 | |
| 
 | |
|         # Scenario 7: Check empty name selects (should use device names)
 | |
|         empty_selects = [s for s in selects if s.name == ""]
 | |
|         assert len(empty_selects) == 3, (
 | |
|             f"Expected exactly 3 empty name selects, got {len(empty_selects)}"
 | |
|         )
 | |
| 
 | |
|         # Group by device
 | |
|         c1_selects = [s for s in empty_selects if s.device_id == controller_1.device_id]
 | |
|         c2_selects = [s for s in empty_selects if s.device_id == controller_2.device_id]
 | |
| 
 | |
|         # For main device, device_id is 0
 | |
|         main_selects = [s for s in empty_selects if s.device_id == 0]
 | |
| 
 | |
|         # Check object IDs for empty name entities - they should use device names
 | |
|         assert len(c1_selects) == 1 and c1_selects[0].object_id == "controller_1"
 | |
|         assert len(c2_selects) == 1 and c2_selects[0].object_id == "controller_2"
 | |
|         assert (
 | |
|             len(main_selects) == 1
 | |
|             and main_selects[0].object_id == "duplicate-entities-test"
 | |
|         )
 | |
| 
 | |
|         # Scenario 8: Check special characters in number names - now unique
 | |
|         temp_numbers = [n for n in numbers if "Temperature Setpoint!" in n.name]
 | |
|         assert len(temp_numbers) == 2, (
 | |
|             f"Expected exactly 2 temperature setpoint numbers, got {len(temp_numbers)}"
 | |
|         )
 | |
| 
 | |
|         # Should have unique names
 | |
|         setpoint_names = set()
 | |
|         for number in temp_numbers:
 | |
|             setpoint_names.add(number.name)
 | |
| 
 | |
|         assert len(setpoint_names) == 2, (
 | |
|             f"Temperature setpoint numbers should have unique names, got {setpoint_names}"
 | |
|         )
 | |
| 
 | |
|         # Verify we can get states for all entities (ensures they're functional)
 | |
|         loop = asyncio.get_running_loop()
 | |
|         states_future: asyncio.Future[None] = loop.create_future()
 | |
|         state_count = 0
 | |
|         expected_count = (
 | |
|             len(sensors)
 | |
|             + len(binary_sensors)
 | |
|             + len(text_sensors)
 | |
|             + len(switches)
 | |
|             + len(buttons)
 | |
|             + len(numbers)
 | |
|             + len(selects)
 | |
|         )
 | |
| 
 | |
|         def on_state(state) -> None:
 | |
|             nonlocal state_count
 | |
|             state_count += 1
 | |
|             if state_count >= expected_count and not states_future.done():
 | |
|                 states_future.set_result(None)
 | |
| 
 | |
|         client.subscribe_states(on_state)
 | |
| 
 | |
|         # Wait for all entity states
 | |
|         try:
 | |
|             await asyncio.wait_for(states_future, timeout=10.0)
 | |
|         except asyncio.TimeoutError:
 | |
|             pytest.fail(
 | |
|                 f"Did not receive all entity states within 10 seconds. "
 | |
|                 f"Expected {expected_count}, received {state_count}"
 | |
|             )
 |