1
0
mirror of https://github.com/esphome/esphome.git synced 2025-10-31 07:03:55 +00:00

validation should happen sooner

This commit is contained in:
J. Nick Koston
2025-06-22 21:18:04 +02:00
parent d3b18debf9
commit 2b9b7e2853
2 changed files with 62 additions and 85 deletions

View File

@@ -4,8 +4,6 @@ import logging
import os import os
from pathlib import Path from pathlib import Path
import voluptuous as vol
from esphome import automation, core from esphome import automation, core
import esphome.codegen as cg import esphome.codegen as cg
import esphome.config_validation as cv import esphome.config_validation as cv
@@ -108,58 +106,59 @@ def validate_hostname(config):
return config return config
def validate_id_hash_collisions(config: dict) -> dict: def validate_ids_and_references(config: ConfigType) -> ConfigType:
"""Validate that there are no hash collisions between IDs.""" """Validate that there are no hash collisions between IDs and that area_id references are valid."""
area_hashes: dict[int, str] = {}
# Check main area # Helper to check hash collisions
if CONF_AREA in config: def check_hash_collision(
area_id: core.ID = config[CONF_AREA][CONF_ID] id_obj: core.ID,
if area_id.id: hash_dict: dict[int, str],
area_hash = fnv1a_32bit_hash(area_id.id) item_type: str,
area_hashes[area_hash] = area_id.id path: list[str | int],
) -> bool:
# Check areas list if not id_obj.id:
for area in config[CONF_AREAS]: return False
area_id: core.ID = area[CONF_ID]
if not area_id.id:
continue
area_hash = fnv1a_32bit_hash(area_id.id)
if area_hash not in area_hashes:
area_hashes[area_hash] = area_id.id
continue
# Skip exact duplicates (handled by IDPassValidationStep)
if area_id.id == area_hashes[area_hash]:
continue
hash_val: int = fnv1a_32bit_hash(id_obj.id)
if hash_val in hash_dict and hash_dict[hash_val] != id_obj.id:
raise cv.Invalid( raise cv.Invalid(
f"Area ID '{area_id.id}' with hash {area_hash} collides with " f"{item_type} ID '{id_obj.id}' with hash {hash_val} collides with "
f"existing area ID '{area_hashes[area_hash]}'", f"existing {item_type.lower()} ID '{hash_dict[hash_val]}'",
path=[CONF_AREAS, area_id.id], path=path,
)
hash_dict[hash_val] = id_obj.id
return True
# Collect all areas
all_areas: list[dict[str, str | core.ID]] = []
if CONF_AREA in config:
all_areas.append(config[CONF_AREA])
all_areas.extend(config[CONF_AREAS])
# Validate area hash collisions and collect IDs
area_hashes: dict[int, str] = {}
area_ids: set[str] = set()
for area in all_areas:
area_id: core.ID = area[CONF_ID]
if check_hash_collision(area_id, area_hashes, "Area", [CONF_AREAS, area_id.id]):
area_ids.add(area_id.id)
# Validate device hash collisions and area references
device_hashes: dict[int, str] = {}
for i, device in enumerate(config[CONF_DEVICES]):
device_id: core.ID = device[CONF_ID]
check_hash_collision(
device_id, device_hashes, "Device", [CONF_DEVICES, device_id.id]
) )
# Check device hash collisions # Validate area_id reference if present
device_hashes: dict[int, str] = {} if CONF_AREA_ID in device:
for device in config[CONF_DEVICES]: area_ref_id: core.ID = device[CONF_AREA_ID]
device_id: core.ID = device[CONF_ID] if area_ref_id.id not in area_ids:
if not device_id.id:
continue
device_hash = fnv1a_32bit_hash(device_id.id)
if device_hash not in device_hashes:
device_hashes[device_hash] = device_id.id
continue
# Skip exact duplicates (handled by IDPassValidationStep)
if device_id.id == device_hashes[device_hash]:
continue
raise cv.Invalid( raise cv.Invalid(
f"Device ID '{device_id.id}' with hash {device_hash} collides " f"Device '{device[CONF_NAME]}' has an area_id '{area_ref_id.id}'"
f"with existing device ID '{device_hashes[device_hash]}'", " that does not exist.",
path=[CONF_DEVICES, device_id.id], path=[CONF_DEVICES, i, CONF_AREA_ID],
) )
return config return config
@@ -289,7 +288,7 @@ CONFIG_SCHEMA = cv.All(
} }
), ),
validate_hostname, validate_hostname,
validate_id_hash_collisions, validate_ids_and_references,
) )
PRELOAD_CONFIG_SCHEMA = cv.Schema( PRELOAD_CONFIG_SCHEMA = cv.Schema(
@@ -537,44 +536,25 @@ async def to_code(config: ConfigType) -> None:
if config[CONF_PLATFORMIO_OPTIONS]: if config[CONF_PLATFORMIO_OPTIONS]:
CORE.add_job(_add_platformio_options, config[CONF_PLATFORMIO_OPTIONS]) CORE.add_job(_add_platformio_options, config[CONF_PLATFORMIO_OPTIONS])
# Helper function to process an area configuration # Process areas
def process_area(
area_conf: dict[str, str | core.ID],
area_ids: set[str],
) -> None:
"""Process and register an area configuration."""
area_id: core.ID = area_conf[CONF_ID]
area_id_str: str = area_id.id
area_id_hash = fnv1a_32bit_hash(area_id_str)
area_name: str = area_conf[CONF_NAME]
area_ids.add(area_id_str)
area_var = cg.new_Pvariable(area_id)
cg.add(area_var.set_area_id(area_id_hash))
cg.add(area_var.set_name(area_name))
cg.add(cg.App.register_area(area_var))
# Initialize tracking structures
area_ids: set[str] = set()
# Collect all areas to process
all_areas: list[dict[str, str | core.ID]] = [] all_areas: list[dict[str, str | core.ID]] = []
if CONF_AREA in config:
# Add top-level area if present all_areas.append(config[CONF_AREA])
if area_conf := config.get(CONF_AREA):
all_areas.append(area_conf)
# Add areas from CONF_AREAS list
all_areas.extend(config[CONF_AREAS]) all_areas.extend(config[CONF_AREAS])
# Reserve space for areas and process them
if all_areas: if all_areas:
cg.add(cg.RawStatement(f"App.reserve_area({len(all_areas)});")) cg.add(cg.RawStatement(f"App.reserve_area({len(all_areas)});"))
cg.add_define("USE_AREAS") cg.add_define("USE_AREAS")
for area_conf in all_areas: for area_conf in all_areas:
process_area(area_conf, area_ids) area_id: core.ID = area_conf[CONF_ID]
area_id_hash: int = fnv1a_32bit_hash(area_id.id)
area_name: str = area_conf[CONF_NAME]
area_var = cg.new_Pvariable(area_id)
cg.add(area_var.set_area_id(area_id_hash))
cg.add(area_var.set_name(area_name))
cg.add(cg.App.register_area(area_var))
# Process devices # Process devices
devices: list[dict[str, str | core.ID]] = config[CONF_DEVICES] devices: list[dict[str, str | core.ID]] = config[CONF_DEVICES]
@@ -598,12 +578,6 @@ async def to_code(config: ConfigType) -> None:
# Set area if specified # Set area if specified
if CONF_AREA_ID in dev_conf: if CONF_AREA_ID in dev_conf:
area_id: core.ID = dev_conf[CONF_AREA_ID] area_id: core.ID = dev_conf[CONF_AREA_ID]
if area_id.id not in area_ids:
raise vol.Invalid(
f"Device '{device_name}' has an area_id '{area_id.id}'"
" that does not exist.",
path=[CONF_DEVICES, dev_conf[CONF_ID], CONF_AREA_ID],
)
area_id_hash = fnv1a_32bit_hash(area_id.id) area_id_hash = fnv1a_32bit_hash(area_id.id)
cg.add(dev.set_area_id(area_id_hash)) cg.add(dev.set_area_id(area_id_hash))

View File

@@ -205,7 +205,10 @@ def test_device_with_invalid_area_id(
# Check for the specific error message in stdout # Check for the specific error message in stdout
captured = capsys.readouterr() captured = capsys.readouterr()
assert "Couldn't find ID 'nonexistent_area'" in captured.out assert (
"Device 'Test Device' has an area_id 'nonexistent_area' that does not exist."
in captured.out
)
def test_device_id_hash_collision( def test_device_id_hash_collision(