diff --git a/esphome/core/config.py b/esphome/core/config.py index 45ba214e44..4d28a81229 100644 --- a/esphome/core/config.py +++ b/esphome/core/config.py @@ -108,6 +108,50 @@ def validate_hostname(config): return config +def validate_id_hash_collisions(config: dict) -> dict: + """Validate that there are no hash collisions between IDs of the same type.""" + from esphome.helpers import fnv1a_32bit_hash + + # Check area hash collisions + area_hashes: dict[int, str] = {} + + # Check main area if present + if CONF_AREA in config: + area_id: core.ID = config[CONF_AREA][CONF_ID] + if area_id.id: + area_hash = fnv1a_32bit_hash(area_id.id) + area_hashes[area_hash] = area_id.id + + # Check areas list + for area in config.get(CONF_AREAS, []): + area_id: core.ID = area[CONF_ID] + if area_id.id: + area_hash = fnv1a_32bit_hash(area_id.id) + if area_hash in area_hashes: + raise cv.Invalid( + f"Area ID '{area_id.id}' with hash {area_hash} collides with " + f"existing area ID '{area_hashes[area_hash]}'", + path=[CONF_AREAS, area_id.id], + ) + area_hashes[area_hash] = area_id.id + + # Check device hash collisions + device_hashes: dict[int, str] = {} + for device in config.get(CONF_DEVICES, []): + device_id: core.ID = device[CONF_ID] + if device_id.id: + device_hash = fnv1a_32bit_hash(device_id.id) + if device_hash in device_hashes: + raise cv.Invalid( + f"Device ID '{device_id.id}' with hash {device_hash} collides with " + f"existing device ID '{device_hashes[device_hash]}'", + path=[CONF_DEVICES, device_id.id], + ) + device_hashes[device_hash] = device_id.id + + return config + + def valid_include(value): # Look for "<...>" includes if value.startswith("<") and value.endswith(">"): @@ -232,6 +276,7 @@ CONFIG_SCHEMA = cv.All( } ), validate_hostname, + validate_id_hash_collisions, ) PRELOAD_CONFIG_SCHEMA = cv.Schema( @@ -397,17 +442,6 @@ async def _add_platform_reserves() -> None: cg.add(cg.RawStatement(f"App.reserve_{platform_name}({count});"), prepend=True) -def _verify_no_collisions( - hashes: dict[int, str], id: str, id_hash: int, conf_key: str -) -> None: - """Verify that the given id and name do not collide with existing ones.""" - if id_hash in hashes: - raise vol.Invalid( - f"ID '{id}' with hash {id_hash} collides with existing ID '{hashes[id_hash]}'", - path=[conf_key], - ) - - @coroutine_with_priority(100.0) async def to_code(config: ConfigType) -> None: cg.add_global(cg.global_ns.namespace("esphome").using) @@ -493,9 +527,7 @@ async def to_code(config: ConfigType) -> None: # Helper function to process an area configuration def process_area( area_conf: dict[str, str | core.ID], - area_hashes: dict[int, str], area_ids: set[str], - conf_path: str | None = None, ) -> None: """Process and register an area configuration.""" area_id: core.ID = area_conf[CONF_ID] @@ -503,10 +535,6 @@ async def to_code(config: ConfigType) -> None: area_id_hash = fnv1a_32bit_hash(area_id_str) area_name: str = area_conf[CONF_NAME] - if conf_path: # Only verify collisions for areas from CONF_AREAS list - _verify_no_collisions(area_hashes, area_id, area_id_hash, conf_path) - - area_hashes[area_id_hash] = area_name area_ids.add(area_id_str) area_var = cg.new_Pvariable(area_id) @@ -515,27 +543,25 @@ async def to_code(config: ConfigType) -> None: cg.add(cg.App.register_area(area_var)) # Initialize tracking structures - area_hashes: dict[int, str] = {} area_ids: set[str] = set() - device_hashes: dict[int, str] = {} # Collect all areas to process - all_areas: list[tuple[dict[str, str | core.ID], str | None]] = [] + all_areas: list[dict[str, str | core.ID]] = [] # Add top-level area if present if area_conf := config.get(CONF_AREA): - all_areas.append((area_conf, None)) + all_areas.append(area_conf) # Add areas from CONF_AREAS list - all_areas.extend((area, CONF_AREAS) for area in config[CONF_AREAS]) + all_areas.extend(config[CONF_AREAS]) # Reserve space for areas and process them if all_areas: cg.add(cg.RawStatement(f"App.reserve_area({len(all_areas)});")) cg.add_define("USE_AREAS") - for area_conf, conf_path in all_areas: - process_area(area_conf, area_hashes, area_ids, conf_path) + for area_conf in all_areas: + process_area(area_conf, area_ids) # Process devices devices: list[dict[str, str | core.ID]] = config[CONF_DEVICES] @@ -552,9 +578,6 @@ async def to_code(config: ConfigType) -> None: device_id_hash = fnv1a_32bit_hash(device_id.id) device_name: str = dev_conf[CONF_NAME] - _verify_no_collisions(device_hashes, device_id, device_id_hash, CONF_DEVICES) - device_hashes[device_id_hash] = device_name - dev = cg.new_Pvariable(device_id) cg.add(dev.set_device_id(device_id_hash)) cg.add(dev.set_name(device_name)) diff --git a/tests/unit_tests/core/test_config.py b/tests/unit_tests/core/test_config.py index d31a66bdf6..11a80e4cc5 100644 --- a/tests/unit_tests/core/test_config.py +++ b/tests/unit_tests/core/test_config.py @@ -172,7 +172,11 @@ def test_area_id_collision( # Check for the specific error message in stdout captured = capsys.readouterr() - assert "ID duplicate_id redefined! Check esphome->area->id." in captured.out + # Since duplicate IDs have the same hash, our hash collision detection catches this + assert ( + "Area ID 'duplicate_id' with hash 1805131238 collides with existing area ID 'duplicate_id'" + in captured.out + ) def test_device_without_area(yaml_file: Callable[[str], str]) -> None: @@ -193,3 +197,47 @@ def test_device_without_area(yaml_file: Callable[[str], str]) -> None: # Verify no area_id is present assert "area_id" not in device + + +def test_device_with_invalid_area_id( + yaml_file: Callable[[str], str], capsys: pytest.CaptureFixture[str] +) -> None: + """Test that device with non-existent area_id fails validation.""" + result = load_config_from_fixture(yaml_file, "device_invalid_area.yaml") + assert result is None + + # Check for the specific error message in stdout + captured = capsys.readouterr() + assert "Couldn't find ID 'nonexistent_area'" in captured.out + + +def test_device_id_hash_collision( + yaml_file: Callable[[str], str], capsys: pytest.CaptureFixture[str] +) -> None: + """Test that device IDs with hash collisions are detected.""" + result = load_config_from_fixture(yaml_file, "device_id_collision.yaml") + assert result is None + + # Check for the specific error message about hash collision + captured = capsys.readouterr() + # The error message shows the ID that collides and includes the hash value + assert ( + "Device ID 'd6ka' with hash 3082558663 collides with existing device ID 'test_2258'" + in captured.out + ) + + +def test_area_id_hash_collision( + yaml_file: Callable[[str], str], capsys: pytest.CaptureFixture[str] +) -> None: + """Test that area IDs with hash collisions are detected.""" + result = load_config_from_fixture(yaml_file, "area_id_hash_collision.yaml") + assert result is None + + # Check for the specific error message about hash collision + captured = capsys.readouterr() + # The error message shows the ID that collides and includes the hash value + assert ( + "Area ID 'd6ka' with hash 3082558663 collides with existing area ID 'test_2258'" + in captured.out + )