From 2b9b7e285379096911829fef9ea4a6981fdb0c33 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 22 Jun 2025 21:18:04 +0200 Subject: [PATCH] validation should happen sooner --- esphome/core/config.py | 142 +++++++++++---------------- tests/unit_tests/core/test_config.py | 5 +- 2 files changed, 62 insertions(+), 85 deletions(-) diff --git a/esphome/core/config.py b/esphome/core/config.py index d08441d3fd..fb658de6b9 100644 --- a/esphome/core/config.py +++ b/esphome/core/config.py @@ -4,8 +4,6 @@ import logging import os from pathlib import Path -import voluptuous as vol - from esphome import automation, core import esphome.codegen as cg import esphome.config_validation as cv @@ -108,60 +106,61 @@ def validate_hostname(config): return config -def validate_id_hash_collisions(config: dict) -> dict: - """Validate that there are no hash collisions between IDs.""" - area_hashes: dict[int, str] = {} +def validate_ids_and_references(config: ConfigType) -> ConfigType: + """Validate that there are no hash collisions between IDs and that area_id references are valid.""" - # Check main area + # Helper to check hash collisions + def check_hash_collision( + id_obj: core.ID, + hash_dict: dict[int, str], + item_type: str, + path: list[str | int], + ) -> bool: + if not id_obj.id: + return False + + hash_val: int = fnv1a_32bit_hash(id_obj.id) + if hash_val in hash_dict and hash_dict[hash_val] != id_obj.id: + raise cv.Invalid( + f"{item_type} ID '{id_obj.id}' with hash {hash_val} collides with " + f"existing {item_type.lower()} ID '{hash_dict[hash_val]}'", + path=path, + ) + hash_dict[hash_val] = id_obj.id + return True + + # Collect all areas + all_areas: list[dict[str, str | core.ID]] = [] if CONF_AREA in config: - area_id: core.ID = config[CONF_AREA][CONF_ID] - if area_id.id: - area_hash = fnv1a_32bit_hash(area_id.id) - area_hashes[area_hash] = area_id.id + all_areas.append(config[CONF_AREA]) + all_areas.extend(config[CONF_AREAS]) - # Check areas list - for area in config[CONF_AREAS]: + # Validate area hash collisions and collect IDs + area_hashes: dict[int, str] = {} + area_ids: set[str] = set() + for area in all_areas: area_id: core.ID = area[CONF_ID] - if not area_id.id: - continue + if check_hash_collision(area_id, area_hashes, "Area", [CONF_AREAS, area_id.id]): + area_ids.add(area_id.id) - area_hash = fnv1a_32bit_hash(area_id.id) - if area_hash not in area_hashes: - area_hashes[area_hash] = area_id.id - continue - - # Skip exact duplicates (handled by IDPassValidationStep) - if area_id.id == area_hashes[area_hash]: - continue - - raise cv.Invalid( - f"Area ID '{area_id.id}' with hash {area_hash} collides with " - f"existing area ID '{area_hashes[area_hash]}'", - path=[CONF_AREAS, area_id.id], - ) - - # Check device hash collisions + # Validate device hash collisions and area references device_hashes: dict[int, str] = {} - for device in config[CONF_DEVICES]: + for i, device in enumerate(config[CONF_DEVICES]): device_id: core.ID = device[CONF_ID] - if not device_id.id: - continue - - device_hash = fnv1a_32bit_hash(device_id.id) - if device_hash not in device_hashes: - device_hashes[device_hash] = device_id.id - continue - - # Skip exact duplicates (handled by IDPassValidationStep) - if device_id.id == device_hashes[device_hash]: - continue - - raise cv.Invalid( - f"Device ID '{device_id.id}' with hash {device_hash} collides " - f"with existing device ID '{device_hashes[device_hash]}'", - path=[CONF_DEVICES, device_id.id], + check_hash_collision( + device_id, device_hashes, "Device", [CONF_DEVICES, device_id.id] ) + # Validate area_id reference if present + if CONF_AREA_ID in device: + area_ref_id: core.ID = device[CONF_AREA_ID] + if area_ref_id.id not in area_ids: + raise cv.Invalid( + f"Device '{device[CONF_NAME]}' has an area_id '{area_ref_id.id}'" + " that does not exist.", + path=[CONF_DEVICES, i, CONF_AREA_ID], + ) + return config @@ -289,7 +288,7 @@ CONFIG_SCHEMA = cv.All( } ), validate_hostname, - validate_id_hash_collisions, + validate_ids_and_references, ) PRELOAD_CONFIG_SCHEMA = cv.Schema( @@ -537,44 +536,25 @@ async def to_code(config: ConfigType) -> None: if config[CONF_PLATFORMIO_OPTIONS]: CORE.add_job(_add_platformio_options, config[CONF_PLATFORMIO_OPTIONS]) - # Helper function to process an area configuration - def process_area( - area_conf: dict[str, str | core.ID], - area_ids: set[str], - ) -> None: - """Process and register an area configuration.""" - area_id: core.ID = area_conf[CONF_ID] - area_id_str: str = area_id.id - area_id_hash = fnv1a_32bit_hash(area_id_str) - area_name: str = area_conf[CONF_NAME] - - area_ids.add(area_id_str) - - area_var = cg.new_Pvariable(area_id) - cg.add(area_var.set_area_id(area_id_hash)) - cg.add(area_var.set_name(area_name)) - cg.add(cg.App.register_area(area_var)) - - # Initialize tracking structures - area_ids: set[str] = set() - - # Collect all areas to process + # Process areas all_areas: list[dict[str, str | core.ID]] = [] - - # Add top-level area if present - if area_conf := config.get(CONF_AREA): - all_areas.append(area_conf) - - # Add areas from CONF_AREAS list + if CONF_AREA in config: + all_areas.append(config[CONF_AREA]) all_areas.extend(config[CONF_AREAS]) - # Reserve space for areas and process them if all_areas: cg.add(cg.RawStatement(f"App.reserve_area({len(all_areas)});")) cg.add_define("USE_AREAS") for area_conf in all_areas: - process_area(area_conf, area_ids) + area_id: core.ID = area_conf[CONF_ID] + area_id_hash: int = fnv1a_32bit_hash(area_id.id) + area_name: str = area_conf[CONF_NAME] + + area_var = cg.new_Pvariable(area_id) + cg.add(area_var.set_area_id(area_id_hash)) + cg.add(area_var.set_name(area_name)) + cg.add(cg.App.register_area(area_var)) # Process devices devices: list[dict[str, str | core.ID]] = config[CONF_DEVICES] @@ -598,12 +578,6 @@ async def to_code(config: ConfigType) -> None: # Set area if specified if CONF_AREA_ID in dev_conf: area_id: core.ID = dev_conf[CONF_AREA_ID] - if area_id.id not in area_ids: - raise vol.Invalid( - f"Device '{device_name}' has an area_id '{area_id.id}'" - " that does not exist.", - path=[CONF_DEVICES, dev_conf[CONF_ID], CONF_AREA_ID], - ) area_id_hash = fnv1a_32bit_hash(area_id.id) cg.add(dev.set_area_id(area_id_hash)) diff --git a/tests/unit_tests/core/test_config.py b/tests/unit_tests/core/test_config.py index ed442b93fa..6a28925dd3 100644 --- a/tests/unit_tests/core/test_config.py +++ b/tests/unit_tests/core/test_config.py @@ -205,7 +205,10 @@ def test_device_with_invalid_area_id( # Check for the specific error message in stdout captured = capsys.readouterr() - assert "Couldn't find ID 'nonexistent_area'" in captured.out + assert ( + "Device 'Test Device' has an area_id 'nonexistent_area' that does not exist." + in captured.out + ) def test_device_id_hash_collision(